Use our heartbeat echo RPCs to estimate clock skew, expose it in status APIs (#24343)

This commit is contained in:
Nick Cabatoff
2023-12-04 12:04:38 -05:00
committed by GitHub
parent 02eadb8ecb
commit b8f531142b
15 changed files with 497 additions and 188 deletions

View File

@@ -209,6 +209,68 @@ func WaitForActiveNode(ctx context.Context, cluster VaultCluster) (int, error) {
return -1, ctx.Err()
}
func WaitForStandbyNode(ctx context.Context, cluster VaultCluster, nodeIdx int) error {
if nodeIdx >= len(cluster.Nodes()) {
return fmt.Errorf("invalid nodeIdx %d for cluster", nodeIdx)
}
node := cluster.Nodes()[nodeIdx]
client := node.APIClient()
var err error
for ctx.Err() == nil {
var resp *api.LeaderResponse
resp, err = client.Sys().LeaderWithContext(ctx)
switch {
case err != nil:
case resp.IsSelf:
return fmt.Errorf("waiting for standby but node is leader")
case resp.LeaderAddress == "":
err = fmt.Errorf("node doesn't know leader address")
default:
return nil
}
time.Sleep(100 * time.Millisecond)
}
if err == nil {
err = ctx.Err()
}
return err
}
func WaitForActiveNodeAndStandbys(ctx context.Context, cluster VaultCluster) (int, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
leaderIdx, err := WaitForActiveNode(ctx, cluster)
if err != nil {
return 0, err
}
if len(cluster.Nodes()) == 1 {
return 0, nil
}
errs := make(chan error)
for i := range cluster.Nodes() {
if i == leaderIdx {
continue
}
go func(i int) {
errs <- WaitForStandbyNode(ctx, cluster, i)
}(i)
}
var merr *multierror.Error
expectedStandbys := len(cluster.Nodes()) - 1
for i := 0; i < expectedStandbys; i++ {
merr = multierror.Append(merr, <-errs)
}
return leaderIdx, merr.ErrorOrNil()
}
func WaitForActiveNodeAndPerfStandbys(ctx context.Context, cluster VaultCluster) error {
logger := cluster.NamedLogger("WaitForActiveNodeAndPerfStandbys")
// This WaitForActiveNode was added because after a Raft cluster is sealed