From f0292fa2e4587b2f8a5fa71a58d517030f024e62 Mon Sep 17 00:00:00 2001
From: Hamid Ghaf <83242695+hghaf099@users.noreply.github.com>
Date: Mon, 5 Jun 2023 14:39:40 -0700
Subject: [PATCH] OSS part of replication sys method (#20995)

* OSS part of replication sys method

* CL
---
Reviewer note: the Go changes below were amended after export (doc comments
and a simplified decode switch in api/replication_status.go, a checked
last_remote_wal conversion in sdk/helper/testcluster/replication.go). Hunk
line counts and the diffstat reflect the amended content; the post-image blob
ids on the "index" lines no longer describe the resulting files.

 api/replication_status.go             | 144 ++++++++++++++++++++++++++
 changelog/20995.txt                   |   3 +
 sdk/helper/testcluster/replication.go |  42 +++++---
 3 files changed, 177 insertions(+), 12 deletions(-)
 create mode 100644 api/replication_status.go
 create mode 100644 changelog/20995.txt

diff --git a/api/replication_status.go b/api/replication_status.go
new file mode 100644
index 0000000000..1668daf19c
--- /dev/null
+++ b/api/replication_status.go
@@ -0,0 +1,144 @@
+// Copyright (c) HashiCorp, Inc.
+// SPDX-License-Identifier: MPL-2.0
+
+package api
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"net/http"
+
+	"github.com/mitchellh/mapstructure"
+)
+
+// Request paths of the replication status endpoints.
+const (
+	apiRepPerformanceStatusPath = "/v1/sys/replication/performance/status"
+	apiRepDRStatusPath          = "/v1/sys/replication/dr/status"
+	apiRepStatusPath            = "/v1/sys/replication/status"
+)
+
+// ClusterInfo describes one entry of the Primaries or Secondaries list of a
+// replication status response.
+type ClusterInfo struct {
+	APIAddr          string `json:"api_address,omitempty" mapstructure:"api_address"`
+	ClusterAddress   string `json:"cluster_address,omitempty" mapstructure:"cluster_address"`
+	ConnectionStatus string `json:"connection_status,omitempty" mapstructure:"connection_status"`
+	LastHeartBeat    string `json:"last_heartbeat,omitempty" mapstructure:"last_heartbeat"`
+	NodeID           string `json:"node_id,omitempty" mapstructure:"node_id"`
+}
+
+// ReplicationStatusGenericResponse holds the status fields decoded for a
+// single replication type (DR or performance).
+type ReplicationStatusGenericResponse struct {
+	LastDRWAL             uint64 `json:"last_dr_wal,omitempty" mapstructure:"last_dr_wal"`
+	LastReindexEpoch      string `json:"last_reindex_epoch,omitempty" mapstructure:"last_reindex_epoch"`
+	ClusterID             string `json:"cluster_id,omitempty" mapstructure:"cluster_id"`
+	LastWAL               uint64 `json:"last_wal,omitempty" mapstructure:"last_wal"`
+	MerkleRoot            string `json:"merkle_root,omitempty" mapstructure:"merkle_root"`
+	Mode                  string `json:"mode,omitempty" mapstructure:"mode"`
+	PrimaryClusterAddr    string `json:"primary_cluster_addr,omitempty" mapstructure:"primary_cluster_addr"`
+	LastPerformanceWAL    uint64 `json:"last_performance_wal,omitempty" mapstructure:"last_performance_wal"`
+	State                 string `json:"state,omitempty" mapstructure:"state"`
+	LastRemoteWAL         uint64 `json:"last_remote_wal,omitempty" mapstructure:"last_remote_wal"`
+	SecondaryID           string `json:"secondary_id,omitempty" mapstructure:"secondary_id"`
+	SSCTGenerationCounter uint64 `json:"ssct_generation_counter,omitempty" mapstructure:"ssct_generation_counter"`
+
+	KnownSecondaries         []string      `json:"known_secondaries,omitempty" mapstructure:"known_secondaries"`
+	KnownPrimaryClusterAddrs []string      `json:"known_primary_cluster_addrs,omitempty" mapstructure:"known_primary_cluster_addrs"`
+	Primaries                []ClusterInfo `json:"primaries,omitempty" mapstructure:"primaries"`
+	Secondaries              []ClusterInfo `json:"secondaries,omitempty" mapstructure:"secondaries"`
+}
+
+// ReplicationStatusResponse combines the DR and performance replication
+// status.
+type ReplicationStatusResponse struct {
+	DR          ReplicationStatusGenericResponse `json:"dr,omitempty" mapstructure:"dr"`
+	Performance ReplicationStatusGenericResponse `json:"performance,omitempty" mapstructure:"performance"`
+}
+
+// ReplicationStatus fetches the combined replication status using
+// context.Background().
+func (c *Sys) ReplicationStatus() (*ReplicationStatusResponse, error) {
+	return c.ReplicationStatusWithContext(context.Background(), apiRepStatusPath)
+}
+
+// ReplicationPerformanceStatusWithContext queries the performance replication
+// status path and returns only the Performance part of the decoded response.
+func (c *Sys) ReplicationPerformanceStatusWithContext(ctx context.Context) (*ReplicationStatusGenericResponse, error) {
+	s, err := c.ReplicationStatusWithContext(ctx, apiRepPerformanceStatusPath)
+	if err != nil {
+		return nil, err
+	}
+
+	return &s.Performance, nil
+}
+
+// ReplicationDRStatusWithContext queries the DR replication status path and
+// returns only the DR part of the decoded response.
+func (c *Sys) ReplicationDRStatusWithContext(ctx context.Context) (*ReplicationStatusGenericResponse, error) {
+	s, err := c.ReplicationStatusWithContext(ctx, apiRepDRStatusPath)
+	if err != nil {
+		return nil, err
+	}
+
+	return &s.DR, nil
+}
+
+// ReplicationStatusWithContext sends a GET request to path and decodes the
+// "data" object of the JSON response. An empty path defaults to the combined
+// replication status path.
+//
+// For the performance and DR status paths the data is decoded into the
+// Performance or DR field respectively and the other field keeps its zero
+// value; for any other path the data is decoded as a whole
+// ReplicationStatusResponse. An error is returned if the request fails, the
+// body cannot be decoded as JSON, the "data" key is missing, or the data
+// cannot be mapped onto the result.
+func (c *Sys) ReplicationStatusWithContext(ctx context.Context, path string) (*ReplicationStatusResponse, error) {
+	// default to replication/status
+	if path == "" {
+		path = apiRepStatusPath
+	}
+
+	ctx, cancelFunc := c.c.withConfiguredTimeout(ctx)
+	defer cancelFunc()
+
+	r := c.c.NewRequest(http.MethodGet, path)
+
+	resp, err := c.c.rawRequestWithContext(ctx, r)
+	if err != nil {
+		return nil, err
+	}
+	defer func() { _ = resp.Body.Close() }()
+
+	// First decode response into a map[string]interface{}
+	data := make(map[string]interface{})
+	dec := json.NewDecoder(resp.Body)
+	dec.UseNumber()
+	if err := dec.Decode(&data); err != nil {
+		return nil, err
+	}
+
+	rawData, ok := data["data"]
+	if !ok {
+		return nil, fmt.Errorf("empty data in replication status response")
+	}
+
+	// Decode straight into the part of the response that matches the path.
+	s := &ReplicationStatusResponse{}
+	switch path {
+	case apiRepPerformanceStatusPath:
+		err = mapstructure.Decode(rawData, &s.Performance)
+	case apiRepDRStatusPath:
+		err = mapstructure.Decode(rawData, &s.DR)
+	default:
+		err = mapstructure.Decode(rawData, s)
+	}
+	if err != nil {
+		return nil, err
+	}
+
+	return s, nil
+}
diff --git a/changelog/20995.txt b/changelog/20995.txt
new file mode 100644
index 0000000000..76653d4d54
--- /dev/null
+++ b/changelog/20995.txt
@@ -0,0 +1,3 @@
+```release-note:improvement
+api: adding a new api sys method for replication status
+```
diff --git a/sdk/helper/testcluster/replication.go b/sdk/helper/testcluster/replication.go
index 72c8bc67c3..46356deff1 100644
--- a/sdk/helper/testcluster/replication.go
+++ b/sdk/helper/testcluster/replication.go
@@ -464,7 +464,7 @@ func EnableDRSecondaryNoWait(ctx context.Context, sec VaultCluster, drToken stri
 	return WaitForDRReplicationState(ctx, sec, consts.ReplicationDRSecondary)
 }
 
-func WaitForReplicationStatus(ctx context.Context, client *api.Client, dr bool, accept func(map[string]interface{}) bool) error {
+func WaitForReplicationStatus(ctx context.Context, client *api.Client, dr bool, accept func(map[string]interface{}) error) error {
 	url := "sys/replication/performance/status"
 	if dr {
 		url = "sys/replication/dr/status"
@@ -475,7 +475,7 @@ func WaitForReplicationStatus(ctx context.Context, client *api.Client, dr bool,
 	for ctx.Err() == nil {
 		secret, err = client.Logical().Read(url)
 		if err == nil && secret != nil && secret.Data != nil {
-			if accept(secret.Data) {
+			if err = accept(secret.Data); err == nil {
 				return nil
 			}
 		}
@@ -493,8 +493,12 @@ func WaitForDRReplicationWorking(ctx context.Context, pri, sec VaultCluster) err
 	secClient := sec.Nodes()[0].APIClient()
 
 	// Make sure we've entered stream-wals mode
-	err := WaitForReplicationStatus(ctx, secClient, true, func(secret map[string]interface{}) bool {
-		return secret["state"] == string("stream-wals")
+	err := WaitForReplicationStatus(ctx, secClient, true, func(secret map[string]interface{}) error {
+		state := secret["state"]
+		if state == string("stream-wals") {
+			return nil
+		}
+		return fmt.Errorf("expected stream-wals replication state, got %v", state)
 	})
 	if err != nil {
 		return err
@@ -513,16 +517,30 @@ func WaitForDRReplicationWorking(ctx context.Context, pri, sec VaultCluster) err
 		return err
 	}
 
-	err = WaitForReplicationStatus(ctx, secClient, true, func(secret map[string]interface{}) bool {
-		if secret["state"] != string("stream-wals") {
-			return false
-		}
-		if secret["last_remote_wal"] != nil {
-			lastRemoteWal, _ := secret["last_remote_wal"].(json.Number).Int64()
-			return lastRemoteWal > 0
+	err = WaitForReplicationStatus(ctx, secClient, true, func(secret map[string]interface{}) error {
+		state := secret["state"]
+		if state != string("stream-wals") {
+			return fmt.Errorf("expected stream-wals replication state, got %v", state)
 		}
 
-		return false
+		if raw := secret["last_remote_wal"]; raw != nil {
+			// Check the assertion and the conversion instead of panicking or
+			// silently treating a malformed value as zero.
+			num, ok := raw.(json.Number)
+			if !ok {
+				return fmt.Errorf("unexpected type %T for last_remote_wal", raw)
+			}
+			lastRemoteWal, parseErr := num.Int64()
+			if parseErr != nil {
+				return fmt.Errorf("parsing last_remote_wal %q: %w", num, parseErr)
+			}
+			if lastRemoteWal <= 0 {
+				return fmt.Errorf("expected last_remote_wal to be greater than zero")
+			}
+			return nil
+		}
+
+		return fmt.Errorf("replication seems to be still catching up, maybe need to wait more")
 	})
 	if err != nil {
 		return err