mirror of
https://github.com/optim-enterprises-bv/vault.git
synced 2025-10-29 17:52:32 +00:00
VAULT-28477 Bootstrap and persist autopilot versions (#28186)
* add versions to raft bootstrap answer * remove version faking * save version state to storage * logging and copy * changelog * use leader versions on upgrade * add enterprise cluster test * never drop persisted states * rename to partialCopy, fix comment, fix log line
This commit is contained in:
3
changelog/28186.txt
Normal file
3
changelog/28186.txt
Normal file
@@ -0,0 +1,3 @@
|
||||
```release-note:improvement
|
||||
raft/autopilot: Persist Raft server versions so autopilot always knows the versions of all servers in the cluster. Include server versions in the Raft bootstrap challenge answer so autopilot immediately knows the versions of new nodes.
|
||||
```
|
||||
@@ -690,6 +690,12 @@ func (b *RaftBackend) UpgradeVersion() string {
|
||||
return b.effectiveSDKVersion
|
||||
}
|
||||
|
||||
func (b *RaftBackend) SDKVersion() string {
|
||||
b.l.RLock()
|
||||
defer b.l.RUnlock()
|
||||
return b.effectiveSDKVersion
|
||||
}
|
||||
|
||||
func (b *RaftBackend) verificationInterval() time.Duration {
|
||||
b.l.RLock()
|
||||
defer b.l.RUnlock()
|
||||
|
||||
@@ -211,6 +211,86 @@ type FollowerState struct {
|
||||
RedundancyZone string
|
||||
}
|
||||
|
||||
// partialCopy returns a partial copy of the follower state.
|
||||
// This copy uses the same pointer to the IsDead
|
||||
// atomic field. We need to do this to ensure that
|
||||
// an update of the IsDead boolean will still be
|
||||
// accessible in a copied state.
|
||||
func (f *FollowerState) partialCopy() *FollowerState {
|
||||
return &FollowerState{
|
||||
AppliedIndex: f.AppliedIndex,
|
||||
LastHeartbeat: f.LastHeartbeat,
|
||||
LastTerm: f.LastTerm,
|
||||
IsDead: f.IsDead,
|
||||
DesiredSuffrage: f.DesiredSuffrage,
|
||||
Version: f.Version,
|
||||
UpgradeVersion: f.UpgradeVersion,
|
||||
RedundancyZone: f.RedundancyZone,
|
||||
}
|
||||
}
|
||||
|
||||
type (
	// PersistedFollowerState is the per-server record that gets persisted to
	// storage: the server's version and its upgrade version.
	PersistedFollowerState struct {
		Version        string `json:"version"`
		UpgradeVersion string `json:"upgrade_version"`
	}

	// PersistedFollowerStates is the in-memory view of the persisted records,
	// keyed by server ID. The l mutex guards access to States.
	PersistedFollowerStates struct {
		l      sync.RWMutex
		States map[string]PersistedFollowerState
	}
)
|
||||
|
||||
// shouldUpdate checks if the persisted state contains the same servers as the
|
||||
// current autopilot state. If grabLock is true, a read lock is acquired before
|
||||
// accessing the map
|
||||
func (p *PersistedFollowerStates) shouldUpdate(state *autopilot.State, grabLock bool) bool {
|
||||
if grabLock {
|
||||
p.l.RLock()
|
||||
defer p.l.RUnlock()
|
||||
}
|
||||
if len(state.Servers) != len(p.States) {
|
||||
return true
|
||||
}
|
||||
for id, server := range state.Servers {
|
||||
persistedServer, found := p.States[string(id)]
|
||||
if !found {
|
||||
return true
|
||||
}
|
||||
if server.Server.Version != persistedServer.Version ||
|
||||
server.Server.Meta[AutopilotUpgradeVersionTag] != persistedServer.UpgradeVersion {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// updatePersistedState checks if the persisted state matches the current
|
||||
// autopilot state. If not, the state is replaced and persisted
|
||||
func (d *Delegate) updatePersistedState(state *autopilot.State) error {
|
||||
if !d.persistedState.shouldUpdate(state, true) {
|
||||
return nil
|
||||
}
|
||||
newStates := make(map[string]PersistedFollowerState)
|
||||
for id, server := range state.Servers {
|
||||
newStates[string(id)] = PersistedFollowerState{
|
||||
Version: server.Server.Version,
|
||||
UpgradeVersion: server.Server.Meta[AutopilotUpgradeVersionTag],
|
||||
}
|
||||
}
|
||||
d.persistedState.l.Lock()
|
||||
defer d.persistedState.l.Unlock()
|
||||
if !d.persistedState.shouldUpdate(state, false) {
|
||||
return nil
|
||||
}
|
||||
d.logger.Debug("updating autopilot persisted state")
|
||||
err := d.saveStateFn(newStates)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
d.persistedState.States = newStates
|
||||
return nil
|
||||
}
|
||||
|
||||
// EchoRequestUpdate is here to avoid 1) the list of arguments to Update() getting huge 2) an import cycle on the vault package
|
||||
type EchoRequestUpdate struct {
|
||||
NodeID string
|
||||
@@ -315,13 +395,17 @@ type Delegate struct {
|
||||
dl sync.RWMutex
|
||||
inflightRemovals map[raft.ServerID]bool
|
||||
emptyVersionLogs map[raft.ServerID]struct{}
|
||||
persistedState *PersistedFollowerStates
|
||||
saveStateFn func(p map[string]PersistedFollowerState) error
|
||||
}
|
||||
|
||||
func NewDelegate(b *RaftBackend) *Delegate {
|
||||
func NewDelegate(b *RaftBackend, persistedStates map[string]PersistedFollowerState, savePersistedStates func(p map[string]PersistedFollowerState) error) *Delegate {
|
||||
return &Delegate{
|
||||
RaftBackend: b,
|
||||
inflightRemovals: make(map[raft.ServerID]bool),
|
||||
emptyVersionLogs: make(map[raft.ServerID]struct{}),
|
||||
persistedState: &PersistedFollowerStates{States: persistedStates},
|
||||
saveStateFn: savePersistedStates,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -365,6 +449,13 @@ func (d *Delegate) NotifyState(state *autopilot.State) {
|
||||
metrics.SetGaugeWithLabels([]string{"autopilot", "node", "healthy"}, 0, labels)
|
||||
}
|
||||
}
|
||||
|
||||
// if there is a change in versions or membership, we should update
|
||||
// our persisted state
|
||||
err := d.updatePersistedState(state)
|
||||
if err != nil {
|
||||
d.logger.Error("failed to persist autopilot state", "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -420,6 +511,9 @@ func (d *Delegate) KnownServers() map[raft.ServerID]*autopilot.Server {
|
||||
d.followerStates.l.RLock()
|
||||
defer d.followerStates.l.RUnlock()
|
||||
|
||||
d.persistedState.l.RLock()
|
||||
defer d.persistedState.l.RUnlock()
|
||||
|
||||
ret := make(map[raft.ServerID]*autopilot.Server)
|
||||
for id, state := range d.followerStates.followers {
|
||||
// If the server is not in raft configuration, even if we received a follower
|
||||
@@ -428,26 +522,14 @@ func (d *Delegate) KnownServers() map[raft.ServerID]*autopilot.Server {
|
||||
continue
|
||||
}
|
||||
|
||||
// If version isn't found in the state, fake it using the version from the leader so that autopilot
|
||||
// doesn't demote the node to a non-voter, just because of a missed heartbeat. Note that this should
|
||||
// be the SDK version, not the upgrade version.
|
||||
currentServerID := raft.ServerID(id)
|
||||
followerVersion := state.Version
|
||||
leaderVersion := d.effectiveSDKVersion
|
||||
d.dl.Lock()
|
||||
if followerVersion == "" {
|
||||
if _, ok := d.emptyVersionLogs[currentServerID]; !ok {
|
||||
d.logger.Trace("received empty Vault version in heartbeat state. faking it with the leader version for now", "id", id, "leader version", leaderVersion)
|
||||
d.emptyVersionLogs[currentServerID] = struct{}{}
|
||||
}
|
||||
followerVersion = leaderVersion
|
||||
} else {
|
||||
if _, ok := d.emptyVersionLogs[currentServerID]; ok {
|
||||
d.logger.Trace("received non-empty version in heartbeat state. no longer need to fake it", "id", id, "update_version", followerVersion)
|
||||
delete(d.emptyVersionLogs, currentServerID)
|
||||
}
|
||||
followerVersion, upgradeVersion := d.determineFollowerVersions(id, state)
|
||||
if state.UpgradeVersion != upgradeVersion {
|
||||
// we only have a read lock on state, so we can't modify it
|
||||
// safely. Instead, copy it to override the upgrade version
|
||||
state = state.partialCopy()
|
||||
state.UpgradeVersion = upgradeVersion
|
||||
}
|
||||
d.dl.Unlock()
|
||||
|
||||
server := &autopilot.Server{
|
||||
ID: currentServerID,
|
||||
@@ -501,6 +583,54 @@ func (d *Delegate) KnownServers() map[raft.ServerID]*autopilot.Server {
|
||||
return ret
|
||||
}
|
||||
|
||||
// determineFollowerVersions uses the following logic:
|
||||
// - if the version and upgrade version are present in the follower state,
|
||||
// return those.
|
||||
// - if the persisted states map is empty, it means that persisted states
|
||||
// don't exist. This happens on an upgrade to 1.18. Use the leader node's
|
||||
// versions.
|
||||
// - use the versions in the persisted states map
|
||||
//
|
||||
// This function must be called with a lock on d.followerStates
|
||||
// and d.persistedStates.
|
||||
func (d *Delegate) determineFollowerVersions(id string, state *FollowerState) (version string, upgradeVersion string) {
|
||||
// if we have both versions in follower states, use those
|
||||
if state.Version != "" && state.UpgradeVersion != "" {
|
||||
return state.Version, state.UpgradeVersion
|
||||
}
|
||||
|
||||
version = state.Version
|
||||
upgradeVersion = state.UpgradeVersion
|
||||
|
||||
// the persistedState map should only be empty on upgrades
|
||||
// to 1.18.x. This is the only case where we'll stub with
|
||||
// the leader's versions
|
||||
if len(d.persistedState.States) == 0 {
|
||||
if version == "" {
|
||||
version = d.effectiveSDKVersion
|
||||
d.logger.Debug("no persisted state, using leader version", "id", id, "version", version)
|
||||
}
|
||||
if upgradeVersion == "" {
|
||||
upgradeVersion = d.upgradeVersion
|
||||
d.logger.Debug("no persisted state, using leader upgrade version version", "id", id, "upgrade_version", upgradeVersion)
|
||||
}
|
||||
return version, upgradeVersion
|
||||
}
|
||||
|
||||
// Use the persistedStates map to fill in the sdk
|
||||
// and upgrade versions
|
||||
pState := d.persistedState.States[id]
|
||||
if version == "" {
|
||||
version = pState.Version
|
||||
d.logger.Debug("using follower version from persisted states", "id", id, "version", version)
|
||||
}
|
||||
if upgradeVersion == "" {
|
||||
upgradeVersion = pState.UpgradeVersion
|
||||
d.logger.Debug("using upgrade version from persisted states", "id", id, "upgrade_version", upgradeVersion)
|
||||
}
|
||||
return version, upgradeVersion
|
||||
}
|
||||
|
||||
// RemoveFailedServer is called by the autopilot library when it desires a node
|
||||
// to be removed from the raft configuration. This function removes the node
|
||||
// from the raft cluster and stops tracking its information in follower states.
|
||||
@@ -834,11 +964,19 @@ func (b *RaftBackend) DisableAutopilot() {
|
||||
b.l.Unlock()
|
||||
}
|
||||
|
||||
type AutopilotSetupOptions struct {
|
||||
StorageConfig *AutopilotConfig
|
||||
FollowerStates *FollowerStates
|
||||
Disable bool
|
||||
PersistedStates map[string]PersistedFollowerState
|
||||
SavePersistedStates func(p map[string]PersistedFollowerState) error
|
||||
}
|
||||
|
||||
// SetupAutopilot gathers information required to configure autopilot and starts
|
||||
// it. If autopilot is disabled, this function does nothing.
|
||||
func (b *RaftBackend) SetupAutopilot(ctx context.Context, storageConfig *AutopilotConfig, followerStates *FollowerStates, disable bool) {
|
||||
func (b *RaftBackend) SetupAutopilot(ctx context.Context, opts *AutopilotSetupOptions) {
|
||||
b.l.Lock()
|
||||
if disable || os.Getenv("VAULT_RAFT_AUTOPILOT_DISABLE") != "" {
|
||||
if opts.Disable || os.Getenv("VAULT_RAFT_AUTOPILOT_DISABLE") != "" {
|
||||
b.disableAutopilot = true
|
||||
}
|
||||
|
||||
@@ -852,7 +990,7 @@ func (b *RaftBackend) SetupAutopilot(ctx context.Context, storageConfig *Autopil
|
||||
b.autopilotConfig = b.defaultAutopilotConfig()
|
||||
|
||||
// Merge the setting provided over the API
|
||||
b.autopilotConfig.Merge(storageConfig)
|
||||
b.autopilotConfig.Merge(opts.StorageConfig)
|
||||
|
||||
infoArgs := []interface{}{"config", b.autopilotConfig}
|
||||
|
||||
@@ -869,8 +1007,9 @@ func (b *RaftBackend) SetupAutopilot(ctx context.Context, storageConfig *Autopil
|
||||
options = append(options, autopilot.WithUpdateInterval(b.autopilotUpdateInterval))
|
||||
infoArgs = append(infoArgs, []interface{}{"update_interval", b.autopilotUpdateInterval}...)
|
||||
}
|
||||
b.autopilot = autopilot.New(b.raft, NewDelegate(b), options...)
|
||||
b.followerStates = followerStates
|
||||
delegate := NewDelegate(b, opts.PersistedStates, opts.SavePersistedStates)
|
||||
b.autopilot = autopilot.New(b.raft, delegate, options...)
|
||||
b.followerStates = opts.FollowerStates
|
||||
b.followerHeartbeatTicker = time.NewTicker(1 * time.Second)
|
||||
b.l.Unlock()
|
||||
|
||||
|
||||
@@ -64,6 +64,7 @@ func raftClusterBuilder(t testing.TB, ropts *RaftClusterOpts) (*vault.CoreConfig
|
||||
DisableAutopilot: !ropts.EnableAutopilot,
|
||||
EnableResponseHeaderRaftNodeID: ropts.EnableResponseHeaderRaftNodeID,
|
||||
Seal: ropts.Seal,
|
||||
EnableRaw: true,
|
||||
}
|
||||
|
||||
opts := vault.TestClusterOptions{
|
||||
|
||||
@@ -50,6 +50,12 @@ func (b *SystemBackend) raftStoragePaths() []*framework.Path {
|
||||
"non_voter": {
|
||||
Type: framework.TypeBool,
|
||||
},
|
||||
"upgrade_version": {
|
||||
Type: framework.TypeString,
|
||||
},
|
||||
"sdk_version": {
|
||||
Type: framework.TypeString,
|
||||
},
|
||||
},
|
||||
|
||||
Operations: map[logical.Operation]framework.OperationHandler{
|
||||
@@ -370,6 +376,8 @@ func (b *SystemBackend) handleRaftBootstrapAnswerWrite() framework.OperationFunc
|
||||
added := b.Core.raftFollowerStates.Update(&raft.EchoRequestUpdate{
|
||||
NodeID: serverID,
|
||||
DesiredSuffrage: desiredSuffrage,
|
||||
SDKVersion: d.Get("sdk_version").(string),
|
||||
UpgradeVersion: d.Get("upgrade_version").(string),
|
||||
})
|
||||
|
||||
switch nonVoter {
|
||||
|
||||
@@ -26,6 +26,7 @@ import (
|
||||
"github.com/hashicorp/go-uuid"
|
||||
goversion "github.com/hashicorp/go-version"
|
||||
"github.com/hashicorp/vault/api"
|
||||
httpPriority "github.com/hashicorp/vault/http/priority"
|
||||
"github.com/hashicorp/vault/physical/raft"
|
||||
"github.com/hashicorp/vault/sdk/helper/jsonutil"
|
||||
"github.com/hashicorp/vault/sdk/logical"
|
||||
@@ -49,7 +50,8 @@ var (
|
||||
raftTLSStoragePath = "core/raft/tls"
|
||||
raftTLSRotationPeriod = 24 * time.Hour
|
||||
|
||||
raftAutopilotConfigurationStoragePath = "core/raft/autopilot/configuration"
|
||||
raftAutopilotConfigurationStoragePath = "core/raft/autopilot/configuration"
|
||||
raftAutopilotPersistedStateStoragePath = "core/raft/autopilot/state"
|
||||
|
||||
ErrJoinWithoutAutoloading = errors.New("attempt to join a cluster using autoloaded licenses while not using autoloading ourself")
|
||||
)
|
||||
@@ -339,10 +341,42 @@ func (c *Core) setupRaftActiveNode(ctx context.Context) error {
|
||||
c.logger.Error("failed to load autopilot config from storage when setting up cluster; continuing since autopilot falls back to default config", "error", err)
|
||||
}
|
||||
disableAutopilot := c.disableAutopilot
|
||||
raftBackend.SetupAutopilot(c.activeContext, autopilotConfig, c.raftFollowerStates, disableAutopilot)
|
||||
persistedState, err := c.autopilotPersistedState()
|
||||
if err != nil {
|
||||
c.logger.Error("failed to load autopilot persisted state from storage", "error", err)
|
||||
}
|
||||
raftBackend.SetupAutopilot(c.activeContext, &raft.AutopilotSetupOptions{
|
||||
StorageConfig: autopilotConfig,
|
||||
FollowerStates: c.raftFollowerStates,
|
||||
Disable: disableAutopilot,
|
||||
PersistedStates: persistedState,
|
||||
SavePersistedStates: c.saveAutopilotPersistedState,
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Core) autopilotPersistedState() (map[string]raft.PersistedFollowerState, error) {
|
||||
entry, err := c.barrier.Get(c.activeContext, raftAutopilotPersistedStateStoragePath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var m map[string]raft.PersistedFollowerState
|
||||
if entry == nil {
|
||||
return m, nil
|
||||
}
|
||||
|
||||
err = entry.DecodeJSON(&m)
|
||||
return m, err
|
||||
}
|
||||
|
||||
func (c *Core) saveAutopilotPersistedState(states map[string]raft.PersistedFollowerState) error {
|
||||
entry, err := logical.StorageEntryJSON(raftAutopilotPersistedStateStoragePath, states)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return c.barrier.Put(httpPriority.ContextWithRequestPriority(c.activeContext, httpPriority.NeverDrop), entry)
|
||||
}
|
||||
|
||||
func (c *Core) stopRaftActiveNode() {
|
||||
raftBackend := c.getRaftBackend()
|
||||
if raftBackend == nil {
|
||||
@@ -1254,7 +1288,11 @@ func (c *Core) raftLeaderInfo(leaderInfo *raft.LeaderJoinInfo, disco *discover.D
|
||||
|
||||
// NewDelegateForCore creates a raft.Delegate for the specified core using its backend.
|
||||
func NewDelegateForCore(c *Core) *raft.Delegate {
|
||||
return raft.NewDelegate(c.getRaftBackend())
|
||||
persistedState, err := c.autopilotPersistedState()
|
||||
if err != nil {
|
||||
c.logger.Error("failed to load autopilot persisted state from storage", "error", err)
|
||||
}
|
||||
return raft.NewDelegate(c.getRaftBackend(), persistedState, c.saveAutopilotPersistedState)
|
||||
}
|
||||
|
||||
// getRaftBackend returns the RaftBackend from the HA or physical backend,
|
||||
@@ -1321,10 +1359,12 @@ func (c *Core) joinRaftSendAnswer(ctx context.Context, sealAccess seal.Access, r
|
||||
|
||||
answerReq := raftInfo.leaderClient.NewRequest("PUT", "/v1/sys/storage/raft/bootstrap/answer")
|
||||
if err := answerReq.SetJSONBody(map[string]interface{}{
|
||||
"answer": base64.StdEncoding.EncodeToString(plaintext),
|
||||
"cluster_addr": clusterAddr,
|
||||
"server_id": raftBackend.NodeID(),
|
||||
"non_voter": raftInfo.nonVoter,
|
||||
"answer": base64.StdEncoding.EncodeToString(plaintext),
|
||||
"cluster_addr": clusterAddr,
|
||||
"server_id": raftBackend.NodeID(),
|
||||
"non_voter": raftInfo.nonVoter,
|
||||
"sdk_version": raftBackend.SDKVersion(),
|
||||
"upgrade_version": raftBackend.UpgradeVersion(),
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user