mirror of
https://github.com/optim-enterprises-bv/vault.git
synced 2025-11-01 19:17:58 +00:00
VAULT-31748: add removable node HA backend interface and raft implementation (#28726)
This commit is contained in:
@@ -87,6 +87,7 @@ var (
|
||||
_ physical.TransactionalLimits = (*RaftBackend)(nil)
|
||||
_ physical.HABackend = (*RaftBackend)(nil)
|
||||
_ physical.MountTableLimitingBackend = (*RaftBackend)(nil)
|
||||
_ physical.RemovableNodeHABackend = (*RaftBackend)(nil)
|
||||
_ physical.Lock = (*RaftLock)(nil)
|
||||
)
|
||||
|
||||
@@ -255,6 +256,30 @@ type RaftBackend struct {
|
||||
// specialPathLimits is a map of special paths to their configured entrySize
|
||||
// limits.
|
||||
specialPathLimits map[string]uint64
|
||||
|
||||
removed atomic.Bool
|
||||
}
|
||||
|
||||
func (b *RaftBackend) IsNodeRemoved(ctx context.Context, nodeID string) (bool, error) {
|
||||
conf, err := b.GetConfiguration(ctx)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
for _, srv := range conf.Servers {
|
||||
if srv.NodeID == nodeID {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (b *RaftBackend) IsRemoved() bool {
|
||||
return b.removed.Load()
|
||||
}
|
||||
|
||||
func (b *RaftBackend) RemoveSelf() error {
|
||||
b.removed.Store(true)
|
||||
return nil
|
||||
}
|
||||
|
||||
// LeaderJoinInfo contains information required by a node to join itself as a
|
||||
@@ -1390,6 +1415,8 @@ func (b *RaftBackend) SetupCluster(ctx context.Context, opts SetupOpts) error {
|
||||
}
|
||||
}
|
||||
|
||||
b.StartRemovedChecker(ctx)
|
||||
|
||||
b.logger.Trace("finished setting up raft cluster")
|
||||
return nil
|
||||
}
|
||||
@@ -1423,6 +1450,34 @@ func (b *RaftBackend) TeardownCluster(clusterListener cluster.ClusterHook) error
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *RaftBackend) StartRemovedChecker(ctx context.Context) {
|
||||
go func() {
|
||||
ticker := time.NewTicker(time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
logger := b.logger.Named("removed.checker")
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
removed, err := b.IsNodeRemoved(ctx, b.localID)
|
||||
if err != nil {
|
||||
logger.Error("failed to check if node is removed", "node ID", b.localID, "error", err)
|
||||
continue
|
||||
}
|
||||
if removed {
|
||||
err := b.RemoveSelf()
|
||||
if err != nil {
|
||||
logger.Error("failed to remove self", "node ID", b.localID, "error", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// CommittedIndex returns the latest index committed to stable storage
|
||||
func (b *RaftBackend) CommittedIndex() uint64 {
|
||||
b.l.RLock()
|
||||
|
||||
@@ -758,6 +758,38 @@ func TestRaft_TransactionalBackend_ThreeNode(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
// TestRaft_Removed creates a 3 node cluster and checks that the nodes are not
|
||||
// removed, then verifies that node3 marks itself as removed when it gets
|
||||
// removed from the cluster
|
||||
func TestRaft_Removed(t *testing.T) {
|
||||
t.Parallel()
|
||||
testBothRaftBackends(t, func(t *testing.T, raftWALValue string) {
|
||||
conf := map[string]string{
|
||||
"trailing_logs": "100",
|
||||
"raft_wal": raftWALValue,
|
||||
}
|
||||
|
||||
raft1, _ := GetRaftWithConfig(t, true, true, conf)
|
||||
raft2, _ := GetRaftWithConfig(t, false, true, conf)
|
||||
raft3, _ := GetRaftWithConfig(t, false, true, conf)
|
||||
|
||||
addPeer(t, raft1, raft2)
|
||||
addPeer(t, raft1, raft3)
|
||||
physical.ExerciseBackend(t, raft1)
|
||||
|
||||
require.False(t, raft1.IsRemoved())
|
||||
require.False(t, raft2.IsRemoved())
|
||||
require.False(t, raft3.IsRemoved())
|
||||
|
||||
err := raft1.RemovePeer(context.Background(), raft3.NodeID())
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Eventually(t, raft3.IsRemoved, 5*time.Second, 500*time.Millisecond)
|
||||
require.False(t, raft1.IsRemoved())
|
||||
require.False(t, raft2.IsRemoved())
|
||||
})
|
||||
}
|
||||
|
||||
// TestRaft_TransactionalLimitsEnvOverride ensures the ENV var overrides for
|
||||
// transaction size limits are plumbed through as expected.
|
||||
func TestRaft_TransactionalLimitsEnvOverride(t *testing.T) {
|
||||
|
||||
@@ -60,6 +60,25 @@ type HABackend interface {
|
||||
HAEnabled() bool
|
||||
}
|
||||
|
||||
// RemovableNodeHABackend is used for HA backends that can remove nodes from
|
||||
// their cluster
|
||||
type RemovableNodeHABackend interface {
|
||||
HABackend
|
||||
|
||||
// IsNodeRemoved checks if the node with the given ID has been removed.
|
||||
// This will only be called on the active node.
|
||||
IsNodeRemoved(ctx context.Context, nodeID string) (bool, error)
|
||||
|
||||
// NodeID returns the ID for this node
|
||||
NodeID() string
|
||||
|
||||
// IsRemoved checks if this node has been removed
|
||||
IsRemoved() bool
|
||||
|
||||
// RemoveSelf marks this node as being removed
|
||||
RemoveSelf() error
|
||||
}
|
||||
|
||||
// FencingHABackend is an HABackend which provides the additional guarantee that
|
||||
// each Lock it returns from LockWith is also a FencingLock. A FencingLock
|
||||
// provides a mechanism to retrieve a fencing token that can be included by
|
||||
|
||||
Reference in New Issue
Block a user