adding support for four-cluster docker-based test scenario (#20492)

Hamid Ghaf
2023-05-03 10:49:45 -07:00
committed by GitHub
parent a5bca6a122
commit f8ccaac856
7 changed files with 598 additions and 11 deletions

View File

@@ -5,4 +5,8 @@ const (
// a signed license string used for Vault Enterprise binary-based tests.
// The binary will be run with the env var VAULT_LICENSE set to this value.
EnvVaultLicenseCI = "VAULT_LICENSE_CI"
// DefaultCAFile is the path to the CA file. This is a docker-specific
// constant. TODO: needs to be moved to a more relevant place
DefaultCAFile = "/vault/config/ca.pem"
)

View File

@@ -49,6 +49,8 @@ var (
_ testcluster.VaultClusterNode = &DockerClusterNode{}
)
const MaxClusterNameLength = 52
// DockerCluster is used to manage the lifecycle of the test Vault cluster
type DockerCluster struct {
ClusterName string
@@ -144,13 +146,13 @@ func (dc *DockerCluster) cleanup() error {
return result.ErrorOrNil()
}
// RootToken returns the root token of the cluster, if set
func (dc *DockerCluster) RootToken() string {
// GetRootToken returns the root token of the cluster, if set
func (dc *DockerCluster) GetRootToken() string {
return dc.rootToken
}
func (dc *DockerCluster) SetRootToken(s string) {
dc.Logger.Trace("cluster root token changed", "helpful_env", fmt.Sprintf("VAULT_TOKEN=%s VAULT_CACERT=/vault/config/ca.pem", dc.RootToken()))
dc.Logger.Trace("cluster root token changed", "helpful_env", fmt.Sprintf("VAULT_TOKEN=%s VAULT_CACERT=/vault/config/ca.pem", s))
dc.rootToken = s
}
@@ -416,7 +418,7 @@ func NewTestDockerCluster(t *testing.T, opts *DockerClusterOptions) *DockerClust
if err != nil {
t.Fatal(err)
}
dc.Logger.Trace("cluster started", "helpful_env", fmt.Sprintf("VAULT_TOKEN=%s VAULT_CACERT=/vault/config/ca.pem", dc.RootToken()))
dc.Logger.Trace("cluster started", "helpful_env", fmt.Sprintf("VAULT_TOKEN=%s VAULT_CACERT=/vault/config/ca.pem", dc.GetRootToken()))
return dc
}
@@ -538,7 +540,7 @@ func (n *DockerClusterNode) newAPIClient() (*api.Client, error) {
if err != nil {
return nil, err
}
client.SetToken(n.Cluster.RootToken())
client.SetToken(n.Cluster.GetRootToken())
return client, nil
}

View File

@@ -5,6 +5,7 @@ package docker
import (
"context"
"fmt"
"os"
"strings"
"testing"
@@ -14,7 +15,12 @@ import (
"github.com/hashicorp/vault/sdk/helper/testcluster"
)
func NewReplicationSetDocker(t *testing.T) (*testcluster.ReplicationSet, error) {
type ReplicationDockerOptions struct {
NumCores int
ClusterName string
}
func NewReplicationSetDocker(t *testing.T, opt *ReplicationDockerOptions) (*testcluster.ReplicationSet, error) {
binary := os.Getenv("VAULT_BINARY")
if binary == "" {
t.Skip("only running docker test when $VAULT_BINARY present")
@@ -25,13 +31,34 @@ func NewReplicationSetDocker(t *testing.T) (*testcluster.ReplicationSet, error)
Logger: logging.NewVaultLogger(hclog.Trace).Named(t.Name()),
}
if opt == nil {
opt = &ReplicationDockerOptions{}
}
var nc int
if opt.NumCores > 0 {
nc = opt.NumCores
}
clusterName := t.Name()
if opt.ClusterName != "" {
clusterName = opt.ClusterName
}
// clusterName is used for the container name as well.
// A container name should not exceed 64 chars.
// Additional chars, like "-A-core0", are appended to the name,
// so we cap the cluster name length.
if len(clusterName) > MaxClusterNameLength {
return nil, fmt.Errorf("cluster name length exceeded the maximum allowed length of %v", MaxClusterNameLength)
}
r.Builder = func(ctx context.Context, name string, baseLogger hclog.Logger) (testcluster.VaultCluster, error) {
cluster := NewTestDockerCluster(t, &DockerClusterOptions{
ImageRepo: "hashicorp/vault",
ImageTag: "latest",
VaultBinary: os.Getenv("VAULT_BINARY"),
ClusterOptions: testcluster.ClusterOptions{
ClusterName: strings.ReplaceAll(t.Name()+"-"+name, "/", "-"),
ClusterName: strings.ReplaceAll(clusterName+"-"+name, "/", "-"),
Logger: baseLogger.Named(name),
VaultNodeConfig: &testcluster.VaultNodeConfig{
LogLevel: "TRACE",
@@ -41,6 +68,7 @@ func NewReplicationSetDocker(t *testing.T) (*testcluster.ReplicationSet, error)
// "performance_multiplier": "1",
//},
},
NumCores: nc,
},
CA: r.CA,
})
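For orientation (not part of this diff), a test might construct the new options roughly as in the sketch below; the test name, core count, and cluster name are illustrative, and the import path assumes the package lives under sdk/helper/testcluster/docker:

package docker_test

import (
	"testing"

	"github.com/hashicorp/vault/sdk/helper/testcluster/docker"
)

func TestReplicationSetDockerOptions(t *testing.T) {
	// Hypothetical test: an explicit short cluster name keeps the derived
	// container names under MaxClusterNameLength.
	r, err := docker.NewReplicationSetDocker(t, &docker.ReplicationDockerOptions{
		NumCores:    3,
		ClusterName: "four-cluster-demo",
	})
	if err != nil {
		t.Fatal(err)
	}
	defer r.Cleanup()

	// Clusters are built lazily by r.Builder when a scenario such as
	// GetFourReplicationCluster is invoked (see the sketch further below).
}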

View File

@@ -318,7 +318,7 @@ func (dc *ExecDevCluster) Cleanup() {
dc.stop()
}
// RootToken returns the root token of the cluster, if set
func (dc *ExecDevCluster) RootToken() string {
// GetRootToken returns the root token of the cluster, if set
func (dc *ExecDevCluster) GetRootToken() string {
return dc.rootToken
}

View File

@@ -2,14 +2,18 @@ package testcluster
import (
"context"
"encoding/json"
"fmt"
"reflect"
"strings"
"time"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-secure-stdlib/strutil"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/vault/api"
"github.com/hashicorp/vault/sdk/helper/consts"
"github.com/mitchellh/mapstructure"
)
func GetPerformanceToken(pri VaultCluster, id, secondaryPublicKey string) (string, error) {
@@ -51,7 +55,7 @@ func WaitForPerfReplicationState(ctx context.Context, cluster VaultCluster, stat
var err error
for ctx.Err() == nil {
health, err = client.Sys().HealthWithContext(ctx)
if health.ReplicationPerformanceMode == state.GetPerformanceString() {
if err == nil && health.ReplicationPerformanceMode == state.GetPerformanceString() {
return nil
}
time.Sleep(500 * time.Millisecond)
@@ -65,7 +69,7 @@ func WaitForPerfReplicationState(ctx context.Context, cluster VaultCluster, stat
func EnablePerformanceSecondaryNoWait(ctx context.Context, perfToken string, pri, sec VaultCluster, updatePrimary bool) error {
postData := map[string]interface{}{
"token": perfToken,
"ca_file": "/vault/config/ca.pem",
"ca_file": DefaultCAFile,
}
path := "sys/replication/performance/secondary/enable"
if updatePrimary {
@@ -136,6 +140,46 @@ func WaitForMatchingMerkleRoots(ctx context.Context, endpoint string, pri, sec V
return fmt.Errorf("roots did not become equal")
}
func WaitForPerformanceWAL(ctx context.Context, pri, sec VaultCluster) error {
endpoint := "sys/replication/performance/"
if err := WaitForMatchingMerkleRoots(ctx, endpoint, pri, sec); err != nil {
return err
}
getWAL := func(mode, walKey string, cli *api.Client) (int64, error) {
status, err := cli.Logical().Read(endpoint + "status")
if err != nil {
return 0, err
}
if status == nil || status.Data == nil || status.Data["mode"] == nil {
return 0, fmt.Errorf("got nil secret or data")
}
if status.Data["mode"].(string) != mode {
return 0, fmt.Errorf("expected mode=%s, got %s", mode, status.Data["mode"].(string))
}
return status.Data[walKey].(json.Number).Int64()
}
secClient := sec.Nodes()[0].APIClient()
priClient := pri.Nodes()[0].APIClient()
for ctx.Err() == nil {
secLastRemoteWAL, err := getWAL("secondary", "last_remote_wal", secClient)
if err != nil {
return err
}
priLastPerfWAL, err := getWAL("primary", "last_performance_wal", priClient)
if err != nil {
return err
}
if secLastRemoteWAL >= priLastPerfWAL {
return nil
}
time.Sleep(time.Second)
}
return fmt.Errorf("performance WALs on the secondary did not catch up with the primary, context err: %w", ctx.Err())
}
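As a rough usage sketch (not part of this diff), a perf-replication test might write on the primary and then use WaitForPerformanceWAL to block until the secondary has streamed the resulting WALs; the helper name and the KV path below are illustrative:

package testclusterdemo

import (
	"context"

	"github.com/hashicorp/vault/sdk/helper/testcluster"
)

// writeAndWaitForPerfWAL is a hypothetical helper: it writes a KV entry on
// the primary, then waits until the secondary's last_remote_wal catches up
// with the primary's last_performance_wal (or ctx expires).
func writeAndWaitForPerfWAL(ctx context.Context, pri, sec testcluster.VaultCluster) error {
	priClient := pri.Nodes()[0].APIClient()
	// "secret/wal-probe" assumes a KV mount at "secret/"; adjust to match
	// the mounts in your test.
	if _, err := priClient.Logical().WriteWithContext(ctx, "secret/wal-probe", map[string]interface{}{
		"value": "x",
	}); err != nil {
		return err
	}
	return testcluster.WaitForPerformanceWAL(ctx, pri, sec)
}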
func WaitForPerformanceSecondary(ctx context.Context, pri, sec VaultCluster, skipPoisonPill bool) (string, error) {
if len(pri.GetRecoveryKeys()) > 0 {
sec.SetBarrierKeys(pri.GetRecoveryKeys())
@@ -233,6 +277,469 @@ func SetupTwoClusterPerfReplication(ctx context.Context, pri, sec VaultCluster)
return err
}
// PassiveWaitForActiveNodeAndPerfStandbys should be used instead of
// WaitForActiveNodeAndPerfStandbys when you don't want to do any writes
// as a side-effect. It returns the active node, the perf standby nodes
// in the cluster, and an error.
func PassiveWaitForActiveNodeAndPerfStandbys(ctx context.Context, pri VaultCluster) (VaultClusterNode, []VaultClusterNode, error) {
leaderNode, standbys, err := GetActiveAndStandbys(ctx, pri)
if err != nil {
return nil, nil, fmt.Errorf("failed to derive standby nodes, %w", err)
}
for i, node := range standbys {
client := node.APIClient()
// Make sure we get perf standby nodes
if err = EnsureCoreIsPerfStandby(ctx, client); err != nil {
return nil, nil, fmt.Errorf("standby node %d is not a perfStandby, %w", i, err)
}
}
return leaderNode, standbys, nil
}
func GetActiveAndStandbys(ctx context.Context, cluster VaultCluster) (VaultClusterNode, []VaultClusterNode, error) {
var leaderIndex int
var err error
if leaderIndex, err = WaitForActiveNode(ctx, cluster); err != nil {
return nil, nil, err
}
var leaderNode VaultClusterNode
var nodes []VaultClusterNode
for i, node := range cluster.Nodes() {
if i == leaderIndex {
leaderNode = node
continue
}
nodes = append(nodes, node)
}
return leaderNode, nodes, nil
}
func EnsureCoreIsPerfStandby(ctx context.Context, client *api.Client) error {
var err error
var health *api.HealthResponse
for ctx.Err() == nil {
health, err = client.Sys().HealthWithContext(ctx)
if err == nil && health.PerformanceStandby {
return nil
}
time.Sleep(time.Millisecond * 500)
}
if err == nil {
err = ctx.Err()
}
return err
}
func WaitForDRReplicationState(ctx context.Context, cluster VaultCluster, state consts.ReplicationState) error {
client := cluster.Nodes()[0].APIClient()
var health *api.HealthResponse
var err error
for ctx.Err() == nil {
health, err = client.Sys().HealthWithContext(ctx)
if err == nil && health.ReplicationDRMode == state.GetDRString() {
return nil
}
time.Sleep(500 * time.Millisecond)
}
if err == nil {
err = ctx.Err()
}
return err
}
func EnableDrPrimary(ctx context.Context, pri VaultCluster) error {
client := pri.Nodes()[0].APIClient()
_, err := client.Logical().Write("sys/replication/dr/primary/enable", nil)
if err != nil {
return err
}
err = WaitForDRReplicationState(ctx, pri, consts.ReplicationDRPrimary)
if err != nil {
return err
}
return WaitForActiveNodeAndPerfStandbys(ctx, pri)
}
func GenerateDRActivationToken(pri VaultCluster, id, secondaryPublicKey string) (string, error) {
client := pri.Nodes()[0].APIClient()
req := map[string]interface{}{
"id": id,
}
if secondaryPublicKey != "" {
req["secondary_public_key"] = secondaryPublicKey
}
secret, err := client.Logical().Write("sys/replication/dr/primary/secondary-token", req)
if err != nil {
return "", err
}
if secondaryPublicKey != "" {
return secret.Data["token"].(string), nil
}
return secret.WrapInfo.Token, nil
}
func WaitForDRSecondary(ctx context.Context, pri, sec VaultCluster, skipPoisonPill bool) error {
if len(pri.GetRecoveryKeys()) > 0 {
sec.SetBarrierKeys(pri.GetRecoveryKeys())
sec.SetRecoveryKeys(pri.GetRecoveryKeys())
} else {
sec.SetBarrierKeys(pri.GetBarrierKeys())
sec.SetRecoveryKeys(pri.GetBarrierKeys())
}
if len(sec.Nodes()) > 1 {
if skipPoisonPill {
// As part of prepareSecondary on the active node the keyring is
// deleted from storage. Its absence can cause standbys to seal
// themselves. But it's not reliable, so we'll seal them
// ourselves to force the issue.
for i := range sec.Nodes()[1:] {
if err := SealNode(ctx, sec, i+1); err != nil {
return err
}
}
} else {
// We want to make sure we unseal all the nodes, so we first wait
// until the standby nodes seal due to the poison pill being written
if err := WaitForNCoresSealed(ctx, sec, len(sec.Nodes())-1); err != nil {
return err
}
}
}
if _, err := WaitForActiveNode(ctx, sec); err != nil {
return err
}
// unseal nodes
for i := range sec.Nodes() {
if err := UnsealNode(ctx, sec, i); err != nil {
// Sometimes when we get here the node has already unsealed on its own,
// and then unsealing fails for DR secondaries with the error
// "path disabled in replication DR secondary mode", so check health instead.
if healthErr := NodeHealthy(ctx, sec, i); healthErr != nil {
// return the original error
return err
}
}
}
sec.SetRootToken(pri.GetRootToken())
if _, err := WaitForActiveNode(ctx, sec); err != nil {
return err
}
return nil
}
func EnableDRSecondaryNoWait(ctx context.Context, sec VaultCluster, drToken string) error {
postData := map[string]interface{}{
"token": drToken,
"ca_file": DefaultCAFile,
}
_, err := sec.Nodes()[0].APIClient().Logical().Write("sys/replication/dr/secondary/enable", postData)
if err != nil {
return err
}
return WaitForDRReplicationState(ctx, sec, consts.ReplicationDRSecondary)
}
func WaitForReplicationStatus(ctx context.Context, client *api.Client, dr bool, accept func(map[string]interface{}) bool) error {
url := "sys/replication/performance/status"
if dr {
url = "sys/replication/dr/status"
}
var err error
var secret *api.Secret
for ctx.Err() == nil {
secret, err = client.Logical().Read(url)
if err == nil && secret != nil && secret.Data != nil {
if accept(secret.Data) {
return nil
}
}
time.Sleep(500 * time.Millisecond)
}
if err == nil {
err = ctx.Err()
}
return fmt.Errorf("unable to get acceptable replication status: error=%v secret=%#v", err, secret)
}
func WaitForDRReplicationWorking(ctx context.Context, pri, sec VaultCluster) error {
priClient := pri.Nodes()[0].APIClient()
secClient := sec.Nodes()[0].APIClient()
// Make sure we've entered stream-wals mode
err := WaitForReplicationStatus(ctx, secClient, true, func(secret map[string]interface{}) bool {
return secret["state"] == "stream-wals"
})
if err != nil {
return err
}
// Now write some data and make sure that we see last_remote_wal nonzero, i.e.
// at least one WAL has been streamed.
secret, err := priClient.Auth().Token().Create(&api.TokenCreateRequest{})
if err != nil {
return err
}
// Revoke the token since some tests won't be happy to see it.
err = priClient.Auth().Token().RevokeTree(secret.Auth.ClientToken)
if err != nil {
return err
}
err = WaitForReplicationStatus(ctx, secClient, true, func(secret map[string]interface{}) bool {
if secret["state"] != "stream-wals" {
return false
}
if secret["last_remote_wal"] != nil {
lastRemoteWal, _ := secret["last_remote_wal"].(json.Number).Int64()
return lastRemoteWal > 0
}
return false
})
if err != nil {
return err
}
return nil
}
func EnableDrSecondary(ctx context.Context, pri, sec VaultCluster, drToken string) error {
err := EnableDRSecondaryNoWait(ctx, sec, drToken)
if err != nil {
return err
}
if err = WaitForMatchingMerkleRoots(ctx, "sys/replication/dr/", pri, sec); err != nil {
return err
}
err = WaitForDRSecondary(ctx, pri, sec, false)
if err != nil {
return err
}
if err = WaitForDRReplicationWorking(ctx, pri, sec); err != nil {
return err
}
return nil
}
func SetupTwoClusterDRReplication(ctx context.Context, pri, sec VaultCluster) error {
if err := EnableDrPrimary(ctx, pri); err != nil {
return err
}
drToken, err := GenerateDRActivationToken(pri, sec.ClusterID(), "")
if err != nil {
return err
}
err = EnableDrSecondary(ctx, pri, sec, drToken)
if err != nil {
return err
}
return nil
}
func DemoteDRPrimary(client *api.Client) error {
_, err := client.Logical().Write("sys/replication/dr/primary/demote", map[string]interface{}{})
return err
}
func createBatchToken(client *api.Client, path string) (string, error) {
// TODO: should these be more random in case more than one batch token needs to be created?
suffix := strings.Replace(path, "/", "", -1)
policyName := "path-batch-policy-" + suffix
roleName := "path-batch-role-" + suffix
rules := fmt.Sprintf(`path "%s" { capabilities = [ "read", "update" ] }`, path)
// create policy
_, err := client.Logical().Write("sys/policy/"+policyName, map[string]interface{}{
"policy": rules,
})
if err != nil {
return "", err
}
// create a role
_, err = client.Logical().Write("auth/token/roles/"+roleName, map[string]interface{}{
"allowed_policies": policyName,
"orphan": true,
"renewable": false,
"token_type": "batch",
})
if err != nil {
return "", err
}
// create batch token
secret, err := client.Logical().Write("auth/token/create/"+roleName, nil)
if err != nil {
return "", err
}
return secret.Auth.ClientToken, nil
}
// PromoteDRSecondaryWithBatchToken creates a batch token for DR promotion,
// demotes the primary cluster, and then promotes the secondary. The primary
// cluster needs to be functional while the batch token is generated.
func PromoteDRSecondaryWithBatchToken(ctx context.Context, pri, sec VaultCluster) error {
client := pri.Nodes()[0].APIClient()
drToken, err := createBatchToken(client, "sys/replication/dr/secondary/promote")
if err != nil {
return err
}
err = DemoteDRPrimary(client)
if err != nil {
return err
}
return promoteDRSecondaryInternal(ctx, sec, drToken)
}
// PromoteDRSecondary generates a DR operation token on the secondary using
// unseal/recovery keys, so it can be used even when the primary cluster is
// out of service.
func PromoteDRSecondary(ctx context.Context, sec VaultCluster) error {
// generate a DR operation token using the secondary's unseal/recovery keys
drToken, err := GenerateRoot(sec, GenerateRootDR)
if err != nil {
return err
}
return promoteDRSecondaryInternal(ctx, sec, drToken)
}
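For context (not part of this diff), a DR failover test could compose these helpers roughly as follows; the helper name is illustrative, and it assumes the primary is still healthy so a batch token can be minted on it:

package testclusterdemo

import (
	"context"

	"github.com/hashicorp/vault/sdk/helper/testcluster"
)

// failoverToDRSecondary is a hypothetical flow: enable DR replication from
// pri to drsec, then demote pri and promote drsec using a batch token minted
// on the still-healthy primary.
func failoverToDRSecondary(ctx context.Context, pri, drsec testcluster.VaultCluster) error {
	if err := testcluster.SetupTwoClusterDRReplication(ctx, pri, drsec); err != nil {
		return err
	}
	// If the primary may already be unreachable, PromoteDRSecondary (which
	// generates the DR operation token on the secondary) is the alternative.
	return testcluster.PromoteDRSecondaryWithBatchToken(ctx, pri, drsec)
}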
func promoteDRSecondaryInternal(ctx context.Context, sec VaultCluster, drToken string) error {
secClient := sec.Nodes()[0].APIClient()
// Allow retries of 503s, e.g.: replication is still catching up,
// try again later or provide the "force" argument
oldMaxRetries := secClient.MaxRetries()
secClient.SetMaxRetries(10)
defer secClient.SetMaxRetries(oldMaxRetries)
resp, err := secClient.Logical().Write("sys/replication/dr/secondary/promote", map[string]interface{}{
"dr_operation_token": drToken,
})
if err != nil {
return err
}
if resp == nil {
return fmt.Errorf("nil status response during DR promotion")
}
if _, err := WaitForActiveNode(ctx, sec); err != nil {
return err
}
return WaitForDRReplicationState(ctx, sec, consts.ReplicationDRPrimary)
}
func checkClusterAddr(ctx context.Context, pri, sec VaultCluster) error {
priClient := pri.Nodes()[0].APIClient()
priLeader, err := priClient.Sys().LeaderWithContext(ctx)
if err != nil {
return err
}
secClient := sec.Nodes()[0].APIClient()
endpoint := "sys/replication/dr/"
status, err := secClient.Logical().Read(endpoint + "status")
if err != nil {
return err
}
if status == nil || status.Data == nil {
return fmt.Errorf("got nil secret or data")
}
var priAddrs []string
err = mapstructure.Decode(status.Data["known_primary_cluster_addrs"], &priAddrs)
if err != nil {
return err
}
if !strutil.StrListContains(priAddrs, priLeader.LeaderClusterAddress) {
return fmt.Errorf("failed to find the expected primary cluster address %v in known_primary_cluster_addrs", priLeader.LeaderClusterAddress)
}
return nil
}
func UpdatePrimary(ctx context.Context, pri, sec VaultCluster) error {
// generate a DR operation token on the secondary so we can call
// update-primary on it
rootToken, err := GenerateRoot(sec, GenerateRootDR)
if err != nil {
return err
}
// secondary activation token
drToken, err := GenerateDRActivationToken(pri, sec.ClusterID(), "")
if err != nil {
return err
}
// update-primary on the DR secondary to point it at the new DR primary
secClient := sec.Nodes()[0].APIClient()
resp, err := secClient.Logical().Write("sys/replication/dr/secondary/update-primary", map[string]interface{}{
"dr_operation_token": rootToken,
"token": drToken,
"ca_file": DefaultCAFile,
})
if err != nil {
return err
}
if resp == nil {
return fmt.Errorf("nil status response during update primary")
}
if _, err = WaitForActiveNode(ctx, sec); err != nil {
return err
}
if err = WaitForDRReplicationState(ctx, sec, consts.ReplicationDRSecondary); err != nil {
return err
}
if err = checkClusterAddr(ctx, pri, sec); err != nil {
return err
}
return nil
}
func SetupFourClusterReplication(ctx context.Context, pri, sec, pridr, secdr VaultCluster) error {
err := SetupTwoClusterPerfReplication(ctx, pri, sec)
if err != nil {
return err
}
err = SetupTwoClusterDRReplication(ctx, pri, pridr)
if err != nil {
return err
}
err = SetupTwoClusterDRReplication(ctx, sec, secdr)
if err != nil {
return err
}
return nil
}
type ReplicationSet struct {
// By convention, we recommend the following naming scheme for
// clusters in this map:
@@ -286,6 +793,47 @@ func (r *ReplicationSet) StandardPerfReplication(ctx context.Context) error {
return nil
}
func (r *ReplicationSet) StandardDRReplication(ctx context.Context) error {
for _, name := range []string{"A", "B"} {
if _, ok := r.Clusters[name]; !ok {
cluster, err := r.Builder(ctx, name, r.Logger)
if err != nil {
return err
}
r.Clusters[name] = cluster
}
}
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
err := SetupTwoClusterDRReplication(ctx, r.Clusters["A"], r.Clusters["B"])
if err != nil {
return err
}
return nil
}
func (r *ReplicationSet) GetFourReplicationCluster(ctx context.Context) error {
for _, name := range []string{"A", "B", "C", "D"} {
if _, ok := r.Clusters[name]; !ok {
cluster, err := r.Builder(ctx, name, r.Logger)
if err != nil {
return err
}
r.Clusters[name] = cluster
}
}
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
err := SetupFourClusterReplication(ctx, r.Clusters["A"], r.Clusters["C"], r.Clusters["B"], r.Clusters["D"])
if err != nil {
return err
}
return nil
}
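Putting it together (not part of this diff), a four-cluster docker test could look roughly like the sketch below; the test name and timeout are illustrative, and it assumes VAULT_BINARY points at an enterprise build with VAULT_LICENSE_CI set, since replication is an enterprise feature:

package docker_test

import (
	"context"
	"testing"
	"time"

	"github.com/hashicorp/vault/sdk/helper/testcluster/docker"
)

func TestFourClusterReplication(t *testing.T) {
	r, err := docker.NewReplicationSetDocker(t, nil)
	if err != nil {
		t.Fatal(err)
	}
	defer r.Cleanup()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
	defer cancel()

	// Builds clusters A, B, C, D and wires A<->C perf replication plus
	// A->B and C->D DR replication.
	if err := r.GetFourReplicationCluster(ctx); err != nil {
		t.Fatal(err)
	}
}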
func (r *ReplicationSet) Cleanup() {
for _, cluster := range r.Clusters {
cluster.Cleanup()

View File

@@ -30,6 +30,7 @@ type VaultCluster interface {
ClusterID() string
NamedLogger(string) hclog.Logger
SetRootToken(token string)
GetRootToken() string
}
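With GetRootToken on the VaultCluster interface, helpers can read the root token without knowing whether the cluster is docker-based, exec-dev, or in-process; a minimal sketch (the helper name is illustrative):

package testclusterdemo

import (
	"github.com/hashicorp/vault/api"
	"github.com/hashicorp/vault/sdk/helper/testcluster"
)

// rootClientFor is a hypothetical helper returning a client for node i that
// authenticates with the cluster's root token.
func rootClientFor(cluster testcluster.VaultCluster, i int) (*api.Client, error) {
	// Clone so the node's shared client is left untouched.
	client, err := cluster.Nodes()[i].APIClient().Clone()
	if err != nil {
		return nil, err
	}
	client.SetToken(cluster.GetRootToken())
	return client, nil
}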
type VaultNodeConfig struct {

View File

@@ -2263,6 +2263,10 @@ func (c *TestCluster) NamedLogger(name string) log.Logger {
return c.Logger.Named(name)
}
func (c *TestCluster) GetRootToken() string {
return c.RootToken
}
func (c *TestClusterCore) Name() string {
return c.NodeID
}