mirror of https://github.com/optim-enterprises-bv/vault.git, synced 2025-10-29 09:42:25 +00:00
VAULT-32568: Shutdown node when it's not in the raft config (#29052)
* add implementation and tests
* add eventually condition for test flake
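This change makes a raft node notice its own removal by watching the raft configuration itself. The background checker (StartRemovedChecker, which already polls IsNodeRemoved once per second) gains a hasBeenPresent guard, and Core wires a removedCallback down into the raft backend so that a node which was in the config and then disappears from it removes itself and seals. The following is a condensed, self-contained sketch of that checker pattern; startRemovedChecker, isRemoved, and onRemoved are stand-in names for illustration, not Vault's API:

package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// startRemovedChecker polls isRemoved once per interval. It only fires
// onRemoved after the node has been observed in the config at least once
// (hasBeenPresent), mirroring the guard this commit adds.
func startRemovedChecker(ctx context.Context, interval time.Duration,
	isRemoved func() (bool, error), onRemoved func()) {
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		hasBeenPresent := false
		for {
			select {
			case <-ticker.C:
				removed, err := isRemoved()
				if err != nil {
					continue // log and retry on the next tick
				}
				if !removed {
					hasBeenPresent = true
				}
				if removed && hasBeenPresent {
					onRemoved()
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}()
}

func main() {
	var removed atomic.Bool
	done := make(chan struct{})
	startRemovedChecker(context.Background(), 10*time.Millisecond,
		func() (bool, error) { return removed.Load(), nil },
		func() { fmt.Println("node removed; shutting down"); close(done) })

	time.Sleep(50 * time.Millisecond) // node is present for a while...
	removed.Store(true)               // ...then drops out of the raft config
	<-done
}

The hunks below implement the real version inside RaftBackend, guarded by its read/write lock.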
@@ -256,7 +256,8 @@ type RaftBackend struct {
 	// limits.
 	specialPathLimits map[string]uint64
 
-	removed atomic.Bool
+	removed         atomic.Bool
+	removedCallback func()
 }
 
 func (b *RaftBackend) IsNodeRemoved(ctx context.Context, nodeID string) (bool, error) {
@@ -1030,6 +1031,12 @@ func (b *RaftBackend) SetRestoreCallback(restoreCb restoreCallback) {
 	b.fsm.l.Unlock()
 }
 
+func (b *RaftBackend) SetRemovedCallback(cb func()) {
+	b.l.Lock()
+	defer b.l.Unlock()
+	b.removedCallback = cb
+}
+
 func (b *RaftBackend) applyConfigSettings(config *raft.Config) error {
 	config.Logger = b.logger
 	multiplierRaw, ok := b.conf["performance_multiplier"]
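Note that SetRemovedCallback takes the backend's write lock, matching the read lock the checker holds while invoking the callback (see the StartRemovedChecker hunk below), so registering a callback cannot race with its invocation.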
@@ -1107,9 +1114,12 @@ type SetupOpts struct {
 	// We pass it in though because it can be overridden in tests or via ENV in
 	// core.
 	EffectiveSDKVersion string
+
+	// RemovedCallback is the function to call when the node has been removed
+	RemovedCallback func()
 }
 
-func (b *RaftBackend) StartRecoveryCluster(ctx context.Context, peer Peer) error {
+func (b *RaftBackend) StartRecoveryCluster(ctx context.Context, peer Peer, removedCallback func()) error {
 	recoveryModeConfig := &raft.Configuration{
 		Servers: []raft.Server{
 			{
@@ -1122,6 +1132,7 @@ func (b *RaftBackend) StartRecoveryCluster(ctx context.Context, peer Peer) error
 	return b.SetupCluster(context.Background(), SetupOpts{
 		StartAsLeader:      true,
 		RecoveryModeConfig: recoveryModeConfig,
+		RemovedCallback:    removedCallback,
 	})
 }
 
@@ -1391,6 +1402,9 @@ func (b *RaftBackend) SetupCluster(ctx context.Context, opts SetupOpts) error {
 		}
 	}
 
+	if opts.RemovedCallback != nil {
+		b.removedCallback = opts.RemovedCallback
+	}
 	b.StartRemovedChecker(ctx)
 
 	b.logger.Trace("finished setting up raft cluster")
@@ -1430,6 +1444,7 @@ func (b *RaftBackend) StartRemovedChecker(ctx context.Context) {
	go func() {
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
+		hasBeenPresent := false

		logger := b.logger.Named("removed.checker")
		for {
@@ -1440,11 +1455,21 @@ func (b *RaftBackend) StartRemovedChecker(ctx context.Context) {
					logger.Error("failed to check if node is removed", "node ID", b.localID, "error", err)
					continue
				}
-				if removed {
+				if !removed {
+					hasBeenPresent = true
+				}
+				// the node must have been previously present in the config,
+				// only then should we consider it removed and shutdown
+				if removed && hasBeenPresent {
					err := b.RemoveSelf()
					if err != nil {
						logger.Error("failed to remove self", "node ID", b.localID, "error", err)
					}
+					b.l.RLock()
+					if b.removedCallback != nil {
+						b.removedCallback()
+					}
+					b.l.RUnlock()
					return
				}
			case <-ctx.Done():
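The hasBeenPresent guard means absence from the raft configuration only counts as removal after the node has observed itself present at least once; a node that is still starting up or joining, and whose early reads of the config do not yet include it, will not seal itself. That reading follows from the in-diff comment; the commit does not spell out a more specific race beyond it.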
@@ -15,6 +15,7 @@ import (
 	"os"
 	"path/filepath"
 	"strings"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -785,10 +786,15 @@ func TestRaft_Removed(t *testing.T) {
		require.False(t, raft2.IsRemoved())
		require.False(t, raft3.IsRemoved())

+		callbackCalled := atomic.Bool{}
+		raft3.SetRemovedCallback(func() {
+			callbackCalled.Store(true)
+		})
		err := raft1.RemovePeer(context.Background(), raft3.NodeID())
		require.NoError(t, err)

		require.Eventually(t, raft3.IsRemoved, 15*time.Second, 500*time.Millisecond)
+		require.True(t, callbackCalled.Load())
		require.False(t, raft1.IsRemoved())
		require.False(t, raft2.IsRemoved())
	})
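The unit test registers the callback on the node being removed and polls IsRemoved with require.Eventually instead of asserting immediately; removal is detected asynchronously by the once-per-second checker, which is presumably the flake the "add eventually condition for test flake" item in the commit message refers to.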
@@ -4626,3 +4626,9 @@ func (c *Core) IsRemovedFromCluster() (removed, ok bool) {
 
 	return removableNodeHA.IsRemoved(), true
 }
+
+func (c *Core) shutdownRemovedNode() {
+	go func() {
+		c.ShutdownCoreError(errors.New("node has been removed from cluster"))
+	}()
+}
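shutdownRemovedNode runs ShutdownCoreError in a goroutine. Since the checker invokes the callback while holding b.l.RLock (see the raft.go hunk above), deferring the shutdown keeps the callback cheap and avoids blocking, or potentially deadlocking, the checker on core shutdown; that rationale is inferred from the call sites rather than stated in the commit.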
@@ -1388,5 +1388,29 @@ func TestRaftCluster_Removed(t *testing.T) {
		"test": "other_data",
	})
	require.Error(t, err)
-	require.True(t, follower.Sealed())
+	require.Eventually(t, follower.Sealed, 3*time.Second, 250*time.Millisecond)
 }
+
+// TestRaftCluster_Removed_RaftConfig creates a 3 node raft cluster with an extremely long
+// heartbeat interval, and then removes one of the nodes. The test verifies that the
+// removed node discovers that it has been removed (via not being present in the
+// raft config) and seals.
+func TestRaftCluster_Removed_RaftConfig(t *testing.T) {
+	t.Parallel()
+	conf, opts := raftClusterBuilder(t, nil)
+	conf.ClusterHeartbeatInterval = 5 * time.Minute
+	cluster := vault.NewTestCluster(t, conf, &opts)
+	vault.TestWaitActive(t, cluster.Cores[0].Core)
+
+	follower := cluster.Cores[2]
+	followerClient := follower.Client
+	_, err := followerClient.Logical().Write("secret/foo", map[string]interface{}{
+		"test": "data",
+	})
+	require.NoError(t, err)
+
+	_, err = cluster.Cores[0].Client.Logical().Write("/sys/storage/raft/remove-peer", map[string]interface{}{
+		"server_id": follower.NodeID,
+	})
+	require.Eventually(t, follower.Sealed, 10*time.Second, 500*time.Millisecond)
+}
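The five-minute ClusterHeartbeatInterval effectively disables the pre-existing heartbeat-based removal detection for the duration of the test, so when the follower seals within ten seconds it can only be because the raft-config checker noticed the node was gone.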
@@ -62,7 +62,7 @@ func (c *Core) InitializeRecovery(ctx context.Context) error {
		return raftStorage.StartRecoveryCluster(context.Background(), raft.Peer{
			ID:      raftStorage.NodeID(),
			Address: parsedClusterAddr.Host,
-		})
+		}, c.shutdownRemovedNode)
	})

	return nil
@@ -166,6 +166,7 @@ func (c *Core) startRaftBackend(ctx context.Context) (retErr error) {
		ClusterListener:     c.getClusterListener(),
		StartAsLeader:       creating,
		EffectiveSDKVersion: c.effectiveSDKVersion,
+		RemovedCallback:     c.shutdownRemovedNode,
	}); err != nil {
		return err
	}
@@ -1417,6 +1418,7 @@ func (c *Core) joinRaftSendAnswer(ctx context.Context, sealAccess seal.Access, r
	opts := raft.SetupOpts{
		TLSKeyring:      answerResp.Data.TLSKeyring,
		ClusterListener: c.getClusterListener(),
+		RemovedCallback: c.shutdownRemovedNode,
	}
	err = raftBackend.SetupCluster(ctx, opts)
	if err != nil {
@@ -1472,7 +1474,8 @@ func (c *Core) RaftBootstrap(ctx context.Context, onInit bool) error {
	}

	raftOpts := raft.SetupOpts{
-		StartAsLeader: true,
+		StartAsLeader:   true,
+		RemovedCallback: c.shutdownRemovedNode,
	}

	if !onInit {
@@ -105,7 +105,7 @@ func haMembershipClientCheck(err error, c *Core, haBackend physical.RemovableNod
	if removeErr != nil {
		c.logger.Debug("failed to remove self", "error", removeErr)
	}
-	go c.ShutdownCoreError(errors.New("node removed from HA configuration"))
+	c.shutdownRemovedNode()
 }

 func haMembershipUnaryClientInterceptor(c *Core, haBackend physical.RemovableNodeHABackend) grpc.UnaryClientInterceptor {
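With this, the HA-membership path funnels through the same shutdownRemovedNode helper as the raft-config checker: the ad-hoc goroutine moves inside the helper, and the error message becomes the shared "node has been removed from cluster".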
@@ -1451,6 +1451,7 @@ func NewTestCluster(t testing.TB, base *CoreConfig, opts *TestClusterOptions) *T
	}

	if base != nil {
+		coreConfig.ClusterHeartbeatInterval = base.ClusterHeartbeatInterval
		coreConfig.DetectDeadlocks = TestDeadlockDetection
		coreConfig.RawConfig = base.RawConfig
		coreConfig.DisableCache = base.DisableCache
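The NewTestCluster change plumbs ClusterHeartbeatInterval from a caller-supplied CoreConfig into the test cluster alongside the other fields already copied there; without it, TestRaftCluster_Removed_RaftConfig could not stretch the heartbeat.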