Mirror of https://github.com/optim-enterprises-bv/vault.git (synced 2025-11-01 19:17:58 +00:00)
raft: add support for using backend for ha_storage (#9193)
* raft: initial work on raft ha storage support
* add note on join
* add todo note
* raft: add support for bootstrapping and joining existing nodes
* raft: gate bootstrap join by reading leader api address from storage
* raft: properly check for raft-only for certain conditionals
* raft: add bootstrap to api and cli
* raft: fix bootstrap cli command
* raft: add test for setting up new cluster with raft HA
* raft: extend TestRaft_HA_NewCluster to include inmem and consul backends
* raft: add test for updating an existing cluster to use raft HA
* raft: remove debug log lines, clean up verifyRaftPeers
* raft: minor cleanup
* raft: minor cleanup
* Update physical/raft/raft.go (Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>)
* Update vault/ha.go (Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>)
* Update vault/ha.go (Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>)
* Update vault/logical_system_raft.go (Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>)
* Update vault/raft.go (Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>)
* Update vault/raft.go (Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>)
* address feedback comments
* address feedback comments
* raft: refactor tls keyring logic
* address feedback comments
* Update vault/raft.go (Co-authored-by: Alexander Bezobchuk <alexanderbez@users.noreply.github.com>)
* Update vault/raft.go (Co-authored-by: Alexander Bezobchuk <alexanderbez@users.noreply.github.com>)
* address feedback comments
* testing: fix import ordering
* raft: rename var, cleanup comment line
* docs: remove ha_storage restriction note on raft
* docs: more raft HA interaction updates with migration and recovery mode
* docs: update the raft join command
* raft: update comments
* raft: add missing isRaftHAOnly check for clearing out state set earlier
* raft: update a few ha_storage config checks
* Update command/operator_raft_bootstrap.go (Co-authored-by: Vishal Nayak <vishalnayak@users.noreply.github.com>)
* raft: address feedback comments
* raft: fix panic when checking for config.HAStorage.Type
* Update vault/raft.go (Co-authored-by: Alexander Bezobchuk <alexanderbez@users.noreply.github.com>)
* Update website/pages/docs/commands/operator/raft.mdx (Co-authored-by: Alexander Bezobchuk <alexanderbez@users.noreply.github.com>)
* raft: remove bootstrap cli command
* Update vault/raft.go (Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>)
* Update vault/raft.go (Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>)
* raft: address review feedback
* raft: revert vendored sdk
* raft: don't send applied index and node ID info if we're HA-only

Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com>
Co-authored-by: Alexander Bezobchuk <alexanderbez@users.noreply.github.com>
Co-authored-by: Vishal Nayak <vishalnayak@users.noreply.github.com>
Committed by: GitHub
Parent: fb0924dee3
Commit: 045836da71
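The squashed commit message above describes the end state: Raft can now be configured purely as the ha_storage backend while a different backend (Consul, or file/inmem in tests) holds the data, and a new sys/storage/raft/bootstrap endpoint bootstraps the HA cluster on the active node before standbys join. A rough sketch of that operator flow with the Go API client follows; it mirrors what the new TestRaft_HA_ExistingCluster test in this commit does, and the addresses and token below are placeholders, not values from the commit.

    package main

    import (
        "fmt"
        "log"

        "github.com/hashicorp/vault/api"
    )

    func main() {
        // Active node: assumed unsealed and reachable at a placeholder address.
        leader, err := api.NewClient(&api.Config{Address: "https://active.vault.example.com:8200"})
        if err != nil {
            log.Fatal(err)
        }
        leader.SetToken("<token-with-sys/storage/raft-capabilities>")

        // Bootstrap the raft HA cluster (endpoint added by this commit).
        if _, err := leader.Logical().Write("sys/storage/raft/bootstrap", nil); err != nil {
            log.Fatal(err)
        }

        // Each standby then joins, exactly as it would with raft storage.
        standby, err := api.NewClient(&api.Config{Address: "https://standby1.vault.example.com:8200"})
        if err != nil {
            log.Fatal(err)
        }
        resp, err := standby.Sys().RaftJoin(&api.RaftJoinRequest{
            LeaderAPIAddr: "https://active.vault.example.com:8200",
        })
        if err != nil {
            log.Fatal(err)
        }
        fmt.Println("joined:", resp.Joined)
    }

On the CLI side the same join is `vault operator raft join <leader-api-addr>`; as the operator_raft_join.go hunk below shows, the address argument may now also be omitted when raft serves as the HA backend only.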
@@ -153,7 +153,7 @@ func (c *OperatorMigrateCommand) migrate(config *migratorConfig) error {
 
 	if c.flagReset {
 		if err := SetStorageMigration(from, false); err != nil {
-			return errwrap.Wrapf("error reseting migration lock: {{err}}", err)
+			return errwrap.Wrapf("error resetting migration lock: {{err}}", err)
 		}
 		return nil
 	}
@@ -169,7 +169,7 @@ func (c *OperatorMigrateCommand) migrate(config *migratorConfig) error {
 	}
 
 	if migrationStatus != nil {
-		return fmt.Errorf("Storage migration in progress (started: %s).", migrationStatus.Start.Format(time.RFC3339))
+		return fmt.Errorf("storage migration in progress (started: %s)", migrationStatus.Start.Format(time.RFC3339))
 	}
 
 	switch config.StorageSource.Type {
@@ -259,7 +259,7 @@ func (c *OperatorMigrateCommand) createDestinationBackend(kind string, conf map[
 	if err != nil {
 		return nil, errwrap.Wrapf("error parsing cluster address: {{err}}", err)
 	}
-	if err := raftStorage.Bootstrap(context.Background(), []raft.Peer{
+	if err := raftStorage.Bootstrap([]raft.Peer{
 		{
 			ID:      raftStorage.NodeID(),
 			Address: parsedClusterAddr.Host,
@@ -110,15 +110,12 @@ func (c *OperatorRaftJoinCommand) Run(args []string) int {
 
 	args = f.Args()
 	switch len(args) {
+	case 0:
+		// No-op: This is acceptable if we're using raft for HA-only
 	case 1:
 		leaderAPIAddr = strings.TrimSpace(args[0])
 	default:
-		c.UI.Error(fmt.Sprintf("Incorrect arguments (expected 1, got %d)", len(args)))
-		return 1
-	}
-
-	if len(leaderAPIAddr) == 0 {
-		c.UI.Error("leader api address is required")
+		c.UI.Error(fmt.Sprintf("Too many arguments (expected 0-1, got %d)", len(args)))
 		return 1
 	}
 
@@ -64,7 +64,14 @@ var enableFourClusterDev = func(c *ServerCommand, base *vault.CoreConfig, info m
 		return 1
 	}
 
-const storageMigrationLock = "core/migration"
+const (
+	storageMigrationLock = "core/migration"
+
+	// Even though there are more types than the ones below, the following consts
+	// are declared internally for value comparison and reusability.
+	storageTypeRaft   = "raft"
+	storageTypeConsul = "consul"
+)
 
 type ServerCommand struct {
 	*BaseCommand
@@ -453,7 +460,7 @@ func (c *ServerCommand) runRecoveryMode() int {
 		c.UI.Error(fmt.Sprintf("Unknown storage type %s", config.Storage.Type))
 		return 1
 	}
-	if config.Storage.Type == "raft" {
+	if config.Storage.Type == storageTypeRaft || (config.HAStorage != nil && config.HAStorage.Type == storageTypeRaft) {
 		if envCA := os.Getenv("VAULT_CLUSTER_ADDR"); envCA != "" {
 			config.ClusterAddr = envCA
 		}
@@ -963,7 +970,7 @@ func (c *ServerCommand) Run(args []string) int {
 
 	// Do any custom configuration needed per backend
 	switch config.Storage.Type {
-	case "consul":
+	case storageTypeConsul:
 		if config.ServiceRegistration == nil {
 			// If Consul is configured for storage and service registration is unconfigured,
 			// use Consul for service registration without requiring additional configuration.
@@ -973,7 +980,7 @@ func (c *ServerCommand) Run(args []string) int {
 				Config: config.Storage.Config,
 			}
 		}
-	case "raft":
+	case storageTypeRaft:
 		if envCA := os.Getenv("VAULT_CLUSTER_ADDR"); envCA != "" {
 			config.ClusterAddr = envCA
 		}
@@ -1145,6 +1152,7 @@ func (c *ServerCommand) Run(args []string) int {
 		SecureRandomReader: secureRandomReader,
 	}
 	if c.flagDev {
+		coreConfig.EnableRaw = true
 		coreConfig.DevToken = c.flagDevRootTokenID
 		if c.flagDevLeasedKV {
 			coreConfig.LogicalBackends["kv"] = vault.LeasedPassthroughBackendFactory
@@ -1175,24 +1183,26 @@ func (c *ServerCommand) Run(args []string) int {
 	// Initialize the separate HA storage backend, if it exists
 	var ok bool
 	if config.HAStorage != nil {
-		if config.Storage.Type == "raft" {
+		if config.Storage.Type == storageTypeRaft && config.HAStorage.Type == storageTypeRaft {
+			c.UI.Error("Raft cannot be set both as 'storage' and 'ha_storage'. Setting 'storage' to 'raft' will automatically set it up for HA operations as well")
+			return 1
+		}
+
+		if config.Storage.Type == storageTypeRaft {
 			c.UI.Error("HA storage cannot be declared when Raft is the storage type")
 			return 1
 		}
 
-		// TODO: Remove when Raft can server as the ha_storage backend.
-		// See https://github.com/hashicorp/vault/issues/8206
-		if config.HAStorage.Type == "raft" {
-			c.UI.Error("Raft cannot be used as separate HA storage at this time")
-			return 1
-		}
 		factory, exists := c.PhysicalBackends[config.HAStorage.Type]
 		if !exists {
 			c.UI.Error(fmt.Sprintf("Unknown HA storage type %s", config.HAStorage.Type))
 			return 1
 
 		}
-		habackend, err := factory(config.HAStorage.Config, c.logger)
+
+		namedHALogger := c.logger.Named("ha." + config.HAStorage.Type)
+		allLoggers = append(allLoggers, namedHALogger)
+		habackend, err := factory(config.HAStorage.Config, namedHALogger)
 		if err != nil {
 			c.UI.Error(fmt.Sprintf(
 				"Error initializing HA storage of type %s: %s", config.HAStorage.Type, err))
@@ -1211,10 +1221,13 @@ func (c *ServerCommand) Run(args []string) int {
 		}
 
 		coreConfig.RedirectAddr = config.HAStorage.RedirectAddr
-		// TODO: Check for raft and disableClustering case when Raft on HA
-		// Storage support is added.
 		disableClustering = config.HAStorage.DisableClustering
 
+		if config.HAStorage.Type == storageTypeRaft && disableClustering {
+			c.UI.Error("Disable clustering cannot be set to true when Raft is the HA storage type")
+			return 1
+		}
+
 		if !disableClustering {
 			coreConfig.ClusterAddr = config.HAStorage.ClusterAddr
 		}
@@ -1223,7 +1236,7 @@ func (c *ServerCommand) Run(args []string) int {
 		coreConfig.RedirectAddr = config.Storage.RedirectAddr
 		disableClustering = config.Storage.DisableClustering
 
-		if config.Storage.Type == "raft" && disableClustering {
+		if (config.Storage.Type == storageTypeRaft) && disableClustering {
 			c.UI.Error("Disable clustering cannot be set to true when Raft is the storage type")
 			return 1
 		}
@@ -1559,7 +1572,8 @@ CLUSTER_SYNTHESIS_COMPLETE:
 
 	// When the underlying storage is raft, kick off retry join if it was specified
 	// in the configuration
-	if config.Storage.Type == "raft" {
+	// TODO: Should we also support retry_join for ha_storage?
+	if config.Storage.Type == storageTypeRaft {
 		if err := core.InitiateRetryJoin(context.Background()); err != nil {
 			c.UI.Error(fmt.Sprintf("Failed to initiate raft retry join, %q", err.Error()))
 			return 1
@@ -415,7 +415,7 @@ func RaftClusterJoinNodes(t testing.T, cluster *vault.TestCluster) {
 
 	addressProvider := &TestRaftServerAddressProvider{Cluster: cluster}
 
-	atomic.StoreUint32(&vault.UpdateClusterAddrForTests, 1)
+	atomic.StoreUint32(&vault.TestingUpdateClusterAddr, 1)
 
 	leader := cluster.Cores[0]
 
@@ -130,9 +130,56 @@ func MakeRaftBackend(t testing.T, coreIdx int, logger hclog.Logger) *vault.Physi
 	}
 }
 
-type ClusterSetupMutator func(conf *vault.CoreConfig, opts *vault.TestClusterOptions)
+// RaftHAFactory returns a PhysicalBackendBundle with raft set as the HABackend
+// and the physical.Backend provided in PhysicalBackendBundler as the storage
+// backend.
+func RaftHAFactory(f PhysicalBackendBundler) func(t testing.T, coreIdx int, logger hclog.Logger) *vault.PhysicalBackendBundle {
+	return func(t testing.T, coreIdx int, logger hclog.Logger) *vault.PhysicalBackendBundle {
+		// Call the factory func to create the storage backend
+		physFactory := SharedPhysicalFactory(f)
+		bundle := physFactory(t, coreIdx, logger)
 
-func SharedPhysicalFactory(f func(t testing.T, logger hclog.Logger) *vault.PhysicalBackendBundle) func(t testing.T, coreIdx int, logger hclog.Logger) *vault.PhysicalBackendBundle {
+		// This can happen if a shared physical backend is called on a non-0th core.
+		if bundle == nil {
+			bundle = new(vault.PhysicalBackendBundle)
+		}
+
+		raftDir := makeRaftDir(t)
+		cleanupFunc := func() {
+			os.RemoveAll(raftDir)
+		}
+
+		nodeID := fmt.Sprintf("core-%d", coreIdx)
+		conf := map[string]string{
+			"path":                   raftDir,
+			"node_id":                nodeID,
+			"performance_multiplier": "8",
+		}
+
+		// Create and set the HA Backend
+		raftBackend, err := raft.NewRaftBackend(conf, logger)
+		if err != nil {
+			bundle.Cleanup()
+			t.Fatal(err)
+		}
+		bundle.HABackend = raftBackend.(physical.HABackend)
+
+		// Re-wrap the cleanup func
+		bundleCleanup := bundle.Cleanup
+		bundle.Cleanup = func() {
+			if bundleCleanup != nil {
+				bundleCleanup()
+			}
+			cleanupFunc()
+		}
+
+		return bundle
+	}
+}
+
+type PhysicalBackendBundler func(t testing.T, logger hclog.Logger) *vault.PhysicalBackendBundle
+
+func SharedPhysicalFactory(f PhysicalBackendBundler) func(t testing.T, coreIdx int, logger hclog.Logger) *vault.PhysicalBackendBundle {
 	return func(t testing.T, coreIdx int, logger hclog.Logger) *vault.PhysicalBackendBundle {
 		if coreIdx == 0 {
 			return f(t, logger)
@@ -141,6 +188,8 @@ func SharedPhysicalFactory(f func(t testing.T, logger hclog.Logger) *vault.Physi
 	}
 }
 
+type ClusterSetupMutator func(conf *vault.CoreConfig, opts *vault.TestClusterOptions)
+
 func InmemBackendSetup(conf *vault.CoreConfig, opts *vault.TestClusterOptions) {
 	opts.PhysicalFactory = SharedPhysicalFactory(MakeInmemBackend)
 }
@@ -166,6 +215,11 @@ func RaftBackendSetup(conf *vault.CoreConfig, opts *vault.TestClusterOptions) {
 	}
 }
 
+func RaftHASetup(conf *vault.CoreConfig, opts *vault.TestClusterOptions, bundler PhysicalBackendBundler) {
+	opts.KeepStandbysSealed = true
+	opts.PhysicalFactory = RaftHAFactory(bundler)
+}
+
 func ClusterSetup(conf *vault.CoreConfig, opts *vault.TestClusterOptions, setup ClusterSetupMutator) (*vault.CoreConfig, *vault.TestClusterOptions) {
 	var localConf vault.CoreConfig
 	if conf != nil {
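RaftHASetup added above is the entry point the new HA tests use: it keeps standbys sealed and swaps in RaftHAFactory so raft only provides the HA lock. A condensed sketch of wiring it into a test cluster, following the shape of testRaftHANewCluster added later in this commit (the inmem backend choice is just an example):

    package rafttests

    import (
        "testing"

        "github.com/hashicorp/vault/helper/testhelpers/teststorage"
        vaulthttp "github.com/hashicorp/vault/http"
        "github.com/hashicorp/vault/vault"
    )

    func TestRaftHA_Inmem_Sketch(t *testing.T) {
        var conf vault.CoreConfig
        opts := vault.TestClusterOptions{HandlerFunc: vaulthttp.Handler}

        // Inmem backend for storage, raft for the HA lock.
        teststorage.RaftHASetup(&conf, &opts, teststorage.MakeInmemBackend)

        cluster := vault.NewTestCluster(t, &conf, &opts)
        cluster.Start()
        defer cluster.Cleanup()

        // Standbys would then join via sys/storage/raft/join and the peer set
        // would be asserted with verifyRaftPeers, as testRaftHANewCluster does.
    }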
@@ -5,12 +5,12 @@ import (
 	"io/ioutil"
 	"os"
 
-	"github.com/mitchellh/go-testing-interface"
-
 	hclog "github.com/hashicorp/go-hclog"
 	raftlib "github.com/hashicorp/raft"
 	"github.com/hashicorp/vault/physical/raft"
+	"github.com/hashicorp/vault/sdk/physical"
 	"github.com/hashicorp/vault/vault"
+	"github.com/mitchellh/go-testing-interface"
 )
 
 // ReusableStorage is a physical backend that can be re-used across
@@ -59,8 +59,7 @@ func MakeReusableStorage(t testing.T, logger hclog.Logger, bundle *vault.Physica
 		},
 
 		// No-op
-		Cleanup: func(t testing.T, cluster *vault.TestCluster) {
-		},
+		Cleanup: func(t testing.T, cluster *vault.TestCluster) {},
 	}
 
 	cleanup := func() {
@@ -74,10 +73,7 @@ func MakeReusableStorage(t testing.T, logger hclog.Logger, bundle *vault.Physica
 
 // MakeReusableRaftStorage makes a physical raft backend that can be re-used
 // across multiple test clusters in sequence.
-func MakeReusableRaftStorage(
-	t testing.T, logger hclog.Logger, numCores int,
-	addressProvider raftlib.ServerAddressProvider,
-) (ReusableStorage, StorageCleanup) {
+func MakeReusableRaftStorage(t testing.T, logger hclog.Logger, numCores int, addressProvider raftlib.ServerAddressProvider) (ReusableStorage, StorageCleanup) {
 
 	raftDirs := make([]string, numCores)
 	for i := 0; i < numCores; i++ {
@@ -91,7 +87,7 @@ func MakeReusableRaftStorage(
 			conf.DisablePerformanceStandby = true
 			opts.KeepStandbysSealed = true
 			opts.PhysicalFactory = func(t testing.T, coreIdx int, logger hclog.Logger) *vault.PhysicalBackendBundle {
-				return makeReusableRaftBackend(t, coreIdx, logger, raftDirs[coreIdx], addressProvider)
+				return makeReusableRaftBackend(t, coreIdx, logger, raftDirs[coreIdx], addressProvider, false)
 			}
 		},
 
@@ -120,6 +116,49 @@ func CloseRaftStorage(t testing.T, cluster *vault.TestCluster, idx int) {
 	}
 }
 
+func MakeReusableRaftHAStorage(t testing.T, logger hclog.Logger, numCores int, bundle *vault.PhysicalBackendBundle) (ReusableStorage, StorageCleanup) {
+	raftDirs := make([]string, numCores)
+	for i := 0; i < numCores; i++ {
+		raftDirs[i] = makeRaftDir(t)
+	}
+
+	storage := ReusableStorage{
+		Setup: func(conf *vault.CoreConfig, opts *vault.TestClusterOptions) {
+			opts.KeepStandbysSealed = true
+			opts.PhysicalFactory = func(t testing.T, coreIdx int, logger hclog.Logger) *vault.PhysicalBackendBundle {
+				haBundle := makeReusableRaftBackend(t, coreIdx, logger, raftDirs[coreIdx], nil, true)
+
+				return &vault.PhysicalBackendBundle{
+					Backend:   bundle.Backend,
+					HABackend: haBundle.HABackend,
+				}
+			}
+		},
+
+		// Close open files being used by raft.
+		Cleanup: func(t testing.T, cluster *vault.TestCluster) {
+			for _, core := range cluster.Cores {
+				raftStorage := core.UnderlyingHAStorage.(*raft.RaftBackend)
+				if err := raftStorage.Close(); err != nil {
+					t.Fatal(err)
+				}
+			}
+		},
+	}
+
+	cleanup := func() {
+		if bundle.Cleanup != nil {
+			bundle.Cleanup()
+		}
+
+		for _, rd := range raftDirs {
+			os.RemoveAll(rd)
+		}
+	}
+
+	return storage, cleanup
+}
+
 func makeRaftDir(t testing.T) string {
 	raftDir, err := ioutil.TempDir("", "vault-raft-")
 	if err != nil {
@@ -129,10 +168,7 @@ func makeRaftDir(t testing.T) string {
 	return raftDir
 }
 
-func makeReusableRaftBackend(
-	t testing.T, coreIdx int, logger hclog.Logger, raftDir string,
-	addressProvider raftlib.ServerAddressProvider,
-) *vault.PhysicalBackendBundle {
+func makeReusableRaftBackend(t testing.T, coreIdx int, logger hclog.Logger, raftDir string, addressProvider raftlib.ServerAddressProvider, ha bool) *vault.PhysicalBackendBundle {
 
 	nodeID := fmt.Sprintf("core-%d", coreIdx)
 	conf := map[string]string{
@@ -146,9 +182,16 @@ func makeReusableRaftBackend(
 		t.Fatal(err)
 	}
 
-	backend.(*raft.RaftBackend).SetServerAddressProvider(addressProvider)
-	return &vault.PhysicalBackendBundle{
-		Backend: backend,
+	if addressProvider != nil {
+		backend.(*raft.RaftBackend).SetServerAddressProvider(addressProvider)
 	}
+
+	bundle := new(vault.PhysicalBackendBundle)
+
+	if ha {
+		bundle.HABackend = backend.(physical.HABackend)
+	} else {
+		bundle.Backend = backend
+	}
+	return bundle
 }
@@ -144,6 +144,7 @@ func Handler(props *vault.HandlerProperties) http.Handler {
 		mux.Handle("/v1/sys/rekey-recovery-key/init", handleRequestForwarding(core, handleSysRekeyInit(core, true)))
 		mux.Handle("/v1/sys/rekey-recovery-key/update", handleRequestForwarding(core, handleSysRekeyUpdate(core, true)))
 		mux.Handle("/v1/sys/rekey-recovery-key/verify", handleRequestForwarding(core, handleSysRekeyVerify(core, true)))
+		mux.Handle("/v1/sys/storage/raft/bootstrap", handleSysRaftBootstrap(core))
 		mux.Handle("/v1/sys/storage/raft/join", handleSysRaftJoin(core))
 		for _, path := range injectDataIntoTopRoutes {
 			mux.Handle(path, handleRequestForwarding(core, handleLogicalWithInjector(core)))
@@ -12,6 +12,25 @@ import (
 	"github.com/hashicorp/vault/vault"
 )
 
+func handleSysRaftBootstrap(core *vault.Core) http.Handler {
+	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		switch r.Method {
+		case "POST", "PUT":
+			if core.Sealed() {
+				respondError(w, http.StatusBadRequest, errors.New("node must be unsealed to bootstrap"))
+			}
+
+			if err := core.RaftBootstrap(context.Background(), false); err != nil {
+				respondError(w, http.StatusInternalServerError, err)
+				return
+			}
+
+		default:
+			respondError(w, http.StatusBadRequest, nil)
+		}
+	})
+}
+
 func handleSysRaftJoin(core *vault.Core) http.Handler {
 	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		switch r.Method {
@@ -53,6 +72,7 @@ func handleSysRaftJoinPost(core *vault.Core, w http.ResponseWriter, r *http.Requ
 			Retry:         req.Retry,
 		},
 	}
+
 	joined, err := core.JoinRaftCluster(context.Background(), leaderInfos, req.NonVoter)
 	if err != nil {
 		respondError(w, http.StatusInternalServerError, err)
@@ -41,6 +41,8 @@ const EnvVaultRaftPath = "VAULT_RAFT_PATH"
 // Verify RaftBackend satisfies the correct interfaces
 var _ physical.Backend = (*RaftBackend)(nil)
 var _ physical.Transactional = (*RaftBackend)(nil)
+var _ physical.HABackend = (*RaftBackend)(nil)
+var _ physical.Lock = (*RaftLock)(nil)
 
 var (
 	// raftLogCacheSize is the maximum number of logs to cache in-memory.
@@ -68,6 +70,11 @@ type RaftBackend struct {
 	// raft is the instance of raft we will operate on.
 	raft *raft.Raft
 
+	// raftInitCh is used to block during HA lock acquisition if raft
+	// has not been initialized yet, which can occur if raft is being
+	// used for HA-only.
+	raftInitCh chan struct{}
+
 	// raftNotifyCh is used to receive updates about leadership changes
 	// regarding this node.
 	raftNotifyCh chan bool
@@ -323,6 +330,7 @@ func NewRaftBackend(conf map[string]string, logger log.Logger) (physical.Backend
 	return &RaftBackend{
 		logger:      logger,
 		fsm:         fsm,
+		raftInitCh:  make(chan struct{}),
 		conf:        conf,
 		logStore:    log,
 		stableStore: stable,
@@ -420,7 +428,7 @@ func (b *RaftBackend) SetServerAddressProvider(provider raft.ServerAddressProvid
 }
 
 // Bootstrap prepares the given peers to be part of the raft cluster
-func (b *RaftBackend) Bootstrap(ctx context.Context, peers []Peer) error {
+func (b *RaftBackend) Bootstrap(peers []Peer) error {
 	b.l.Lock()
 	defer b.l.Unlock()
 
@@ -532,6 +540,13 @@ func (b *RaftBackend) StartRecoveryCluster(ctx context.Context, peer Peer) error
 	})
 }
 
+func (b *RaftBackend) HasState() (bool, error) {
+	b.l.RLock()
+	defer b.l.RUnlock()
+
+	return raft.HasExistingState(b.logStore, b.stableStore, b.snapStore)
+}
+
 // SetupCluster starts the raft cluster and enables the networking needed for
 // the raft nodes to communicate.
 func (b *RaftBackend) SetupCluster(ctx context.Context, opts SetupOpts) error {
@@ -699,6 +714,10 @@ func (b *RaftBackend) SetupCluster(ctx context.Context, opts SetupOpts) error {
 		opts.ClusterListener.AddClient(consts.RaftStorageALPN, b.streamLayer)
 	}
 
+	// Close the init channel to signal setup has been completed
+	close(b.raftInitCh)
+
+	b.logger.Trace("finished setting up raft cluster")
 	return nil
 }
 
@@ -712,6 +731,9 @@ func (b *RaftBackend) TeardownCluster(clusterListener cluster.ClusterHook) error
 	b.l.Lock()
 	future := b.raft.Shutdown()
 	b.raft = nil
+
+	// If we're tearing down, then we need to recreate the raftInitCh
+	b.raftInitCh = make(chan struct{})
 	b.l.Unlock()
 
 	return future.Error()
@@ -1111,10 +1133,10 @@ func (b *RaftBackend) applyLog(ctx context.Context, command *LogData) error {
 	return nil
 }
 
-// HAEnabled is the implemention of the HABackend interface
+// HAEnabled is the implementation of the HABackend interface
 func (b *RaftBackend) HAEnabled() bool { return true }
 
-// HAEnabled is the implemention of the HABackend interface
+// HAEnabled is the implementation of the HABackend interface
 func (b *RaftBackend) LockWith(key, value string) (physical.Lock, error) {
 	return &RaftLock{
 		key:   key,
@@ -1161,18 +1183,27 @@ func (l *RaftLock) monitorLeadership(stopCh <-chan struct{}, leaderNotifyCh <-ch
 // Lock blocks until we become leader or are shutdown. It returns a channel that
 // is closed when we detect a loss of leadership.
 func (l *RaftLock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
+	// If not initialized, block until it is
+	if !l.b.Initialized() {
+		select {
+		case <-l.b.raftInitCh:
+		case <-stopCh:
+			return nil, nil
+		}
+	}
+
 	l.b.l.RLock()
+
+	// Ensure that we still have a raft instance after grabbing the read lock
+	if l.b.raft == nil {
+		l.b.l.RUnlock()
+		return nil, errors.New("attempted to grab a lock on a nil raft backend")
+	}
+
 	// Cache the notifyCh locally
 	leaderNotifyCh := l.b.raftNotifyCh
 
-	// TODO: Remove when Raft can server as the ha_storage backend. The internal
-	// raft pointer should not be nil here, but the nil check is a guard against
-	// https://github.com/hashicorp/vault/issues/8206
-	if l.b.raft == nil {
-		return nil, errors.New("attempted to grab a lock on a nil raft backend")
-	}
-
 	// Check to see if we are already leader.
 	if l.b.raft.State() == raft.Leader {
 		err := l.b.applyLog(context.Background(), &LogData{
@@ -1225,6 +1256,10 @@ func (l *RaftLock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
 
 // Unlock gives up leadership.
 func (l *RaftLock) Unlock() error {
+	if l.b.raft == nil {
+		return nil
+	}
+
 	return l.b.raft.LeadershipTransfer().Error()
 }
 
@@ -36,8 +36,13 @@ func getRaft(t testing.TB, bootstrap bool, noStoreState bool) (*RaftBackend, str
 }
 
 func getRaftWithDir(t testing.TB, bootstrap bool, noStoreState bool, raftDir string) (*RaftBackend, string) {
+	id, err := uuid.GenerateUUID()
+	if err != nil {
+		t.Fatal(err)
+	}
+
 	logger := hclog.New(&hclog.LoggerOptions{
-		Name:  "raft",
+		Name:  fmt.Sprintf("raft-%s", id),
 		Level: hclog.Trace,
 	})
 	logger.Info("raft dir", "dir", raftDir)
@@ -45,6 +50,7 @@ func getRaftWithDir(t testing.TB, bootstrap bool, noStoreState bool, raftDir str
 	conf := map[string]string{
 		"path":          raftDir,
 		"trailing_logs": "100",
+		"node_id":       id,
 	}
 
 	if noStoreState {
@@ -58,7 +64,12 @@ func getRaftWithDir(t testing.TB, bootstrap bool, noStoreState bool, raftDir str
 	backend := backendRaw.(*RaftBackend)
 
 	if bootstrap {
-		err = backend.Bootstrap(context.Background(), []Peer{Peer{ID: backend.NodeID(), Address: backend.NodeID()}})
+		err = backend.Bootstrap([]Peer{
+			{
+				ID:      backend.NodeID(),
+				Address: backend.NodeID(),
+			},
+		})
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -40,7 +40,7 @@ func addPeer(t *testing.T, leader, follower *RaftBackend) {
 		t.Fatal(err)
 	}
 
-	err = follower.Bootstrap(context.Background(), peers)
+	err = follower.Bootstrap(peers)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -316,7 +316,7 @@ func (l *raftLayer) CALookup(context.Context) ([]*x509.Certificate, error) {
 	return ret, nil
 }
 
-// Stop shutsdown the raft layer.
+// Stop shuts down the raft layer.
 func (l *raftLayer) Stop() error {
 	l.Close()
 	return nil
@@ -314,4 +314,3 @@ func writeFile(t *testing.T, filename string, data []byte, perms os.FileMode) {
 		t.Fatalf("Unable to write to file [%s]: %s", filename, err)
 	}
 }
-
@@ -31,7 +31,6 @@ import (
 	"github.com/hashicorp/vault/helper/metricsutil"
 	"github.com/hashicorp/vault/helper/namespace"
 	"github.com/hashicorp/vault/internalshared/reloadutil"
-	"github.com/hashicorp/vault/physical/raft"
 	"github.com/hashicorp/vault/sdk/helper/certutil"
 	"github.com/hashicorp/vault/sdk/helper/consts"
 	"github.com/hashicorp/vault/sdk/helper/jsonutil"
@@ -1455,7 +1454,7 @@ func (c *Core) unsealInternal(ctx context.Context, masterKey []byte) (bool, erro
 		return false, err
 	}
 
-	if err := c.startRaftStorage(ctx); err != nil {
+	if err := c.startRaftBackend(ctx); err != nil {
 		return false, err
 	}
 
@@ -1714,7 +1713,7 @@ func (c *Core) sealInternal() error {
 	return c.sealInternalWithOptions(true, false, true)
 }
 
-func (c *Core) sealInternalWithOptions(grabStateLock, keepHALock, shutdownRaft bool) error {
+func (c *Core) sealInternalWithOptions(grabStateLock, keepHALock, performCleanup bool) error {
 	// Mark sealed, and if already marked return
 	if swapped := atomic.CompareAndSwapUint32(c.sealed, 0, 1); !swapped {
 		return nil
@@ -1796,10 +1795,10 @@ func (c *Core) sealInternalWithOptions(grabStateLock, keepHALock, shutdownRaft b
 
 	c.teardownReplicationResolverHandler()
 
-	// If the storage backend needs to be sealed
-	if shutdownRaft {
-		if raftStorage, ok := c.underlyingPhysical.(*raft.RaftBackend); ok {
-			if err := raftStorage.TeardownCluster(c.getClusterListener()); err != nil {
+	// Perform additional cleanup upon sealing.
+	if performCleanup {
+		if raftBackend := c.getRaftBackend(); raftBackend != nil {
+			if err := raftBackend.TeardownCluster(c.getClusterListener()); err != nil {
 				c.logger.Error("error stopping storage cluster", "error", err)
 				return err
 			}
vault/external_tests/raft/raft_ha_test.go (new file, 242 lines)
@@ -0,0 +1,242 @@
+package rafttests
+
+import (
+	"sync/atomic"
+	"testing"
+
+	"github.com/hashicorp/go-hclog"
+	"github.com/hashicorp/vault/api"
+	"github.com/hashicorp/vault/helper/testhelpers"
+	"github.com/hashicorp/vault/helper/testhelpers/teststorage"
+	vaulthttp "github.com/hashicorp/vault/http"
+	"github.com/hashicorp/vault/physical/raft"
+	"github.com/hashicorp/vault/sdk/helper/logging"
+	"github.com/hashicorp/vault/vault"
+)
+
+func TestRaft_HA_NewCluster(t *testing.T) {
+	t.Run("file", func(t *testing.T) {
+		t.Parallel()
+
+		t.Run("no_client_certs", func(t *testing.T) {
+			testRaftHANewCluster(t, teststorage.MakeFileBackend, false)
+		})
+
+		t.Run("with_client_certs", func(t *testing.T) {
+			testRaftHANewCluster(t, teststorage.MakeFileBackend, true)
+		})
+	})
+
+	t.Run("inmem", func(t *testing.T) {
+		t.Parallel()
+
+		t.Run("no_client_certs", func(t *testing.T) {
+			testRaftHANewCluster(t, teststorage.MakeInmemBackend, false)
+		})
+
+		t.Run("with_client_certs", func(t *testing.T) {
+			testRaftHANewCluster(t, teststorage.MakeInmemBackend, true)
+		})
+	})
+
+	t.Run("consul", func(t *testing.T) {
+		t.Parallel()
+
+		t.Run("no_client_certs", func(t *testing.T) {
+			testRaftHANewCluster(t, teststorage.MakeConsulBackend, false)
+		})
+
+		t.Run("with_client_certs", func(t *testing.T) {
+			testRaftHANewCluster(t, teststorage.MakeConsulBackend, true)
+		})
+	})
+}
+
+func testRaftHANewCluster(t *testing.T, bundler teststorage.PhysicalBackendBundler, addClientCerts bool) {
+	var conf vault.CoreConfig
+	var opts = vault.TestClusterOptions{HandlerFunc: vaulthttp.Handler}
+
+	teststorage.RaftHASetup(&conf, &opts, bundler)
+	cluster := vault.NewTestCluster(t, &conf, &opts)
+	cluster.Start()
+	defer cluster.Cleanup()
+
+	addressProvider := &testhelpers.TestRaftServerAddressProvider{Cluster: cluster}
+
+	leaderCore := cluster.Cores[0]
+	atomic.StoreUint32(&vault.TestingUpdateClusterAddr, 1)
+
+	// Seal the leader so we can install an address provider
+	{
+		testhelpers.EnsureCoreSealed(t, leaderCore)
+		leaderCore.UnderlyingHAStorage.(*raft.RaftBackend).SetServerAddressProvider(addressProvider)
+		cluster.UnsealCore(t, leaderCore)
+		vault.TestWaitActive(t, leaderCore.Core)
+	}
+
+	// Now unseal core for join commands to work
+	testhelpers.EnsureCoresUnsealed(t, cluster)
+
+	joinFunc := func(client *api.Client, addClientCerts bool) {
+		req := &api.RaftJoinRequest{
+			LeaderCACert: string(cluster.CACertPEM),
+		}
+		if addClientCerts {
+			req.LeaderClientCert = string(cluster.CACertPEM)
+			req.LeaderClientKey = string(cluster.CAKeyPEM)
+		}
+		resp, err := client.Sys().RaftJoin(req)
+		if err != nil {
+			t.Fatal(err)
+		}
+		if !resp.Joined {
+			t.Fatalf("failed to join raft cluster")
+		}
+	}
+
+	joinFunc(cluster.Cores[1].Client, addClientCerts)
+	joinFunc(cluster.Cores[2].Client, addClientCerts)
+
+	// Ensure peers are added
+	leaderClient := cluster.Cores[0].Client
+	verifyRaftPeers(t, leaderClient, map[string]bool{
+		"core-0": true,
+		"core-1": true,
+		"core-2": true,
+	})
+
+	// Test remove peers
+	_, err := leaderClient.Logical().Write("sys/storage/raft/remove-peer", map[string]interface{}{
+		"server_id": "core-1",
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	_, err = leaderClient.Logical().Write("sys/storage/raft/remove-peer", map[string]interface{}{
+		"server_id": "core-2",
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Ensure peers are removed
+	verifyRaftPeers(t, leaderClient, map[string]bool{
+		"core-0": true,
+	})
+}
+
+func TestRaft_HA_ExistingCluster(t *testing.T) {
+	conf := vault.CoreConfig{
+		DisablePerformanceStandby: true,
+	}
+	opts := vault.TestClusterOptions{
+		HandlerFunc:        vaulthttp.Handler,
+		NumCores:           vault.DefaultNumCores,
+		KeepStandbysSealed: true,
+	}
+	logger := logging.NewVaultLogger(hclog.Debug).Named(t.Name())
+
+	physBundle := teststorage.MakeInmemBackend(t, logger)
+	physBundle.HABackend = nil
+
+	storage, cleanup := teststorage.MakeReusableStorage(t, logger, physBundle)
+	defer cleanup()
+
+	var (
+		clusterBarrierKeys [][]byte
+		clusterRootToken   string
+	)
+	createCluster := func(t *testing.T) {
+		t.Log("simulating cluster creation without raft as HABackend")
+
+		storage.Setup(&conf, &opts)
+
+		cluster := vault.NewTestCluster(t, &conf, &opts)
+		cluster.Start()
+		defer func() {
+			cluster.Cleanup()
+			storage.Cleanup(t, cluster)
+		}()
+
+		clusterBarrierKeys = cluster.BarrierKeys
+		clusterRootToken = cluster.RootToken
+	}
+
+	createCluster(t)
+
+	haStorage, haCleanup := teststorage.MakeReusableRaftHAStorage(t, logger, opts.NumCores, physBundle)
+	defer haCleanup()
+
+	updateCLuster := func(t *testing.T) {
+		t.Log("simulating cluster update with raft as HABackend")
+
+		opts.SkipInit = true
+		haStorage.Setup(&conf, &opts)
+
+		cluster := vault.NewTestCluster(t, &conf, &opts)
+		cluster.Start()
+		defer func() {
+			cluster.Cleanup()
+			haStorage.Cleanup(t, cluster)
+		}()
+
+		// Set cluster values
+		cluster.BarrierKeys = clusterBarrierKeys
+		cluster.RootToken = clusterRootToken
+
+		addressProvider := &testhelpers.TestRaftServerAddressProvider{Cluster: cluster}
+		atomic.StoreUint32(&vault.TestingUpdateClusterAddr, 1)
+
+		// Seal the leader so we can install an address provider
+		leaderCore := cluster.Cores[0]
+		{
+			testhelpers.EnsureCoreSealed(t, leaderCore)
+			leaderCore.UnderlyingHAStorage.(*raft.RaftBackend).SetServerAddressProvider(addressProvider)
+			testhelpers.EnsureCoreUnsealed(t, cluster, leaderCore)
+		}
+
+		// Call the bootstrap on the leader and then ensure that it becomes active
+		leaderClient := cluster.Cores[0].Client
+		leaderClient.SetToken(clusterRootToken)
+		{
+			_, err := leaderClient.Logical().Write("sys/storage/raft/bootstrap", nil)
+			if err != nil {
+				t.Fatal(err)
+			}
+			vault.TestWaitActive(t, leaderCore.Core)
+		}
+
+		// Set address provider
+		cluster.Cores[1].UnderlyingHAStorage.(*raft.RaftBackend).SetServerAddressProvider(addressProvider)
+		cluster.Cores[2].UnderlyingHAStorage.(*raft.RaftBackend).SetServerAddressProvider(addressProvider)
+
+		// Now unseal core for join commands to work
+		testhelpers.EnsureCoresUnsealed(t, cluster)
+
+		joinFunc := func(client *api.Client) {
+			req := &api.RaftJoinRequest{
+				LeaderCACert: string(cluster.CACertPEM),
+			}
+			resp, err := client.Sys().RaftJoin(req)
+			if err != nil {
+				t.Fatal(err)
+			}
+			if !resp.Joined {
+				t.Fatalf("failed to join raft cluster")
+			}
+		}
+
+		joinFunc(cluster.Cores[1].Client)
+		joinFunc(cluster.Cores[2].Client)
+
+		// Ensure peers are added
+		verifyRaftPeers(t, leaderClient, map[string]bool{
+			"core-0": true,
+			"core-1": true,
+			"core-2": true,
+		})
+	}
+
+	updateCLuster(t)
+}
@@ -47,7 +47,7 @@ func TestRaft_Retry_Join(t *testing.T) {
 
 	leaderCore := cluster.Cores[0]
 	leaderAPI := leaderCore.Client.Address()
-	atomic.StoreUint32(&vault.UpdateClusterAddrForTests, 1)
+	atomic.StoreUint32(&vault.TestingUpdateClusterAddr, 1)
 
 	{
 		testhelpers.EnsureCoreSealed(t, leaderCore)
@@ -90,23 +90,7 @@ func TestRaft_Retry_Join(t *testing.T) {
 		cluster.UnsealCore(t, core)
 	}
 
-	checkConfigFunc := func(expected map[string]bool) {
-		secret, err := cluster.Cores[0].Client.Logical().Read("sys/storage/raft/configuration")
-		if err != nil {
-			t.Fatal(err)
-		}
-		servers := secret.Data["config"].(map[string]interface{})["servers"].([]interface{})
-
-		for _, s := range servers {
-			server := s.(map[string]interface{})
-			delete(expected, server["node_id"].(string))
-		}
-		if len(expected) != 0 {
-			t.Fatalf("failed to read configuration successfully")
-		}
-	}
-
-	checkConfigFunc(map[string]bool{
+	verifyRaftPeers(t, cluster.Cores[0].Client, map[string]bool{
 		"core-0": true,
 		"core-1": true,
 		"core-2": true,
@@ -126,7 +110,7 @@ func TestRaft_Join(t *testing.T) {
 
 	leaderCore := cluster.Cores[0]
 	leaderAPI := leaderCore.Client.Address()
-	atomic.StoreUint32(&vault.UpdateClusterAddrForTests, 1)
+	atomic.StoreUint32(&vault.TestingUpdateClusterAddr, 1)
 
 	// Seal the leader so we can install an address provider
 	{
@@ -187,23 +171,7 @@ func TestRaft_RemovePeer(t *testing.T) {
 
 	client := cluster.Cores[0].Client
 
-	checkConfigFunc := func(expected map[string]bool) {
-		secret, err := client.Logical().Read("sys/storage/raft/configuration")
-		if err != nil {
-			t.Fatal(err)
-		}
-		servers := secret.Data["config"].(map[string]interface{})["servers"].([]interface{})
-
-		for _, s := range servers {
-			server := s.(map[string]interface{})
-			delete(expected, server["node_id"].(string))
-		}
-		if len(expected) != 0 {
-			t.Fatalf("failed to read configuration successfully")
-		}
-	}
-
-	checkConfigFunc(map[string]bool{
+	verifyRaftPeers(t, client, map[string]bool{
 		"core-0": true,
 		"core-1": true,
 		"core-2": true,
@@ -216,7 +184,7 @@ func TestRaft_RemovePeer(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	checkConfigFunc(map[string]bool{
+	verifyRaftPeers(t, client, map[string]bool{
 		"core-0": true,
 		"core-1": true,
 	})
@@ -228,7 +196,7 @@ func TestRaft_RemovePeer(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	checkConfigFunc(map[string]bool{
+	verifyRaftPeers(t, client, map[string]bool{
 		"core-0": true,
 	})
 }
@@ -843,3 +811,39 @@ func BenchmarkRaft_SingleNode(b *testing.B) {
 
 	b.Run("256b", func(b *testing.B) { bench(b, 25) })
 }
+
+func verifyRaftPeers(t *testing.T, client *api.Client, expected map[string]bool) {
+	t.Helper()
+
+	resp, err := client.Logical().Read("sys/storage/raft/configuration")
+	if err != nil {
+		t.Fatalf("error reading raft config: %v", err)
+	}
+
+	if resp == nil || resp.Data == nil {
+		t.Fatal("missing response data")
+	}
+
+	config, ok := resp.Data["config"].(map[string]interface{})
+	if !ok {
+		t.Fatal("missing config in response data")
+	}
+
+	servers, ok := config["servers"].([]interface{})
+	if !ok {
+		t.Fatal("missing servers in response data config")
+	}
+
+	// Iterate through the servers and remove the node found in the response
+	// from the expected collection
+	for _, s := range servers {
+		server := s.(map[string]interface{})
+		delete(expected, server["node_id"].(string))
+	}
+
+	// If the collection is non-empty, it means that the peer was not found in
+	// the response.
+	if len(expected) != 0 {
+		t.Fatalf("failed to read configuration successfully, expected peers no found in configuration list: %v", expected)
+	}
+}
@@ -77,7 +77,7 @@ func testVariousBackends(t *testing.T, tf testFunc, basePort int, includeRaft bo
 		logger := logger.Named("raft")
 		raftBasePort := basePort + 400
 
-		atomic.StoreUint32(&vault.UpdateClusterAddrForTests, 1)
+		atomic.StoreUint32(&vault.TestingUpdateClusterAddr, 1)
 		addressProvider := testhelpers.NewHardcodedServerAddressProvider(numTestCores, raftBasePort+10)
 
 		storage, cleanup := teststorage.MakeReusableRaftStorage(t, logger, numTestCores, addressProvider)
vault/ha.go
@@ -16,7 +16,6 @@ import (
 	"github.com/hashicorp/go-multierror"
 	"github.com/hashicorp/go-uuid"
 	"github.com/hashicorp/vault/helper/namespace"
-	"github.com/hashicorp/vault/physical/raft"
 	"github.com/hashicorp/vault/sdk/helper/certutil"
 	"github.com/hashicorp/vault/sdk/helper/consts"
 	"github.com/hashicorp/vault/sdk/helper/jsonutil"
@@ -709,8 +708,10 @@ func (c *Core) periodicLeaderRefresh(newLeaderCh chan func(), stopCh chan struct
 
 // periodicCheckKeyUpgrade is used to watch for key rotation events as a standby
 func (c *Core) periodicCheckKeyUpgrades(ctx context.Context, stopCh chan struct{}) {
+	raftBackend := c.getRaftBackend()
+	isRaft := raftBackend != nil
+
 	opCount := new(int32)
-	_, isRaft := c.underlyingPhysical.(*raft.RaftBackend)
 	for {
 		select {
 		case <-time.After(keyRotateCheckInterval):
@@ -751,8 +752,17 @@ func (c *Core) periodicCheckKeyUpgrades(ctx context.Context, stopCh chan struct{
 				c.logger.Error("key rotation periodic upgrade check failed", "error", err)
 			}
 
-			if err := c.checkRaftTLSKeyUpgrades(ctx); err != nil {
-				c.logger.Error("raft tls periodic upgrade check failed", "error", err)
+			if isRaft {
+				hasState, err := raftBackend.HasState()
+				if err != nil {
+					c.logger.Error("could not check raft state", "error", err)
+				}
+
+				if raftBackend.Initialized() && hasState {
+					if err := c.checkRaftTLSKeyUpgrades(ctx); err != nil {
+						c.logger.Error("raft tls periodic upgrade check failed", "error", err)
+					}
+				}
 			}
 
 			atomic.AddInt32(lopCount, -1)
@@ -208,30 +208,20 @@ func (c *Core) Initialize(ctx context.Context, initParams *InitParams) (*InitRes
 		return nil, ErrAlreadyInit
 	}

-	// If we have clustered storage, set it up now
-	if raftStorage, ok := c.underlyingPhysical.(*raft.RaftBackend); ok {
-		parsedClusterAddr, err := url.Parse(c.ClusterAddr())
+	// Bootstrap the raft backend if that's provided as the physical or
+	// HA backend.
+	raftBackend := c.getRaftBackend()
+	if raftBackend != nil {
+		err := c.RaftBootstrap(ctx, true)
 		if err != nil {
-			return nil, errwrap.Wrapf("error parsing cluster address: {{err}}", err)
-		}
-		if err := raftStorage.Bootstrap(ctx, []raft.Peer{
-			{
-				ID:      raftStorage.NodeID(),
-				Address: parsedClusterAddr.Host,
-			},
-		}); err != nil {
-			return nil, errwrap.Wrapf("could not bootstrap clustered storage: {{err}}", err)
-		}
-
-		if err := raftStorage.SetupCluster(ctx, raft.SetupOpts{
-			StartAsLeader: true,
-		}); err != nil {
-			return nil, errwrap.Wrapf("could not start clustered storage: {{err}}", err)
+			c.logger.Error("failed to bootstrap raft", "error", err)
+			return nil, err
 		}

+		// Teardown cluster after bootstrap setup
 		defer func() {
-			if err := raftStorage.TeardownCluster(nil); err != nil {
-				c.logger.Error("failed to stop raft storage", "error", err)
+			if err := raftBackend.TeardownCluster(nil); err != nil {
+				c.logger.Error("failed to stop raft", "error", err)
 			}
 		}()
 	}
@@ -385,9 +375,11 @@ func (c *Core) Initialize(ctx context.Context, initParams *InitParams) (*InitRes
 		results.RootToken = base64.StdEncoding.EncodeToString(encryptedVals[0])
 	}

-	if err := c.createRaftTLSKeyring(ctx); err != nil {
-		c.logger.Error("failed to create raft TLS keyring", "error", err)
-		return nil, err
+	if raftBackend != nil {
+		if _, err := c.raftCreateTLSKeyring(ctx); err != nil {
+			c.logger.Error("failed to create raft TLS keyring", "error", err)
+			return nil, err
+		}
 	}

 	// Prepare to re-seal
@@ -29,7 +29,6 @@ import (
 	"github.com/hashicorp/vault/helper/monitor"
 	"github.com/hashicorp/vault/helper/namespace"
 	"github.com/hashicorp/vault/helper/random"
-	"github.com/hashicorp/vault/physical/raft"
 	"github.com/hashicorp/vault/sdk/framework"
 	"github.com/hashicorp/vault/sdk/helper/consts"
 	"github.com/hashicorp/vault/sdk/helper/jsonutil"
@@ -166,7 +165,7 @@ func NewSystemBackend(core *Core, logger log.Logger) *SystemBackend {
 		b.Backend.Paths = append(b.Backend.Paths, b.rawPaths()...)
 	}

-	if _, ok := core.underlyingPhysical.(*raft.RaftBackend); ok {
+	if backend := core.getRaftBackend(); backend != nil {
 		b.Backend.Paths = append(b.Backend.Paths, b.raftStoragePaths()...)
 	}

@@ -7,14 +7,15 @@ import (
 	"errors"
 	"strings"

+	"github.com/hashicorp/vault/sdk/framework"
+	"github.com/hashicorp/vault/sdk/logical"
+	"github.com/hashicorp/vault/sdk/physical"
+
 	proto "github.com/golang/protobuf/proto"
 	wrapping "github.com/hashicorp/go-kms-wrapping"
 	uuid "github.com/hashicorp/go-uuid"
 	"github.com/hashicorp/vault/helper/namespace"
 	"github.com/hashicorp/vault/physical/raft"
-	"github.com/hashicorp/vault/sdk/framework"
-	"github.com/hashicorp/vault/sdk/logical"
-	"github.com/hashicorp/vault/sdk/physical"
 )

 // raftStoragePaths returns paths for use when raft is the storage mechanism.
@@ -132,13 +133,12 @@ func (b *SystemBackend) raftStoragePaths() []*framework.Path {

 func (b *SystemBackend) handleRaftConfigurationGet() framework.OperationFunc {
 	return func(ctx context.Context, req *logical.Request, d *framework.FieldData) (*logical.Response, error) {
-		raftStorage, ok := b.Core.underlyingPhysical.(*raft.RaftBackend)
-		if !ok {
+		raftBackend := b.Core.getRaftBackend()
+		if raftBackend == nil {
 			return logical.ErrorResponse("raft storage is not in use"), logical.ErrInvalidRequest
 		}

-		config, err := raftStorage.GetConfiguration(ctx)
+		config, err := raftBackend.GetConfiguration(ctx)
 		if err != nil {
 			return nil, err
 		}
@@ -158,12 +158,12 @@ func (b *SystemBackend) handleRaftRemovePeerUpdate() framework.OperationFunc {
 			return logical.ErrorResponse("no server id provided"), logical.ErrInvalidRequest
 		}

-		raftStorage, ok := b.Core.underlyingPhysical.(*raft.RaftBackend)
-		if !ok {
+		raftBackend := b.Core.getRaftBackend()
+		if raftBackend == nil {
 			return logical.ErrorResponse("raft storage is not in use"), logical.ErrInvalidRequest
 		}

-		if err := raftStorage.RemovePeer(ctx, serverID); err != nil {
+		if err := raftBackend.RemovePeer(ctx, serverID); err != nil {
 			return nil, err
 		}
 		if b.Core.raftFollowerStates != nil {
@@ -221,8 +221,8 @@ func (b *SystemBackend) handleRaftBootstrapChallengeWrite() framework.OperationF

 func (b *SystemBackend) handleRaftBootstrapAnswerWrite() framework.OperationFunc {
 	return func(ctx context.Context, req *logical.Request, d *framework.FieldData) (*logical.Response, error) {
-		raftStorage, ok := b.Core.underlyingPhysical.(*raft.RaftBackend)
-		if !ok {
+		raftBackend := b.Core.getRaftBackend()
+		if raftBackend == nil {
 			return logical.ErrorResponse("raft storage is not in use"), logical.ErrInvalidRequest
 		}

@@ -271,9 +271,9 @@ func (b *SystemBackend) handleRaftBootstrapAnswerWrite() framework.OperationFunc

 		switch nonVoter {
 		case true:
-			err = raftStorage.AddNonVotingPeer(ctx, serverID, clusterAddr)
+			err = raftBackend.AddNonVotingPeer(ctx, serverID, clusterAddr)
 		default:
-			err = raftStorage.AddPeer(ctx, serverID, clusterAddr)
+			err = raftBackend.AddPeer(ctx, serverID, clusterAddr)
 		}
 		if err != nil {
 			return nil, err
@@ -283,7 +283,7 @@ func (b *SystemBackend) handleRaftBootstrapAnswerWrite() framework.OperationFunc
 			b.Core.raftFollowerStates.update(serverID, 0)
 		}

-		peers, err := raftStorage.Peers(ctx)
+		peers, err := raftBackend.Peers(ctx)
 		if err != nil {
 			return nil, err
 		}
vault/raft.go (448 changed lines)
@@ -16,6 +16,7 @@ import (
 	"github.com/golang/protobuf/proto"
 	"github.com/hashicorp/errwrap"
 	cleanhttp "github.com/hashicorp/go-cleanhttp"
+	"github.com/hashicorp/go-hclog"
 	wrapping "github.com/hashicorp/go-kms-wrapping"
 	uuid "github.com/hashicorp/go-uuid"
 	"github.com/hashicorp/vault/api"
@@ -31,6 +32,9 @@ import (
 var (
 	raftTLSStoragePath    = "core/raft/tls"
 	raftTLSRotationPeriod = 24 * time.Hour
+
+	// TestingUpdateClusterAddr is used in tests to override the cluster address
+	TestingUpdateClusterAddr uint32
 )

 type raftFollowerStates struct {
@@ -88,14 +92,11 @@ func (c *Core) GetRaftIndexes() (committed uint64, applied uint64) {
 	return raftStorage.CommittedIndex(), raftStorage.AppliedIndex()
 }

-// startRaftStorage will call SetupCluster in the raft backend which starts raft
+// startRaftBackend will call SetupCluster in the raft backend which starts raft
 // up and enables the cluster handler.
-func (c *Core) startRaftStorage(ctx context.Context) (retErr error) {
-	raftStorage, ok := c.underlyingPhysical.(*raft.RaftBackend)
-	if !ok {
-		return nil
-	}
-	if raftStorage.Initialized() {
+func (c *Core) startRaftBackend(ctx context.Context) (retErr error) {
+	raftBackend := c.getRaftBackend()
+	if raftBackend == nil || raftBackend.Initialized() {
 		return nil
 	}

@@ -109,6 +110,15 @@ func (c *Core) startRaftStorage(ctx context.Context) (retErr error) {
 	var raftTLS *raft.TLSKeyring
 	switch raftTLSEntry {
 	case nil:
+		// If this is HA-only and no TLS keyring is found, that means the
+		// cluster has not been bootstrapped or joined. We return early here in
+		// this case. If we return here, the raft object has not been instantiated,
+		// and a bootstrap call should be made.
+		if c.isRaftHAOnly() {
+			c.logger.Trace("skipping raft backend setup during unseal, no bootstrap operation has been started yet")
+			return nil
+		}
+
 		// If we did not find a TLS keyring we will attempt to create one here.
 		// This happens after a storage migration process. This node is also
 		// marked to start as leader so we can write the new TLS Key. This is an
@@ -133,8 +143,23 @@ func (c *Core) startRaftStorage(ctx context.Context) (retErr error) {
 		}
 	}

-	raftStorage.SetRestoreCallback(c.raftSnapshotRestoreCallback(true, true))
-	if err := raftStorage.SetupCluster(ctx, raft.SetupOpts{
+	hasState, err := raftBackend.HasState()
+	if err != nil {
+		return err
+	}
+
+	// This can be hit on follower nodes that got their config updated to use
+	// raft for HA-only before they are joined to the cluster. Since followers
+	// in this case use shared storage, it doesn't return early from the TLS
+	// case above, but there's no raft state yet for the backend to call
+	// raft.SetupCluster.
+	if !hasState {
+		c.logger.Trace("skipping raft backend setup during unseal, no raft state found")
+		return nil
+	}
+
+	raftBackend.SetRestoreCallback(c.raftSnapshotRestoreCallback(true, true))
+	if err := raftBackend.SetupCluster(ctx, raft.SetupOpts{
 		TLSKeyring:      raftTLS,
 		ClusterListener: c.getClusterListener(),
 		StartAsLeader:   creating,
@@ -145,7 +170,7 @@ func (c *Core) startRaftStorage(ctx context.Context) (retErr error) {
 	defer func() {
 		if retErr != nil {
 			c.logger.Info("stopping raft server")
-			if err := raftStorage.TeardownCluster(c.getClusterListener()); err != nil {
+			if err := raftBackend.TeardownCluster(c.getClusterListener()); err != nil {
 				c.logger.Error("failed to stop raft server", "error", err)
 			}
 		}
@@ -180,7 +205,114 @@ func (c *Core) stopRaftActiveNode() {
 	c.stopPeriodicRaftTLSRotate()
 }

-// startPeriodicRaftTLSRotate will spawn a go routine in charge of periodically
+func (c *Core) startPeriodicRaftTLSRotate(ctx context.Context) error {
+	raftBackend := c.getRaftBackend()
+
+	// No-op if raft is not being used
+	if raftBackend == nil {
+		return nil
+	}
+
+	c.raftTLSRotationStopCh = make(chan struct{})
+	logger := c.logger.Named("raft")
+
+	if c.isRaftHAOnly() {
+		return c.raftTLSRotateDirect(ctx, logger, c.raftTLSRotationStopCh)
+	}
+
+	return c.raftTLSRotatePhased(ctx, logger, raftBackend, c.raftTLSRotationStopCh)
+}
+
+// raftTLSRotateDirect will spawn a go routine in charge of periodically
+// rotating the TLS certs and keys used for raft traffic.
+//
+// The logic for updating the TLS keyring is through direct storage update. This
+// is called whenever raft is used for HA-only, which means that the underlying
+// storage is a shared physical object, thus requiring no additional
+// coordination.
+func (c *Core) raftTLSRotateDirect(ctx context.Context, logger hclog.Logger, stopCh chan struct{}) error {
+	logger.Info("creating new raft TLS config")
+
+	rotateKeyring := func() (time.Time, error) {
+		// Create a new key
+		raftTLSKey, err := raft.GenerateTLSKey(c.secureRandomReader)
+		if err != nil {
+			return time.Time{}, errwrap.Wrapf("failed to generate new raft TLS key: {{err}}", err)
+		}
+
+		// Read the existing keyring
+		keyring, err := c.raftReadTLSKeyring(ctx)
+		if err != nil {
+			return time.Time{}, errwrap.Wrapf("failed to read raft TLS keyring: {{err}}", err)
+		}
+
+		// Advance the term and store the new key, replacing the old one.
+		// Unlike phased rotation, we don't need to update AppliedIndex since
+		// we don't rely on it to check whether the followers got the key. A
+		// shared storage means that followers will have the key as soon as it's
+		// written to storage.
+		keyring.Term += 1
+		keyring.Keys[0] = raftTLSKey
+		keyring.ActiveKeyID = raftTLSKey.ID
+		entry, err := logical.StorageEntryJSON(raftTLSStoragePath, keyring)
+		if err != nil {
+			return time.Time{}, errwrap.Wrapf("failed to json encode keyring: {{err}}", err)
+		}
+		if err := c.barrier.Put(ctx, entry); err != nil {
+			return time.Time{}, errwrap.Wrapf("failed to write keyring: {{err}}", err)
+		}
+
+		logger.Info("wrote new raft TLS config")
+
+		// Schedule the next rotation
+		return raftTLSKey.CreatedTime.Add(raftTLSRotationPeriod), nil
+	}
+
+	// Read the keyring to calculate the time of next rotation.
+	keyring, err := c.raftReadTLSKeyring(ctx)
+	if err != nil {
+		return err
+	}
+
+	activeKey := keyring.GetActive()
+	if activeKey == nil {
+		return errors.New("no active raft TLS key found")
+	}
+
+	go func() {
+		nextRotationTime := activeKey.CreatedTime.Add(raftTLSRotationPeriod)
+
+		var backoff bool
+		for {
+			// If we encountered an error we should try to create the key
+			// again.
+			if backoff {
+				nextRotationTime = time.Now().Add(10 * time.Second)
+				backoff = false
+			}
+
+			select {
+			case <-time.After(time.Until(nextRotationTime)):
+				// It's time to rotate the keys
+				next, err := rotateKeyring()
+				if err != nil {
+					logger.Error("failed to rotate TLS key", "error", err)
+					backoff = true
+					continue
+				}
+
+				nextRotationTime = next
+
+			case <-stopCh:
+				return
+			}
+		}
+	}()
+
+	return nil
+}
+
+// raftTLSRotatePhased will spawn a go routine in charge of periodically
 // rotating the TLS certs and keys used for raft traffic.
 //
 // The logic for updating the TLS certificate uses a pseudo two phase commit
@@ -199,55 +331,30 @@ func (c *Core) stopRaftActiveNode() {
 // receives the update. This ensures a standby node isn't left behind and unable
 // to reconnect with the cluster. Additionally, only one outstanding key
 // is allowed for this same reason (max keyring size of 2).
-func (c *Core) startPeriodicRaftTLSRotate(ctx context.Context) error {
-	raftStorage, ok := c.underlyingPhysical.(*raft.RaftBackend)
-	if !ok {
-		return nil
-	}
-
-	stopCh := make(chan struct{})
+func (c *Core) raftTLSRotatePhased(ctx context.Context, logger hclog.Logger, raftBackend *raft.RaftBackend, stopCh chan struct{}) error {
 	followerStates := &raftFollowerStates{
 		followers: make(map[string]uint64),
 	}

 	// Pre-populate the follower list with the set of peers.
-	raftConfig, err := raftStorage.GetConfiguration(ctx)
+	raftConfig, err := raftBackend.GetConfiguration(ctx)
 	if err != nil {
 		return err
 	}
 	for _, server := range raftConfig.Servers {
-		if server.NodeID != raftStorage.NodeID() {
+		if server.NodeID != raftBackend.NodeID() {
 			followerStates.update(server.NodeID, 0)
 		}
 	}

-	logger := c.logger.Named("raft")
-	c.raftTLSRotationStopCh = stopCh
 	c.raftFollowerStates = followerStates

-	readKeyring := func() (*raft.TLSKeyring, error) {
-		tlsKeyringEntry, err := c.barrier.Get(ctx, raftTLSStoragePath)
-		if err != nil {
-			return nil, err
-		}
-		if tlsKeyringEntry == nil {
-			return nil, errors.New("no keyring found")
-		}
-		var keyring raft.TLSKeyring
-		if err := tlsKeyringEntry.DecodeJSON(&keyring); err != nil {
-			return nil, err
-		}
-
-		return &keyring, nil
-	}
-
 	// rotateKeyring writes new key data to the keyring and adds an applied
 	// index that is used to verify it has been committed. The keys written in
 	// this function can be used on standbys but the active node doesn't start
 	// using it yet.
 	rotateKeyring := func() (time.Time, error) {
 		// Read the existing keyring
-		keyring, err := readKeyring()
+		keyring, err := c.raftReadTLSKeyring(ctx)
 		if err != nil {
 			return time.Time{}, errwrap.Wrapf("failed to read raft TLS keyring: {{err}}", err)
 		}
@@ -256,8 +363,8 @@ func (c *Core) startPeriodicRaftTLSRotate(ctx context.Context) error {
 		case len(keyring.Keys) == 2 && keyring.Keys[1].AppliedIndex == 0:
 			// If this case is hit then the second write to add the applied
 			// index failed. Attempt to write it again.
-			keyring.Keys[1].AppliedIndex = raftStorage.AppliedIndex()
-			keyring.AppliedIndex = raftStorage.AppliedIndex()
+			keyring.Keys[1].AppliedIndex = raftBackend.AppliedIndex()
+			keyring.AppliedIndex = raftBackend.AppliedIndex()
 			entry, err := logical.StorageEntryJSON(raftTLSStoragePath, keyring)
 			if err != nil {
 				return time.Time{}, errwrap.Wrapf("failed to json encode keyring: {{err}}", err)
@@ -270,7 +377,7 @@ func (c *Core) startPeriodicRaftTLSRotate(ctx context.Context) error {
 			// If there already exists a pending key update then the update
 			// hasn't replicated down to all standby nodes yet. Don't allow any
 			// new keys to be created until all standbys have seen this previous
-			// rotation. As a backoff strategy another rotation attempt is
+			// rotation. As a backoff strategy, another rotation attempt is
 			// scheduled for 5 minutes from now.
 			logger.Warn("skipping new raft TLS config creation, keys are pending")
 			return time.Now().Add(time.Minute * 5), nil
@@ -296,9 +403,9 @@ func (c *Core) startPeriodicRaftTLSRotate(ctx context.Context) error {
 		}

 		// Write the keyring again with the new applied index. This allows us to
-		// track if standby nodes receive the update.
-		keyring.Keys[1].AppliedIndex = raftStorage.AppliedIndex()
-		keyring.AppliedIndex = raftStorage.AppliedIndex()
+		// track if standby nodes received the update.
+		keyring.Keys[1].AppliedIndex = raftBackend.AppliedIndex()
+		keyring.AppliedIndex = raftBackend.AppliedIndex()
 		entry, err = logical.StorageEntryJSON(raftTLSStoragePath, keyring)
 		if err != nil {
 			return time.Time{}, errwrap.Wrapf("failed to json encode keyring: {{err}}", err)
@@ -316,7 +423,7 @@ func (c *Core) startPeriodicRaftTLSRotate(ctx context.Context) error {
 	// finalizes the rotation by deleting the old keys and updating the raft
 	// backend.
 	checkCommitted := func() error {
-		keyring, err := readKeyring()
+		keyring, err := c.raftReadTLSKeyring(ctx)
 		if err != nil {
 			return errwrap.Wrapf("failed to read raft TLS keyring: {{err}}", err)
 		}
@@ -346,7 +453,7 @@ func (c *Core) startPeriodicRaftTLSRotate(ctx context.Context) error {
 		}

 		// Update the TLS Key in the backend
-		if err := raftStorage.SetTLSKeyring(keyring); err != nil {
+		if err := raftBackend.SetTLSKeyring(keyring); err != nil {
 			return errwrap.Wrapf("failed to install keyring: {{err}}", err)
 		}

@@ -355,7 +462,7 @@ func (c *Core) startPeriodicRaftTLSRotate(ctx context.Context) error {
 	}

 	// Read the keyring to calculate the time of next rotation.
-	keyring, err := readKeyring()
+	keyring, err := c.raftReadTLSKeyring(ctx)
 	if err != nil {
 		return err
 	}
@@ -406,14 +513,43 @@ func (c *Core) startPeriodicRaftTLSRotate(ctx context.Context) error {
 	return nil
 }

-func (c *Core) createRaftTLSKeyring(ctx context.Context) error {
-	if _, ok := c.underlyingPhysical.(*raft.RaftBackend); !ok {
-		return nil
+func (c *Core) raftReadTLSKeyring(ctx context.Context) (*raft.TLSKeyring, error) {
+	tlsKeyringEntry, err := c.barrier.Get(ctx, raftTLSStoragePath)
+	if err != nil {
+		return nil, err
+	}
+	if tlsKeyringEntry == nil {
+		return nil, errors.New("no keyring found")
+	}
+	var keyring raft.TLSKeyring
+	if err := tlsKeyringEntry.DecodeJSON(&keyring); err != nil {
+		return nil, err
+	}
+
+	return &keyring, nil
+}
+
+// raftCreateTLSKeyring creates the initial TLS key and the TLS Keyring for raft
+// use. If a keyring entry is already present in storage, it will return an
+// error.
+func (c *Core) raftCreateTLSKeyring(ctx context.Context) (*raft.TLSKeyring, error) {
+	if raftBackend := c.getRaftBackend(); raftBackend == nil {
+		return nil, fmt.Errorf("raft backend not in use")
+	}
+
+	// Check if the keyring is already present
+	raftTLSEntry, err := c.barrier.Get(ctx, raftTLSStoragePath)
+	if err != nil {
+		return nil, err
+	}
+
+	if raftTLSEntry != nil {
+		return nil, fmt.Errorf("TLS keyring already present")
 	}

 	raftTLS, err := raft.GenerateTLSKey(c.secureRandomReader)
 	if err != nil {
-		return err
+		return nil, err
 	}

 	keyring := &raft.TLSKeyring{
@@ -423,12 +559,12 @@ func (c *Core) createRaftTLSKeyring(ctx context.Context) error {

 	entry, err := logical.StorageEntryJSON(raftTLSStoragePath, keyring)
 	if err != nil {
-		return err
+		return nil, err
 	}
 	if err := c.barrier.Put(ctx, entry); err != nil {
-		return err
+		return nil, err
 	}
-	return nil
+	return keyring, nil
 }

 func (c *Core) stopPeriodicRaftTLSRotate() {
@@ -440,8 +576,8 @@ func (c *Core) stopPeriodicRaftTLSRotate() {
 }

 func (c *Core) checkRaftTLSKeyUpgrades(ctx context.Context) error {
-	raftStorage, ok := c.underlyingPhysical.(*raft.RaftBackend)
-	if !ok {
+	raftBackend := c.getRaftBackend()
+	if raftBackend == nil {
 		return nil
 	}

@@ -458,7 +594,7 @@ func (c *Core) checkRaftTLSKeyUpgrades(ctx context.Context) error {
 		return err
 	}

-	if err := raftStorage.SetTLSKeyring(&keyring); err != nil {
+	if err := raftBackend.SetTLSKeyring(&keyring); err != nil {
 		return err
 	}

@@ -544,16 +680,16 @@ func (c *Core) raftSnapshotRestoreCallback(grabLock bool, sealNode bool) func(co
 }

 func (c *Core) InitiateRetryJoin(ctx context.Context) error {
-	raftStorage, ok := c.underlyingPhysical.(*raft.RaftBackend)
-	if !ok {
-		return errors.New("raft storage not configured")
-	}
-
-	if raftStorage.Initialized() {
+	raftBackend := c.getRaftBackend()
+	if raftBackend == nil {
 		return nil
 	}

-	leaderInfos, err := raftStorage.JoinConfig()
+	if raftBackend.Initialized() {
+		return nil
+	}
+
+	leaderInfos, err := raftBackend.JoinConfig()
 	if err != nil {
 		return err
 	}
@@ -573,23 +709,75 @@ func (c *Core) InitiateRetryJoin(ctx context.Context) error {
 }

 func (c *Core) JoinRaftCluster(ctx context.Context, leaderInfos []*raft.LeaderJoinInfo, nonVoter bool) (bool, error) {
-	raftStorage, ok := c.underlyingPhysical.(*raft.RaftBackend)
-	if !ok {
-		return false, errors.New("raft storage not configured")
-	}
-
-	if raftStorage.Initialized() {
-		return false, errors.New("raft storage is already initialized")
+	raftBackend := c.getRaftBackend()
+	if raftBackend == nil {
+		return false, errors.New("raft backend not in use")
 	}

 	init, err := c.Initialized(ctx)
 	if err != nil {
 		return false, errwrap.Wrapf("failed to check if core is initialized: {{err}}", err)
 	}
-	if init {
+
+	isRaftHAOnly := c.isRaftHAOnly()
+	// Prevent join from happening if we're using raft for storage and
+	// it has already been initialized.
+	if init && !isRaftHAOnly {
 		return true, nil
 	}

+	// Check on seal status and storage type before proceeding:
+	// If raft is used for storage, core needs to be sealed
+	if !isRaftHAOnly && !c.Sealed() {
+		c.logger.Error("node must be sealed before joining")
+		return false, errors.New("node must be sealed before joining")
+	}
+
+	// If raft is used for ha-only, core needs to be unsealed
+	if isRaftHAOnly && c.Sealed() {
+		c.logger.Error("node must be unsealed before joining")
+		return false, errors.New("node must be unsealed before joining")
+	}
+
+	// Disallow leader API address to be provided if we're using raft for HA-only
+	// The leader API address is obtained directly through storage. This serves
+	// as a form of verification that this node is sharing the same physical
+	// storage as the leader node.
+	if isRaftHAOnly {
+		for _, info := range leaderInfos {
+			if info.LeaderAPIAddr != "" {
+				return false, errors.New("leader API address must be unset when raft is used exclusively for HA")
+			}
+		}
+
+		// Get the leader address from storage
+		keys, err := c.barrier.List(ctx, coreLeaderPrefix)
+		if err != nil {
+			return false, err
+		}
+
+		if len(keys) == 0 || len(keys[0]) == 0 {
+			return false, errors.New("unable to fetch leadership entry")
+		}
+
+		leadershipEntry := coreLeaderPrefix + keys[0]
+		entry, err := c.barrier.Get(ctx, leadershipEntry)
+		if err != nil {
+			return false, err
+		}
+		if entry == nil {
+			return false, errors.New("unable to read leadership entry")
+		}
+
+		var adv activeAdvertisement
+		err = jsonutil.DecodeJSON(entry.Value, &adv)
+		if err != nil {
+			return false, errwrap.Wrapf("unable to decode leader entry: {{err}}", err)
+		}
+
+		leaderInfos[0].LeaderAPIAddr = adv.RedirectAddr
+	}
+
 	join := func(retry bool) error {
 		joinLeader := func(leaderInfo *raft.LeaderJoinInfo) error {
 			if leaderInfo == nil {
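To make the new join rules concrete, here is a small sketch of how the `leaderInfos` argument might be built for the two modes, based only on the checks above. The helper name and the placeholder address are illustrative, not taken from the commit; imports are limited to the raft package shown in the diff.

```go
import "github.com/hashicorp/vault/physical/raft"

// buildJoinInfo is a hypothetical helper that mirrors the checks enforced by
// JoinRaftCluster in this change.
func buildJoinInfo(haOnly bool) []*raft.LeaderJoinInfo {
	if haOnly {
		// Raft used only for ha_storage: the node must already be unsealed and
		// LeaderAPIAddr must stay empty; the address is read from the shared
		// physical storage instead.
		return []*raft.LeaderJoinInfo{{}}
	}

	// Raft used for storage: the node must still be sealed and the leader API
	// address has to be supplied explicitly (placeholder value below).
	return []*raft.LeaderJoinInfo{{
		LeaderAPIAddr: "https://vault-leader.example.com:8200",
	}}
}
```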
@@ -603,14 +791,11 @@ func (c *Core) JoinRaftCluster(ctx context.Context, leaderInfos []*raft.LeaderJo
 			if err != nil {
 				return errwrap.Wrapf("failed to check if core is initialized: {{err}}", err)
 			}
-			if init {
+			if init && !isRaftHAOnly {
 				c.logger.Info("returning from raft join as the node is initialized")
 				return nil
 			}
-			if !c.Sealed() {
-				c.logger.Info("returning from raft join as the node is unsealed")
-				return nil
-			}

 			c.logger.Info("attempting to join possible raft leader node", "leader_addr", leaderInfo.LeaderAPIAddr)

@@ -648,7 +833,7 @@ func (c *Core) JoinRaftCluster(ctx context.Context, leaderInfos []*raft.LeaderJo

 			// Attempt to join the leader by requesting for the bootstrap challenge
 			secret, err := apiClient.Logical().Write("sys/storage/raft/bootstrap/challenge", map[string]interface{}{
-				"server_id": raftStorage.NodeID(),
+				"server_id": raftBackend.NodeID(),
 			})
 			if err != nil {
 				return errwrap.Wrapf("error during raft bootstrap init call: {{err}}", err)
@@ -687,7 +872,10 @@ func (c *Core) JoinRaftCluster(ctx context.Context, leaderInfos []*raft.LeaderJo
 				nonVoter:       nonVoter,
 			}

-			if c.seal.BarrierType() == wrapping.Shamir {
+			// If we're using Shamir and using raft for both physical and HA, we
+			// need to block until the node is unsealed, unless retry is set to
+			// false.
+			if c.seal.BarrierType() == wrapping.Shamir && !isRaftHAOnly {
 				c.raftInfo = raftInfo
 				if err := c.seal.SetBarrierConfig(ctx, &sealConfig); err != nil {
 					return err
@@ -708,7 +896,7 @@ func (c *Core) JoinRaftCluster(ctx context.Context, leaderInfos []*raft.LeaderJo
 				return errwrap.Wrapf("failed to send answer to raft leader node: {{err}}", err)
 			}

-			if c.seal.BarrierType() == wrapping.Shamir {
+			if c.seal.BarrierType() == wrapping.Shamir && !isRaftHAOnly {
 				// Reset the state
 				c.raftInfo = nil

@@ -763,20 +951,41 @@ func (c *Core) JoinRaftCluster(ctx context.Context, leaderInfos []*raft.LeaderJo
 	return true, nil
 }

-// This is used in tests to override the cluster address
-var UpdateClusterAddrForTests uint32
+// getRaftBackend returns the RaftBackend from the HA or physical backend,
+// in that order of preference, or nil if not of type RaftBackend.
+func (c *Core) getRaftBackend() *raft.RaftBackend {
+	var raftBackend *raft.RaftBackend
+
+	if raftHA, ok := c.ha.(*raft.RaftBackend); ok {
+		raftBackend = raftHA
+	}
+
+	if raftStorage, ok := c.underlyingPhysical.(*raft.RaftBackend); ok {
+		raftBackend = raftStorage
+	}
+
+	return raftBackend
+}
+
+// isRaftHAOnly returns true if c.ha is raft and physical storage is non-raft
+func (c *Core) isRaftHAOnly() bool {
+	_, isRaftHA := c.ha.(*raft.RaftBackend)
+	_, isRaftStorage := c.underlyingPhysical.(*raft.RaftBackend)
+
+	return isRaftHA && !isRaftStorage
+}

 func (c *Core) joinRaftSendAnswer(ctx context.Context, sealAccess *seal.Access, raftInfo *raftInformation) error {
 	if raftInfo.challenge == nil {
 		return errors.New("raft challenge is nil")
 	}

-	raftStorage, ok := c.underlyingPhysical.(*raft.RaftBackend)
-	if !ok {
-		return errors.New("raft storage not in use")
+	raftBackend := c.getRaftBackend()
+	if raftBackend == nil {
+		return errors.New("raft backend is not in use")
 	}

-	if raftStorage.Initialized() {
+	if raftBackend.Initialized() {
 		return errors.New("raft is already initialized")
 	}

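The two helpers added in this hunk are the crux of the change: `getRaftBackend` hides whether raft was configured as `storage` or as `ha_storage`, and `isRaftHAOnly` distinguishes the two. A minimal sketch of the call-site pattern they enable is below; `reportRaftStatus` and its return map are hypothetical (written as if inside package vault, imports omitted), chosen only to mirror how the forwarding Echo and heartbeat handlers later in this diff gate raft-specific fields.

```go
// reportRaftStatus is a hypothetical illustration, not part of this commit.
func (c *Core) reportRaftStatus() map[string]interface{} {
	status := map[string]interface{}{}

	// Works for both raft-as-storage and raft-as-ha_storage.
	raftBackend := c.getRaftBackend()
	if raftBackend == nil {
		return status
	}

	// The commit stops reporting the applied index and node ID in HA-only
	// mode, so this sketch does the same.
	if !c.isRaftHAOnly() {
		status["raft_applied_index"] = raftBackend.AppliedIndex()
		status["raft_node_id"] = raftBackend.NodeID()
	}

	return status
}
```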
@@ -790,7 +999,7 @@ func (c *Core) joinRaftSendAnswer(ctx context.Context, sealAccess *seal.Access,
 		return errwrap.Wrapf("error parsing cluster address: {{err}}", err)
 	}
 	clusterAddr := parsedClusterAddr.Host
-	if atomic.LoadUint32(&UpdateClusterAddrForTests) == 1 && strings.HasSuffix(clusterAddr, ":0") {
+	if atomic.LoadUint32(&TestingUpdateClusterAddr) == 1 && strings.HasSuffix(clusterAddr, ":0") {
 		// We are testing and have an address provider, so just create a random
 		// addr, it will be overwritten later.
 		var err error
@@ -804,7 +1013,7 @@ func (c *Core) joinRaftSendAnswer(ctx context.Context, sealAccess *seal.Access,
 	if err := answerReq.SetJSONBody(map[string]interface{}{
 		"answer":       base64.StdEncoding.EncodeToString(plaintext),
 		"cluster_addr": clusterAddr,
-		"server_id":    raftStorage.NodeID(),
+		"server_id":    raftBackend.NodeID(),
 		"non_voter":    raftInfo.nonVoter,
 	}); err != nil {
 		return err
@@ -823,15 +1032,17 @@ func (c *Core) joinRaftSendAnswer(ctx context.Context, sealAccess *seal.Access,
 		return err
 	}

-	raftStorage.Bootstrap(ctx, answerResp.Data.Peers)
+	if err := raftBackend.Bootstrap(answerResp.Data.Peers); err != nil {
+		return err
+	}
+
 	err = c.startClusterListener(ctx)
 	if err != nil {
 		return errwrap.Wrapf("error starting cluster: {{err}}", err)
 	}

-	raftStorage.SetRestoreCallback(c.raftSnapshotRestoreCallback(true, true))
-	err = raftStorage.SetupCluster(ctx, raft.SetupOpts{
+	raftBackend.SetRestoreCallback(c.raftSnapshotRestoreCallback(true, true))
+	err = raftBackend.SetupCluster(ctx, raft.SetupOpts{
 		TLSKeyring:      answerResp.Data.TLSKeyring,
 		ClusterListener: c.getClusterListener(),
 	})
@@ -842,6 +1053,57 @@ func (c *Core) joinRaftSendAnswer(ctx context.Context, sealAccess *seal.Access,
 	return nil
 }

+// RaftBootstrap performs bootstrapping of a raft cluster if core contains a raft
+// backend. If raft is not part of the storage or HA storage backend, this
+// call results in an error.
+func (c *Core) RaftBootstrap(ctx context.Context, onInit bool) error {
+	if c.logger.IsDebug() {
+		c.logger.Debug("bootstrapping raft backend")
+		defer c.logger.Debug("finished bootstrapping raft backend")
+	}
+
+	raftBackend := c.getRaftBackend()
+	if raftBackend == nil {
+		return errors.New("raft backend not in use")
+	}
+
+	parsedClusterAddr, err := url.Parse(c.ClusterAddr())
+	if err != nil {
+		return errwrap.Wrapf("error parsing cluster address: {{err}}", err)
+	}
+	if err := raftBackend.Bootstrap([]raft.Peer{
+		{
+			ID:      raftBackend.NodeID(),
+			Address: parsedClusterAddr.Host,
+		},
+	}); err != nil {
+		return errwrap.Wrapf("could not bootstrap clustered storage: {{err}}", err)
+	}
+
+	raftOpts := raft.SetupOpts{
+		StartAsLeader: true,
+	}
+
+	if !onInit {
+		// Generate the TLS Keyring info for SetupCluster to consume
+		raftTLS, err := c.raftCreateTLSKeyring(ctx)
+		if err != nil {
+			return errwrap.Wrapf("could not generate TLS keyring during bootstrap: {{err}}", err)
+		}
+
+		raftBackend.SetRestoreCallback(c.raftSnapshotRestoreCallback(true, true))
+		raftOpts.ClusterListener = c.getClusterListener()
+
+		raftOpts.TLSKeyring = raftTLS
+	}
+
+	if err := raftBackend.SetupCluster(ctx, raftOpts); err != nil {
+		return errwrap.Wrapf("could not start clustered storage: {{err}}", err)
+	}
+
+	return nil
+}
+
 func (c *Core) isRaftUnseal() bool {
 	return c.raftInfo != nil
 }
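`RaftBootstrap` is now the single bootstrap entry point, and its `onInit` flag decides how much of the cluster wiring it performs itself. The two wrappers below only restate that contract under stated assumptions; neither exists in the commit, and only `RaftBootstrap` itself is real.

```go
import (
	"context"

	"github.com/hashicorp/vault/vault"
)

// Hypothetical callers, for illustration only.
func bootstrapDuringInitialize(ctx context.Context, c *vault.Core) error {
	// Initialize passes onInit=true: the TLS keyring is created later in the
	// same Initialize path and the cluster is torn down again once setup ends.
	return c.RaftBootstrap(ctx, true)
}

func bootstrapUnsealedHAOnlyNode(ctx context.Context, c *vault.Core) error {
	// A node that is already unsealed and uses raft purely for ha_storage
	// bootstraps on demand: onInit=false makes RaftBootstrap create the TLS
	// keyring and attach the cluster listener before calling SetupCluster.
	return c.RaftBootstrap(ctx, false)
}
```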
@@ -8,7 +8,6 @@ import (
 	"time"

 	"github.com/hashicorp/vault/helper/forwarding"
-	"github.com/hashicorp/vault/physical/raft"
 	"github.com/hashicorp/vault/sdk/helper/consts"
 	"github.com/hashicorp/vault/vault/replication"
 )
@@ -82,9 +81,11 @@ func (s *forwardedRequestRPCServer) Echo(ctx context.Context, in *EchoRequest) (
 		ReplicationState: uint32(s.core.ReplicationState()),
 	}

-	if raftStorage, ok := s.core.underlyingPhysical.(*raft.RaftBackend); ok {
-		reply.RaftAppliedIndex = raftStorage.AppliedIndex()
-		reply.RaftNodeID = raftStorage.NodeID()
+	if raftBackend := s.core.getRaftBackend(); raftBackend != nil {
+		if !s.core.isRaftHAOnly() {
+			reply.RaftAppliedIndex = raftBackend.AppliedIndex()
+			reply.RaftNodeID = raftBackend.NodeID()
+		}
 	}

 	return reply, nil
@@ -111,9 +112,11 @@ func (c *forwardingClient) startHeartbeat() {
 		ClusterAddr:      clusterAddr,
 	}

-	if raftStorage, ok := c.core.underlyingPhysical.(*raft.RaftBackend); ok {
-		req.RaftAppliedIndex = raftStorage.AppliedIndex()
-		req.RaftNodeID = raftStorage.NodeID()
+	if raftBackend := c.core.getRaftBackend(); raftBackend != nil {
+		if !c.core.isRaftHAOnly() {
+			req.RaftAppliedIndex = raftBackend.AppliedIndex()
+			req.RaftNodeID = raftBackend.NodeID()
+		}
 	}

 	ctx, cancel := context.WithTimeout(c.echoContext, 2*time.Second)
@@ -27,11 +27,6 @@ import (
 	"time"

 	"github.com/armon/go-metrics"
-	"github.com/mitchellh/copystructure"
-	testing "github.com/mitchellh/go-testing-interface"
-	"golang.org/x/crypto/ed25519"
-	"golang.org/x/net/http2"
-
 	cleanhttp "github.com/hashicorp/go-cleanhttp"
 	log "github.com/hashicorp/go-hclog"
 	raftlib "github.com/hashicorp/raft"
@@ -53,6 +48,10 @@ import (
 	physInmem "github.com/hashicorp/vault/sdk/physical/inmem"
 	"github.com/hashicorp/vault/vault/cluster"
 	"github.com/hashicorp/vault/vault/seal"
+	"github.com/mitchellh/copystructure"
+	testing "github.com/mitchellh/go-testing-interface"
+	"golang.org/x/crypto/ed25519"
+	"golang.org/x/net/http2"
 )

 // This file contains a number of methods that are useful for unit
@@ -888,7 +887,10 @@ func CleanupClusters(clusters []*TestCluster) {
 func (c *TestCluster) Cleanup() {
 	c.Logger.Info("cleaning up vault cluster")
 	for _, core := range c.Cores {
-		core.CoreConfig.Logger.SetLevel(log.Error)
+		// Upgrade logger to emit errors if not doing so already
+		if !core.CoreConfig.Logger.IsError() {
+			core.CoreConfig.Logger.SetLevel(log.Error)
+		}
 	}

 	wg := &sync.WaitGroup{}
@@ -964,6 +966,7 @@ type TestClusterCore struct {
 	TLSConfig            *tls.Config
 	UnderlyingStorage    physical.Backend
 	UnderlyingRawStorage physical.Backend
+	UnderlyingHAStorage  physical.HABackend
 	Barrier              SecurityBarrier
 	NodeID               string
 }
@@ -1506,6 +1509,7 @@ func NewTestCluster(t testing.T, base *CoreConfig, opts *TestClusterOptions) *Te
 			Barrier:              cores[i].barrier,
 			NodeID:               fmt.Sprintf("core-%d", i),
 			UnderlyingRawStorage: coreConfigs[i].Physical,
+			UnderlyingHAStorage:  coreConfigs[i].HAPhysical,
 		}
 		tcc.ReloadFuncs = &cores[i].reloadFuncs
 		tcc.ReloadFuncsLock = &cores[i].reloadFuncsLock
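The new `UnderlyingHAStorage` field exposes the HA backend each test core was built with. Below is a minimal sketch of a test helper that uses it, assuming an external test package; the helper itself is not part of the commit.

```go
import (
	"testing"

	"github.com/hashicorp/vault/physical/raft"
	"github.com/hashicorp/vault/vault"
)

// requireRaftHA is a hypothetical helper showing how a test can assert that a
// core's ha_storage is raft and grab the backend for further inspection.
func requireRaftHA(t *testing.T, core *vault.TestClusterCore) *raft.RaftBackend {
	t.Helper()

	backend, ok := core.UnderlyingHAStorage.(*raft.RaftBackend)
	if !ok {
		t.Fatalf("expected raft ha_storage, got %T", core.UnderlyingHAStorage)
	}
	return backend
}
```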
@@ -1675,9 +1679,14 @@ func (testCluster *TestCluster) newCore(
 	case physBundle == nil && coreConfig.Physical == nil:
 		t.Fatal("PhysicalFactory produced no physical and none in CoreConfig")
 	case physBundle != nil:
-		testCluster.Logger.Info("created physical backend", "instance", idx)
-		coreConfig.Physical = physBundle.Backend
-		localConfig.Physical = physBundle.Backend
+		// Storage backend setup
+		if physBundle.Backend != nil {
+			testCluster.Logger.Info("created physical backend", "instance", idx)
+			coreConfig.Physical = physBundle.Backend
+			localConfig.Physical = physBundle.Backend
+		}
+
+		// HA Backend setup
 		haBackend := physBundle.HABackend
 		if haBackend == nil {
 			if ha, ok := physBundle.Backend.(physical.HABackend); ok {
@@ -1686,6 +1695,8 @@ func (testCluster *TestCluster) newCore(
 		}
 		coreConfig.HAPhysical = haBackend
 		localConfig.HAPhysical = haBackend
+
+		// Cleanup setup
 		if physBundle.Cleanup != nil {
 			cleanupFunc = physBundle.Cleanup
 		}
@@ -141,6 +141,7 @@ $ curl \

 This endpoint returns a snapshot of the current state of the raft cluster. The
 snapshot is returned as binary data and should be redirected to a file.
+Unavailable if Raft is used exclusively for `ha_storage`.

 | Method | Path                         |
 | :----- | :--------------------------- |
@@ -157,7 +158,8 @@ $ curl \

 ## Restore Raft using a snapshot

-Installs the provided snapshot, returning the cluster to the state defined in it.
+Installs the provided snapshot, returning the cluster to the state defined in
+it. Unavailable if Raft is used exclusively for `ha_storage`.

 | Method | Path                         |
 | :----- | :--------------------------- |
@@ -178,7 +180,7 @@ $ curl \
 Installs the provided snapshot, returning the cluster to the state defined in
 it. This is same as writing to `/sys/storage/raft/snapshot` except that this
 bypasses checks ensuring the Autounseal or shamir keys are consistent with the
-snapshot data.
+snapshot data. Unavailable if Raft is used exclusively for `ha_storage`.

 | Method | Path                               |
 | :----- | :--------------------------------- |
@@ -76,6 +76,10 @@ defined `path`. `node_id` can optionally be set to identify this node.
 cluster hostname of this node. For more configuration options see the [raft
 storage configuration documentation](/docs/configuration/storage/raft).

+If the original configuration uses "raft" for `ha_storage`, a different
+`path` needs to be declared for the path in `storage_destination` and the new
+configuration for the node post-migration.
+
 ```hcl
 storage_source "consul" {
   address = "127.0.0.1:8500"
@@ -115,6 +119,9 @@ vault server.
 After migration the raft cluster will only have a single node. Additional peers
 should be joined to this node.

+If the cluster was previously HA-enabled using "raft" as the `ha_storage`, the
+nodes will have to re-join the migrated node before unsealing.
+
 ## Usage

 The following flags are available for the `operator migrate` command.
@@ -28,8 +28,13 @@ Subcommands:

 This command is used to join a new node as a peer to the Raft cluster. In order
 to join, there must be at least one existing member of the cluster. If Shamir
-seal is in use, then this API will request for the unseal keys to be supplied to
-join the cluster.
+seal is in use, then unseal keys are to be supplied before or after the
+join process, depending on whether it's being used exclusively for HA.
+
+If raft is used for `storage`, the node must be joined before unsealing and the
+`leader-api-addr` argument must be provided. If raft is used for `ha_storage`,
+the node must first be unsealed before joining and the `leader-api-addr` must
+_not_ be provided.

 ```text
 Usage: vault operator raft join [options] <leader-api-addr>
@@ -72,7 +77,7 @@ Usage: vault operator raft list-peers

 ### Example Output

-```python
+```json
 {
   ...
   "data": {
@@ -145,6 +150,8 @@ Usage: vault operator raft snapshot save <snapshot_file>
 $ vault operator raft snapshot save raft.snap
 ```

+~> **Note:** Snapshot is not supported when Raft is used for `ha_storage`.
+
 ### snapshot restore

 Restores a snapshot of Vault data taken with `vault operator raft snapshot save`.
@@ -43,3 +43,7 @@ In order to bring the Vault server up reliably, using any node's raft data,
 recovery mode Vault automatically resizes the cluster to size 1. This means
 that after having used recovery mode, part of the procedure for returning to
 active service must include rejoining the raft cluster.
+
+If Raft is used exclusively for `ha_storage`, recovery mode will not allow for
+changes to the Raft data but will instead allow for modification of the underlying
+physical data that is associated with Vault's storage backend.
@@ -39,9 +39,6 @@ between the nodes in the Raft cluster.
 ~> **Note:** When using the Raft storage backend, a separate `ha_storage`
 backend cannot be declared.

-~> **Note:** Raft cannot be used as the configured `ha_storage` backend at this
-time. To use Raft for HA coordination users must also use Raft for storage.
-
 ~> **Note:** When using the Raft storage backend, it is strongly recommended to
 set `disable_mlock` to `true`, and to disable memory swapping on the system.