Mirror of https://github.com/optim-enterprises-bv/vault.git, synced 2025-11-03 12:07:54 +00:00
Simplify raft cluster address management in tests (#24560)
@@ -11,10 +11,8 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"net/url"
|
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/armon/go-metrics"
|
"github.com/armon/go-metrics"
|
||||||
@@ -435,46 +433,9 @@ func RekeyCluster(t testing.T, cluster *vault.TestCluster, recovery bool) [][]by
 	return newKeys
 }

-// TestRaftServerAddressProvider is a ServerAddressProvider that uses the
-// ClusterAddr() of each node to provide raft addresses.
-//
-// Note that TestRaftServerAddressProvider should only be used in cases where
-// cores that are part of a raft configuration have already had
-// startClusterListener() called (via either unsealing or raft joining).
-type TestRaftServerAddressProvider struct {
-	Cluster *vault.TestCluster
-}
-
-func (p *TestRaftServerAddressProvider) ServerAddr(id raftlib.ServerID) (raftlib.ServerAddress, error) {
-	for _, core := range p.Cluster.Cores {
-		if core.NodeID == string(id) {
-			parsed, err := url.Parse(core.ClusterAddr())
-			if err != nil {
-				return "", err
-			}
-
-			return raftlib.ServerAddress(parsed.Host), nil
-		}
-	}
-
-	return "", errors.New("could not find cluster addr")
-}
-
 func RaftClusterJoinNodes(t testing.T, cluster *vault.TestCluster) {
-	addressProvider := &TestRaftServerAddressProvider{Cluster: cluster}
-
-	atomic.StoreUint32(&vault.TestingUpdateClusterAddr, 1)
-
 	leader := cluster.Cores[0]

-	// Seal the leader so we can install an address provider
-	{
-		EnsureCoreSealed(t, leader)
-		leader.UnderlyingRawStorage.(*raft.RaftBackend).SetServerAddressProvider(addressProvider)
-		cluster.UnsealCore(t, leader)
-		vault.TestWaitActive(t, leader.Core)
-	}
-
 	leaderInfos := []*raft.LeaderJoinInfo{
 		{
 			LeaderAPIAddr: leader.Client.Address(),
@@ -485,7 +446,6 @@ func RaftClusterJoinNodes(t testing.T, cluster *vault.TestCluster) {
 	// Join followers
 	for i := 1; i < len(cluster.Cores); i++ {
 		core := cluster.Cores[i]
-		core.UnderlyingRawStorage.(*raft.RaftBackend).SetServerAddressProvider(addressProvider)
 		_, err := core.JoinRaftCluster(namespace.RootContext(context.Background()), leaderInfos, false)
 		if err != nil {
 			t.Fatal(err)
@@ -10,8 +10,6 @@ import (
 	"os"
 	"time"

-	"github.com/hashicorp/vault/internalshared/configutil"
-
 	"github.com/hashicorp/go-hclog"
 	logicalKv "github.com/hashicorp/vault-plugin-secrets-kv"
 	"github.com/hashicorp/vault/audit"
@@ -23,6 +21,7 @@ import (
 	"github.com/hashicorp/vault/helper/testhelpers"
 	"github.com/hashicorp/vault/helper/testhelpers/corehelpers"
 	vaulthttp "github.com/hashicorp/vault/http"
+	"github.com/hashicorp/vault/internalshared/configutil"
 	"github.com/hashicorp/vault/physical/raft"
 	"github.com/hashicorp/vault/sdk/logical"
 	"github.com/hashicorp/vault/sdk/physical"
@@ -105,7 +104,7 @@ func MakeFileBackend(t testing.T, logger hclog.Logger) *vault.PhysicalBackendBun
 	}
 }

-func MakeRaftBackend(t testing.T, coreIdx int, logger hclog.Logger, extraConf map[string]interface{}) *vault.PhysicalBackendBundle {
+func MakeRaftBackend(t testing.T, coreIdx int, logger hclog.Logger, extraConf map[string]interface{}, bridge *raft.ClusterAddrBridge) *vault.PhysicalBackendBundle {
 	nodeID := fmt.Sprintf("core-%d", coreIdx)
 	raftDir, err := ioutil.TempDir("", "vault-raft-")
 	if err != nil {
@@ -118,6 +117,19 @@ func MakeRaftBackend(t testing.T, coreIdx int, logger hclog.Logger, extraConf ma

 	logger.Info("raft dir", "dir", raftDir)

+	backend, err := makeRaftBackend(logger, nodeID, raftDir, extraConf, bridge)
+	if err != nil {
+		cleanupFunc()
+		t.Fatal(err)
+	}
+
+	return &vault.PhysicalBackendBundle{
+		Backend: backend,
+		Cleanup: cleanupFunc,
+	}
+}
+
+func makeRaftBackend(logger hclog.Logger, nodeID, raftDir string, extraConf map[string]interface{}, bridge *raft.ClusterAddrBridge) (physical.Backend, error) {
 	conf := map[string]string{
 		"path":    raftDir,
 		"node_id": nodeID,
@@ -134,14 +146,13 @@ func MakeRaftBackend(t testing.T, coreIdx int, logger hclog.Logger, extraConf ma

 	backend, err := raft.NewRaftBackend(conf, logger.Named("raft"))
 	if err != nil {
-		cleanupFunc()
-		t.Fatal(err)
+		return nil, err
+	}
+	if bridge != nil {
+		backend.(*raft.RaftBackend).SetServerAddressProvider(bridge)
 	}

-	return &vault.PhysicalBackendBundle{
-		Backend: backend,
-		Cleanup: cleanupFunc,
-	}
+	return backend, nil
 }

 // RaftHAFactory returns a PhysicalBackendBundle with raft set as the HABackend
@@ -224,7 +235,14 @@ func FileBackendSetup(conf *vault.CoreConfig, opts *vault.TestClusterOptions) {

 func RaftBackendSetup(conf *vault.CoreConfig, opts *vault.TestClusterOptions) {
 	opts.KeepStandbysSealed = true
-	opts.PhysicalFactory = MakeRaftBackend
+	var bridge *raft.ClusterAddrBridge
+	if !opts.InmemClusterLayers && opts.ClusterLayers == nil {
+		bridge = raft.NewClusterAddrBridge()
+	}
+	conf.ClusterAddrBridge = bridge
+	opts.PhysicalFactory = func(t testing.T, coreIdx int, logger hclog.Logger, conf map[string]interface{}) *vault.PhysicalBackendBundle {
+		return MakeRaftBackend(t, coreIdx, logger, conf, bridge)
+	}
 	opts.SetupFunc = func(t testing.T, c *vault.TestCluster) {
 		if opts.NumCores != 1 {
 			testhelpers.RaftClusterJoinNodes(t, c)
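For context, a hedged usage sketch follows; the test name, core count, and options are illustrative and not part of this commit. It shows what a raft storage test looks like after this change: RaftBackendSetup now creates the ClusterAddrBridge, wires it into CoreConfig, and closes over it in the PhysicalFactory, so the test itself never installs a ServerAddressProvider or seals and unseals the leader by hand.

// Assumed imports: testing, vault, vaulthttp ("github.com/hashicorp/vault/http"),
// teststorage ("github.com/hashicorp/vault/helper/testhelpers/teststorage").
func TestRaftCluster_UsageSketch(t *testing.T) {
	conf := vault.CoreConfig{}
	opts := vault.TestClusterOptions{
		HandlerFunc: vaulthttp.Handler,
		NumCores:    3,
	}
	// Wires the ClusterAddrBridge plus the raft PhysicalFactory and SetupFunc.
	teststorage.RaftBackendSetup(&conf, &opts)

	cluster := vault.NewTestCluster(t, &conf, &opts)
	defer cluster.Cleanup()

	// SetupFunc has already joined the followers via RaftClusterJoinNodes.
	vault.TestWaitActive(t, cluster.Cores[0].Core)
}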
@@ -234,7 +252,7 @@ func RaftBackendSetup(conf *vault.CoreConfig, opts *vault.TestClusterOptions) {
 }

 func RaftHASetup(conf *vault.CoreConfig, opts *vault.TestClusterOptions, bundler PhysicalBackendBundler) {
-	opts.KeepStandbysSealed = true
+	opts.InmemClusterLayers = true
 	opts.PhysicalFactory = RaftHAFactory(bundler)
 }

@@ -9,7 +9,6 @@ import (
 	"os"

 	hclog "github.com/hashicorp/go-hclog"
-	raftlib "github.com/hashicorp/raft"
 	"github.com/hashicorp/vault/physical/raft"
 	"github.com/hashicorp/vault/sdk/physical"
 	"github.com/hashicorp/vault/vault"
@@ -74,7 +73,7 @@ func MakeReusableStorage(t testing.T, logger hclog.Logger, bundle *vault.Physica

 // MakeReusableRaftStorage makes a physical raft backend that can be re-used
 // across multiple test clusters in sequence.
-func MakeReusableRaftStorage(t testing.T, logger hclog.Logger, numCores int, addressProvider raftlib.ServerAddressProvider) (ReusableStorage, StorageCleanup) {
+func MakeReusableRaftStorage(t testing.T, logger hclog.Logger, numCores int) (ReusableStorage, StorageCleanup) {
 	raftDirs := make([]string, numCores)
 	for i := 0; i < numCores; i++ {
 		raftDirs[i] = makeRaftDir(t)
@@ -87,7 +86,7 @@ func MakeReusableRaftStorage(t testing.T, logger hclog.Logger, numCores int, add
 			conf.DisablePerformanceStandby = true
 			opts.KeepStandbysSealed = true
 			opts.PhysicalFactory = func(t testing.T, coreIdx int, logger hclog.Logger, conf map[string]interface{}) *vault.PhysicalBackendBundle {
-				return makeReusableRaftBackend(t, coreIdx, logger, raftDirs[coreIdx], addressProvider, false)
+				return makeReusableRaftBackend(t, coreIdx, logger, raftDirs[coreIdx], false)
 			}
 		},

@@ -124,9 +123,10 @@ func MakeReusableRaftHAStorage(t testing.T, logger hclog.Logger, numCores int, b

 	storage := ReusableStorage{
 		Setup: func(conf *vault.CoreConfig, opts *vault.TestClusterOptions) {
+			opts.InmemClusterLayers = true
 			opts.KeepStandbysSealed = true
 			opts.PhysicalFactory = func(t testing.T, coreIdx int, logger hclog.Logger, conf map[string]interface{}) *vault.PhysicalBackendBundle {
-				haBundle := makeReusableRaftBackend(t, coreIdx, logger, raftDirs[coreIdx], nil, true)
+				haBundle := makeReusableRaftBackend(t, coreIdx, logger, raftDirs[coreIdx], true)

 				return &vault.PhysicalBackendBundle{
 					Backend: bundle.Backend,
@@ -168,25 +168,13 @@ func makeRaftDir(t testing.T) string {
 	return raftDir
 }

-func makeReusableRaftBackend(t testing.T, coreIdx int, logger hclog.Logger, raftDir string, addressProvider raftlib.ServerAddressProvider, ha bool) *vault.PhysicalBackendBundle {
+func makeReusableRaftBackend(t testing.T, coreIdx int, logger hclog.Logger, raftDir string, ha bool) *vault.PhysicalBackendBundle {
 	nodeID := fmt.Sprintf("core-%d", coreIdx)
-	conf := map[string]string{
-		"path":                         raftDir,
-		"node_id":                      nodeID,
-		"performance_multiplier":       "8",
-		"autopilot_reconcile_interval": "300ms",
-		"autopilot_update_interval":    "100ms",
-	}
-
-	backend, err := raft.NewRaftBackend(conf, logger)
+	backend, err := makeRaftBackend(logger, nodeID, raftDir, nil, nil)
 	if err != nil {
 		t.Fatal(err)
 	}

-	if addressProvider != nil {
-		backend.(*raft.RaftBackend).SetServerAddressProvider(addressProvider)
-	}
-
 	bundle := new(vault.PhysicalBackendBundle)

 	if ha {
@@ -11,6 +11,7 @@ import (
 	"io"
 	"io/ioutil"
 	"math/rand"
+	"net/url"
 	"os"
 	"path/filepath"
 	"strconv"
@@ -311,6 +312,33 @@ func EnsurePath(path string, dir bool) error {
 	return os.MkdirAll(path, 0o700)
 }

+func NewClusterAddrBridge() *ClusterAddrBridge {
+	return &ClusterAddrBridge{
+		clusterAddressByNodeID: make(map[string]string),
+	}
+}
+
+type ClusterAddrBridge struct {
+	l                      sync.RWMutex
+	clusterAddressByNodeID map[string]string
+}
+
+func (c *ClusterAddrBridge) UpdateClusterAddr(nodeId string, clusterAddr string) {
+	c.l.Lock()
+	defer c.l.Unlock()
+	cu, _ := url.Parse(clusterAddr)
+	c.clusterAddressByNodeID[nodeId] = cu.Host
+}
+
+func (c *ClusterAddrBridge) ServerAddr(id raft.ServerID) (raft.ServerAddress, error) {
+	c.l.RLock()
+	defer c.l.RUnlock()
+	if addr, ok := c.clusterAddressByNodeID[string(id)]; ok {
+		return raft.ServerAddress(addr), nil
+	}
+	return "", fmt.Errorf("could not find cluster addr for id=%s", id)
+}
+
 // NewRaftBackend constructs a RaftBackend using the given directory
 func NewRaftBackend(conf map[string]string, logger log.Logger) (physical.Backend, error) {
 	path := os.Getenv(EnvVaultRaftPath)
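The following is an illustrative sketch of the bridge's contract, not code from the diff; it is written from a caller outside the physical raft package, and the aliases raft (github.com/hashicorp/vault/physical/raft) and raftlib (github.com/hashicorp/raft) are assumed. A core publishes its cluster address under its raft node ID, and a joining node resolves peers by ID through the ServerAddressProvider interface.

bridge := raft.NewClusterAddrBridge()
// Typically invoked from Core.SetClusterAddr once the real port is known.
bridge.UpdateClusterAddr("core-1", "https://127.0.0.1:8201")

// Raft joins resolve peers by node ID; only the parsed host:port is stored.
addr, err := bridge.ServerAddr(raftlib.ServerID("core-1"))
if err != nil {
	// unknown node ID
}
_ = addr // "127.0.0.1:8201"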
@@ -1344,7 +1372,7 @@ func (b *RaftBackend) AddPeer(ctx context.Context, peerID, clusterAddr string) e
 		if b.raft == nil {
 			return errors.New("raft storage is not initialized")
 		}
-		b.logger.Trace("adding server to raft", "id", peerID)
+		b.logger.Trace("adding server to raft", "id", peerID, "addr", clusterAddr)
 		future := b.raft.AddVoter(raft.ServerID(peerID), raft.ServerAddress(clusterAddr), 0, 0)
 		return future.Error()
 	}
@@ -1353,7 +1381,7 @@ func (b *RaftBackend) AddPeer(ctx context.Context, peerID, clusterAddr string) e
 		return errors.New("raft storage autopilot is not initialized")
 	}

-	b.logger.Trace("adding server to raft via autopilot", "id", peerID)
+	b.logger.Trace("adding server to raft via autopilot", "id", peerID, "addr", clusterAddr)
 	return b.autopilot.AddServer(&autopilot.Server{
 		ID:   raft.ServerID(peerID),
 		Name: peerID,
@@ -174,7 +174,7 @@ func LeaderNode(ctx context.Context, cluster VaultCluster) (int, error) {
 	leaderActiveTimes := make(map[int]time.Time)
 	for i, node := range cluster.Nodes() {
 		client := node.APIClient()
-		ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
+		ctx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
 		resp, err := client.Sys().LeaderWithContext(ctx)
 		cancel()
 		if err != nil || resp == nil || !resp.IsSelf {
@@ -340,7 +340,7 @@ func (c *Core) startClusterListener(ctx context.Context) error {
 	}
 	if strings.HasSuffix(c.ClusterAddr(), ":0") {
 		// If we listened on port 0, record the port the OS gave us.
-		c.clusterAddr.Store(fmt.Sprintf("https://%s", c.getClusterListener().Addr()))
+		c.SetClusterAddr(fmt.Sprintf("https://%s", c.getClusterListener().Addr()))
 	}

 	if len(c.ClusterAddr()) != 0 {
@@ -356,6 +356,15 @@ func (c *Core) ClusterAddr() string {
 	return c.clusterAddr.Load().(string)
 }

+func (c *Core) SetClusterAddr(s string) {
+	c.clusterAddr.Store(s)
+	rb := c.getRaftBackend()
+
+	if rb != nil && c.clusterAddrBridge != nil {
+		c.clusterAddrBridge.UpdateClusterAddr(c.GetRaftNodeID(), s)
+	}
+}
+
 func (c *Core) getClusterListener() *cluster.Listener {
 	cl := c.clusterListener.Load()
 	if cl == nil {
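A brief flow sketch may help tie these pieces together; it is a simplified restatement under assumed variable names (c is a *Core with clusterAddrBridge set, ln is its cluster listener), not code from the commit. When a test core listens on port 0, startClusterListener records the OS-assigned address through SetClusterAddr, which also publishes it to the bridge so a later raft join can resolve this node by ID.

// Simplified flow, mirroring the calls shown in the hunks above.
addr := fmt.Sprintf("https://%s", ln.Addr()) // e.g. https://127.0.0.1:53187
c.SetClusterAddr(addr)
// Inside SetClusterAddr:
//   c.clusterAddr.Store(addr)
//   c.clusterAddrBridge.UpdateClusterAddr(c.GetRaftNodeID(), addr)
// so ClusterAddrBridge.ServerAddr(nodeID) now returns the real host:port.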
@@ -710,6 +710,8 @@ type Core struct {
 	echoDuration                  *uberAtomic.Duration
 	activeNodeClockSkewMillis     *uberAtomic.Int64
 	periodicLeaderRefreshInterval time.Duration
+
+	clusterAddrBridge *raft.ClusterAddrBridge
 }

 func (c *Core) ActiveNodeClockSkewMillis() int64 {
@@ -886,6 +888,8 @@ type CoreConfig struct {
 	NumRollbackWorkers int

 	PeriodicLeaderRefreshInterval time.Duration
+
+	ClusterAddrBridge *raft.ClusterAddrBridge
 }

 // GetServiceRegistration returns the config's ServiceRegistration, or nil if it does
@@ -1309,6 +1313,8 @@ func NewCore(conf *CoreConfig) (*Core, error) {
 	c.events = events
 	c.events.Start()

+	c.clusterAddrBridge = conf.ClusterAddrBridge
+
 	return c, nil
 }

@@ -7,6 +7,7 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"io"
 	"math"
 	"reflect"
 	"testing"
@@ -221,9 +222,7 @@ func TestRaft_Autopilot_Stabilization_Delay(t *testing.T) {
 		InmemCluster:    true,
 		EnableAutopilot: true,
 		PhysicalFactoryConfig: map[string]interface{}{
-			"snapshot_threshold": "50",
-			"trailing_logs":      "100",
-			"snapshot_interval":  "1s",
+			"trailing_logs": "10",
 		},
 		PerNodePhysicalFactoryConfig: map[int]map[string]interface{}{
 			2: {
@@ -256,12 +255,12 @@ func TestRaft_Autopilot_Stabilization_Delay(t *testing.T) {
 	require.NoError(t, err)

 	// Wait for 110% of the stabilization time to add nodes
-	stabilizationKickOffWaitDuration := time.Duration(math.Ceil(1.1 * float64(config.ServerStabilizationTime)))
-	time.Sleep(stabilizationKickOffWaitDuration)
+	stabilizationPadded := time.Duration(math.Ceil(1.25 * float64(config.ServerStabilizationTime)))
+	time.Sleep(stabilizationPadded)

 	cli := cluster.Cores[0].Client
 	// Write more keys than snapshot_threshold
-	for i := 0; i < 250; i++ {
+	for i := 0; i < 50; i++ {
 		_, err := cli.Logical().Write(fmt.Sprintf("secret/%d", i), map[string]interface{}{
 			"test": "data",
 		})
@@ -270,16 +269,24 @@ func TestRaft_Autopilot_Stabilization_Delay(t *testing.T) {
 		}
 	}

+	// Take a snpashot, which should compact the raft log db, which should prevent
+	// followers from getting logs and require that they instead apply a snapshot,
+	// which should allow our snapshot_delay to come into play, which should result
+	// in core2 coming online slower.
+	err = client.Sys().RaftSnapshot(io.Discard)
+	require.NoError(t, err)
+
 	joinAndUnseal(t, cluster.Cores[1], cluster, false, false)
 	joinAndUnseal(t, cluster.Cores[2], cluster, false, false)

-	core2shouldBeHealthyAt := time.Now().Add(core2SnapshotDelay).Add(config.ServerStabilizationTime)
+	// Add an extra fudge factor, since once the snapshot delay completes it can
+	// take time for the snapshot to actually be applied.
+	core2shouldBeHealthyAt := time.Now().Add(core2SnapshotDelay).Add(stabilizationPadded).Add(5 * time.Second)

 	// Wait for enough time for stabilization to complete if things were good
 	// - but they're not good, due to our snapshot_delay. So we fail if both
 	// nodes are healthy.
-	stabilizationWaitDuration := time.Duration(1.25 * float64(config.ServerStabilizationTime))
-	testhelpers.RetryUntil(t, stabilizationWaitDuration, func() error {
+	testhelpers.RetryUntil(t, stabilizationPadded, func() error {
 		state, err := client.Sys().RaftAutopilotState()
 		if err != nil {
 			return err
@@ -14,7 +14,6 @@ import (
 	"net/http"
 	"strings"
 	"sync"
-	"sync/atomic"
 	"testing"
 	"time"

@@ -54,9 +53,6 @@ type RaftClusterOpts struct {
 }

 func raftClusterBuilder(t testing.TB, ropts *RaftClusterOpts) (*vault.CoreConfig, vault.TestClusterOptions) {
-	// TODO remove global
-	atomic.StoreUint32(&vault.TestingUpdateClusterAddr, 1)
-
 	if ropts == nil {
 		ropts = &RaftClusterOpts{
 			InmemCluster: true,
@@ -4,7 +4,6 @@
 package raftha

 import (
-	"sync/atomic"
 	"testing"

 	"github.com/hashicorp/go-hclog"
@@ -13,7 +12,6 @@ import (
 	"github.com/hashicorp/vault/helper/testhelpers/teststorage"
 	consulstorage "github.com/hashicorp/vault/helper/testhelpers/teststorage/consul"
 	vaulthttp "github.com/hashicorp/vault/http"
-	"github.com/hashicorp/vault/physical/raft"
 	"github.com/hashicorp/vault/sdk/helper/logging"
 	"github.com/hashicorp/vault/vault"
 )
@@ -62,25 +60,8 @@ func testRaftHANewCluster(t *testing.T, bundler teststorage.PhysicalBackendBundl

 	teststorage.RaftHASetup(&conf, &opts, bundler)
 	cluster := vault.NewTestCluster(t, &conf, &opts)
-	cluster.Start()
 	defer cluster.Cleanup()

-	addressProvider := &testhelpers.TestRaftServerAddressProvider{Cluster: cluster}
-
-	leaderCore := cluster.Cores[0]
-	atomic.StoreUint32(&vault.TestingUpdateClusterAddr, 1)
-
-	// Seal the leader so we can install an address provider
-	{
-		testhelpers.EnsureCoreSealed(t, leaderCore)
-		leaderCore.UnderlyingHAStorage.(*raft.RaftBackend).SetServerAddressProvider(addressProvider)
-		cluster.UnsealCore(t, leaderCore)
-		vault.TestWaitActive(t, leaderCore.Core)
-	}
-
-	// Now unseal core for join commands to work
-	testhelpers.EnsureCoresUnsealed(t, cluster)
-
 	joinFunc := func(client *api.Client, addClientCerts bool) {
 		req := &api.RaftJoinRequest{
 			LeaderCACert: string(cluster.CACertPEM),
@@ -164,7 +145,6 @@ func TestRaft_HA_ExistingCluster(t *testing.T) {
 		storage.Setup(&conf, &opts)

 		cluster := vault.NewTestCluster(t, &conf, &opts)
-		cluster.Start()
 		defer func() {
 			cluster.Cleanup()
 			storage.Cleanup(t, cluster)
@@ -186,7 +166,6 @@ func TestRaft_HA_ExistingCluster(t *testing.T) {
 		haStorage.Setup(&conf, &opts)

 		cluster := vault.NewTestCluster(t, &conf, &opts)
-		cluster.Start()
 		defer func() {
 			cluster.Cleanup()
 			haStorage.Cleanup(t, cluster)
@@ -196,16 +175,8 @@ func TestRaft_HA_ExistingCluster(t *testing.T) {
 		cluster.BarrierKeys = clusterBarrierKeys
 		cluster.RootToken = clusterRootToken

-		addressProvider := &testhelpers.TestRaftServerAddressProvider{Cluster: cluster}
-		atomic.StoreUint32(&vault.TestingUpdateClusterAddr, 1)
-
-		// Seal the leader so we can install an address provider
 		leaderCore := cluster.Cores[0]
-		{
-			testhelpers.EnsureCoreSealed(t, leaderCore)
-			leaderCore.UnderlyingHAStorage.(*raft.RaftBackend).SetServerAddressProvider(addressProvider)
-			testhelpers.EnsureCoreUnsealed(t, cluster, leaderCore)
-		}
+		testhelpers.EnsureCoreUnsealed(t, cluster, leaderCore)

 		// Call the bootstrap on the leader and then ensure that it becomes active
 		leaderClient := cluster.Cores[0].Client
@@ -218,10 +189,6 @@ func TestRaft_HA_ExistingCluster(t *testing.T) {
 			vault.TestWaitActive(t, leaderCore.Core)
 		}

-		// Set address provider
-		cluster.Cores[1].UnderlyingHAStorage.(*raft.RaftBackend).SetServerAddressProvider(addressProvider)
-		cluster.Cores[2].UnderlyingHAStorage.(*raft.RaftBackend).SetServerAddressProvider(addressProvider)
-
 		// Now unseal core for join commands to work
 		testhelpers.EnsureCoresUnsealed(t, cluster)

@@ -4,14 +4,11 @@
 package sealmigration

 import (
-	"sync/atomic"
 	"testing"

 	"github.com/hashicorp/go-hclog"
-	"github.com/hashicorp/vault/helper/testhelpers"
 	"github.com/hashicorp/vault/helper/testhelpers/corehelpers"
 	"github.com/hashicorp/vault/helper/testhelpers/teststorage"
-	"github.com/hashicorp/vault/vault"
 )

 type testFunc func(t *testing.T, logger hclog.Logger, storage teststorage.ReusableStorage, basePort int)
@@ -36,10 +33,7 @@ func testVariousBackends(t *testing.T, tf testFunc, basePort int, includeRaft bo
 		logger := logger.Named("raft")
 		raftBasePort := basePort + 400

-		atomic.StoreUint32(&vault.TestingUpdateClusterAddr, 1)
-		addressProvider := testhelpers.NewHardcodedServerAddressProvider(numTestCores, raftBasePort+10)
-
-		storage, cleanup := teststorage.MakeReusableRaftStorage(t, logger, numTestCores, addressProvider)
+		storage, cleanup := teststorage.MakeReusableRaftStorage(t, logger, numTestCores)
 		defer cleanup()
 		tf(t, logger, storage, raftBasePort)
 	})
@@ -50,9 +50,6 @@ var (

 	raftAutopilotConfigurationStoragePath = "core/raft/autopilot/configuration"

-	// TestingUpdateClusterAddr is used in tests to override the cluster address
-	TestingUpdateClusterAddr uint32
-
 	ErrJoinWithoutAutoloading = errors.New("attempt to join a cluster using autoloaded licenses while not using autoloading ourself")
 )

@@ -1303,7 +1300,7 @@ func (c *Core) joinRaftSendAnswer(ctx context.Context, sealAccess seal.Access, r
 		return fmt.Errorf("error parsing cluster address: %w", err)
 	}
 	clusterAddr := parsedClusterAddr.Host
-	if atomic.LoadUint32(&TestingUpdateClusterAddr) == 1 && strings.HasSuffix(clusterAddr, ":0") {
+	if c.clusterAddrBridge != nil && strings.HasSuffix(clusterAddr, ":0") {
 		// We are testing and have an address provider, so just create a random
 		// addr, it will be overwritten later.
 		var err error
@@ -1508,6 +1508,7 @@ func NewTestCluster(t testing.T, base *CoreConfig, opts *TestClusterOptions) *Te
 		coreConfig.PendingRemovalMountsAllowed = base.PendingRemovalMountsAllowed
 		coreConfig.ExpirationRevokeRetryBase = base.ExpirationRevokeRetryBase
 		coreConfig.PeriodicLeaderRefreshInterval = base.PeriodicLeaderRefreshInterval
+		coreConfig.ClusterAddrBridge = base.ClusterAddrBridge
 		testApplyEntBaseConfig(coreConfig, base)
 	}
 	if coreConfig.ClusterName == "" {
@@ -1869,6 +1870,9 @@ func (testCluster *TestCluster) newCore(t testing.T, idx int, coreConfig *CoreCo
 		localConfig.ClusterNetworkLayer = opts.ClusterLayers.Layers()[idx]
 		localConfig.ClusterAddr = "https://" + localConfig.ClusterNetworkLayer.Listeners()[0].Addr().String()
 	}
+	if opts != nil && opts.BaseClusterListenPort != 0 {
+		localConfig.ClusterAddr = fmt.Sprintf("https://127.0.0.1:%d", opts.BaseClusterListenPort+idx)
+	}

 	switch {
 	case localConfig.LicensingConfig != nil: