From ce74f4f1de65fecbb409849cf0533635a7c9f400 Mon Sep 17 00:00:00 2001 From: Nick Cabatoff Date: Fri, 7 Oct 2022 12:09:08 -0400 Subject: [PATCH] Add more raft metrics, emit more metrics on non-perf standbys (#12166) Add some metrics helpful for monitoring raft cluster state. Furthermore, we weren't emitting bolt metrics on regular (non-perf) standbys, and there were other metrics in metricsLoop that would make sense to include in OSS but weren't. We now have an active-node-only func, emitMetricsActiveNode. This runs metricsLoop on the active node. Standbys and perf-standbys run metricsLoop from a goroutine managed by the runStandby rungroup. --- changelog/12166.txt | 3 +++ physical/raft/fsm.go | 2 ++ physical/raft/raft.go | 13 +++++++++++ physical/raft/raft_autopilot.go | 22 +++++++++++++++---- vault/core.go | 6 ++--- vault/core_metrics.go | 23 +++++++++----------- vault/ha.go | 11 ++++++++++ vault/request_forwarding_rpc.go | 6 +++++ website/content/docs/internals/telemetry.mdx | 15 +++++++++---- 9 files changed, 77 insertions(+), 24 deletions(-) create mode 100644 changelog/12166.txt diff --git a/changelog/12166.txt b/changelog/12166.txt new file mode 100644 index 0000000000..9cec76cbaf --- /dev/null +++ b/changelog/12166.txt @@ -0,0 +1,3 @@ +```release-note:improvement +storage/raft: add additional raft metrics relating to applied index and heartbeating; also ensure OSS standbys emit periodic metrics. +``` diff --git a/physical/raft/fsm.go b/physical/raft/fsm.go index 8d5b5524db..f8ca9c6546 100644 --- a/physical/raft/fsm.go +++ b/physical/raft/fsm.go @@ -136,6 +136,8 @@ func NewFSM(path string, localID string, logger log.Logger) (*FSM, error) { }) dbPath := filepath.Join(path, databaseFilename) + f.l.Lock() + defer f.l.Unlock() if err := f.openDBFile(dbPath); err != nil { return nil, fmt.Errorf("failed to open bolt file: %w", err) } diff --git a/physical/raft/raft.go b/physical/raft/raft.go index 98d51c05d9..7400794f14 100644 --- a/physical/raft/raft.go +++ b/physical/raft/raft.go @@ -581,9 +581,22 @@ func (b *RaftBackend) CollectMetrics(sink *metricsutil.ClusterMetricSink) { b.l.RLock() logstoreStats := b.stableStore.(*raftboltdb.BoltStore).Stats() fsmStats := b.fsm.db.Stats() + stats := b.raft.Stats() b.l.RUnlock() b.collectMetricsWithStats(logstoreStats, sink, "logstore") b.collectMetricsWithStats(fsmStats, sink, "fsm") + labels := []metrics.Label{ + { + Name: "peer_id", + Value: b.localID, + }, + } + for _, key := range []string{"term", "commit_index", "applied_index", "fsm_pending"} { + n, err := strconv.ParseUint(stats[key], 10, 64) + if err == nil { + sink.SetGaugeWithLabels([]string{"raft_storage", "stats", key}, float32(n), labels) + } + } } func (b *RaftBackend) collectMetricsWithStats(stats bolt.Stats, sink *metricsutil.ClusterMetricSink, database string) { diff --git a/physical/raft/raft_autopilot.go b/physical/raft/raft_autopilot.go index eaa75dfa19..5596bbf425 100644 --- a/physical/raft/raft_autopilot.go +++ b/physical/raft/raft_autopilot.go @@ -540,11 +540,25 @@ func (b *RaftBackend) startFollowerHeartbeatTracker() { tickerCh := b.followerHeartbeatTicker.C b.l.RUnlock() + followerGauge := func(peerID string, suffix string, value float32) { + labels := []metrics.Label{ + { + Name: "peer_id", + Value: peerID, + }, + } + metrics.SetGaugeWithLabels([]string{"raft_storage", "follower", suffix}, value, labels) + } for range tickerCh { b.l.RLock() - if b.autopilotConfig.CleanupDeadServers && b.autopilotConfig.DeadServerLastContactThreshold != 0 { - b.followerStates.l.RLock() - for _, state := range b.followerStates.followers { + b.followerStates.l.RLock() + myAppliedIndex := b.raft.AppliedIndex() + for peerID, state := range b.followerStates.followers { + timeSinceLastHeartbeat := time.Now().Sub(state.LastHeartbeat) / time.Millisecond + followerGauge(peerID, "last_heartbeat_ms", float32(timeSinceLastHeartbeat)) + followerGauge(peerID, "applied_index_delta", float32(myAppliedIndex-state.AppliedIndex)) + + if b.autopilotConfig.CleanupDeadServers && b.autopilotConfig.DeadServerLastContactThreshold != 0 { if state.LastHeartbeat.IsZero() || state.IsDead.Load() { continue } @@ -553,8 +567,8 @@ func (b *RaftBackend) startFollowerHeartbeatTracker() { state.IsDead.Store(true) } } - b.followerStates.l.RUnlock() } + b.followerStates.l.RUnlock() b.l.RUnlock() } } diff --git a/vault/core.go b/vault/core.go index 7a4fd8a8f1..cbe5264070 100644 --- a/vault/core.go +++ b/vault/core.go @@ -2252,6 +2252,9 @@ func (s standardUnsealStrategy) unseal(ctx context.Context, logger log.Logger, c return err } + c.metricsCh = make(chan struct{}) + go c.emitMetricsActiveNode(c.metricsCh) + return nil } @@ -2310,9 +2313,6 @@ func (c *Core) postUnseal(ctx context.Context, ctxCancelFunc context.CancelFunc, seal.StartHealthCheck() } - c.metricsCh = make(chan struct{}) - go c.emitMetrics(c.metricsCh) - // This is intentionally the last block in this function. We want to allow // writes just before allowing client requests, to ensure everything has // been set up properly before any writes can have happened. diff --git a/vault/core_metrics.go b/vault/core_metrics.go index cd570eff3b..c6e719fc12 100644 --- a/vault/core_metrics.go +++ b/vault/core_metrics.go @@ -113,16 +113,16 @@ func (c *Core) metricsLoop(stopCh chan struct{}) { c.metricSink.SetGaugeWithLabels([]string{"core", "replication", "dr", "secondary"}, 0, nil) } + // If we're using a raft backend, emit raft metrics + if rb, ok := c.underlyingPhysical.(*raft.RaftBackend); ok { + rb.CollectMetrics(c.MetricSink()) + } + // Capture the total number of in-flight requests c.inFlightReqGaugeMetric() // Refresh gauge metrics that are looped c.cachedGaugeMetricsEmitter() - - // If we're using a raft backend, emit boltdb metrics - if rb, ok := c.underlyingPhysical.(*raft.RaftBackend); ok { - rb.CollectMetrics(c.MetricSink()) - } case <-writeTimer: l := newLockGrabber(c.stateLock.RLock, c.stateLock.RUnlock, stopCh) go l.grab() @@ -232,15 +232,12 @@ func (c *Core) tokenGaugeTtlCollector(ctx context.Context) ([]metricsutil.GaugeL return ts.gaugeCollectorByTtl(ctx) } -// emitMetrics is used to start all the periodc metrics; all of them should -// be shut down when stopCh is closed. -func (c *Core) emitMetrics(stopCh chan struct{}) { +// emitMetricsActiveNode is used to start all the periodic metrics; all of them should +// be shut down when stopCh is closed. This code runs on the active node only. +func (c *Core) emitMetricsActiveNode(stopCh chan struct{}) { // The gauge collection processes are started and stopped here // because there's more than one TokenManager created during startup, // but we only want one set of gauges. - // - // Both active nodes and performance standby nodes call emitMetrics - // so we have to handle both. metricsInit := []struct { MetricName []string MetadataLabel []metrics.Label @@ -349,8 +346,8 @@ func (c *Core) findKvMounts() []*kvMount { c.mountsLock.RLock() defer c.mountsLock.RUnlock() - // emitMetrics doesn't grab the statelock, so this code might run during or after the seal process. - // Therefore, we need to check if c.mounts is nil. If we do not, emitMetrics will panic if this is + // we don't grab the statelock, so this code might run during or after the seal process. + // Therefore, we need to check if c.mounts is nil. If we do not, this will panic when // run after seal. if c.mounts == nil { return mounts diff --git a/vault/ha.go b/vault/ha.go index 4f674dde91..17b6e590d6 100644 --- a/vault/ha.go +++ b/vault/ha.go @@ -434,6 +434,17 @@ func (c *Core) runStandby(doneCh, manualStepDownCh, stopCh chan struct{}) { c.logger.Debug("shutting down periodic leader refresh") }) } + { + metricsStop := make(chan struct{}) + + g.Add(func() error { + c.metricsLoop(metricsStop) + return nil + }, func(error) { + close(metricsStop) + c.logger.Debug("shutting down periodic metrics") + }) + } { // Wait for leadership leaderStopCh := make(chan struct{}) diff --git a/vault/request_forwarding_rpc.go b/vault/request_forwarding_rpc.go index 6ae4cf56b7..281d9192bb 100644 --- a/vault/request_forwarding_rpc.go +++ b/vault/request_forwarding_rpc.go @@ -8,6 +8,7 @@ import ( "sync/atomic" "time" + "github.com/armon/go-metrics" "github.com/hashicorp/vault/helper/forwarding" "github.com/hashicorp/vault/physical/raft" "github.com/hashicorp/vault/sdk/helper/consts" @@ -135,6 +136,9 @@ func (c *forwardingClient) startHeartbeat() { Mode: "standby", } tick := func() { + labels := make([]metrics.Label, 0, 1) + defer metrics.MeasureSinceWithLabels([]string{"ha", "rpc", "client", "echo"}, time.Now(), labels) + req := &EchoRequest{ Message: "ping", ClusterAddr: clusterAddr, @@ -149,12 +153,14 @@ func (c *forwardingClient) startHeartbeat() { req.RaftDesiredSuffrage = raftBackend.DesiredSuffrage() req.RaftRedundancyZone = raftBackend.RedundancyZone() req.RaftUpgradeVersion = raftBackend.EffectiveVersion() + labels = append(labels, metrics.Label{Name: "peer_id", Value: raftBackend.NodeID()}) } ctx, cancel := context.WithTimeout(c.echoContext, 2*time.Second) resp, err := c.RequestForwardingClient.Echo(ctx, req) cancel() if err != nil { + metrics.IncrCounter([]string{"ha", "rpc", "client", "echo", "errors"}, 1) c.core.logger.Debug("forwarding: error sending echo request to active node", "error", err) return } diff --git a/website/content/docs/internals/telemetry.mdx b/website/content/docs/internals/telemetry.mdx index 6633141ced..2785a42814 100644 --- a/website/content/docs/internals/telemetry.mdx +++ b/website/content/docs/internals/telemetry.mdx @@ -222,10 +222,12 @@ These metrics relate to internal operations on Merkle Trees and Write Ahead Logs These metrics are emitted on standbys when talking to the active node, and in some cases by performance standbys as well. -| Metric | Description | Unit | Type | -| :----------------------------------- | :---------------------------------------------------------------- | :----- | :------ | -| `vault.ha.rpc.client.forward` | Time taken to forward a request from a standby to the active node | ms | summary | -| `vault.ha.rpc.client.forward.errors` | Number of standby requests forwarding failures | errors | counter | +| Metric | Description | Unit | Type | +| :----------------------------------- | :------------------------------------------------------------------- | :----- | :------ | +| `vault.ha.rpc.client.forward` | Time taken to forward a request from a standby to the active node | ms | summary | +| `vault.ha.rpc.client.forward.errors` | Number of standby requests forwarding failures | errors | counter | +| `vault.ha.rpc.client.echo` | Time taken to send an echo request from a standby to the active node | ms | summary | +| `vault.ha.rpc.client.echo.errors` | Number of standby echo request failures | errors | counter | ## Replication Metrics @@ -474,6 +476,11 @@ These metrics relate to raft based [integrated storage][integrated-storage]. | `vault.raft_storage.bolt.spill.time` | Time taken spilling. | ms | summary | | `vault.raft_storage.bolt.write.count` | Number of writes performed. | writes | gauge | | `vault.raft_storage.bolt.write.time` | Time taken writing to disk. | ms | summary | +| `vault.raft_storage.stats.commit_index` | Index of last raft log committed to disk on this node. | sequence number | gauge | +| `vault.raft_storage.stats.applied_index` | Highest index of raft log either applied to the FSM or added to fsm_pending queue. | sequence number | gauge | +| `vault.raft_storage.stats.fsm_pending` | Number of raft logs this node has queued to be applied by the FSM. | logs | gauge | +| `vault.raft_storage.follower.applied_index_delta` | Delta between leader applied index and each follower's applied index reported by echoes. | logs | gauge | +| `vault.raft_storage.follower.last_heartbeat_ms` | Time since last echo request received by each follower. | ms | gauge | ## Integrated Storage (Raft) Autopilot