backport of commit 0fa36a36ae (#23443)

Co-authored-by: Paul Banks <pbanks@hashicorp.com>

parent 440ff11b14
commit f8a29da29d
changelog/23013.txt (new file, 7 lines)

@@ -0,0 +1,7 @@
```release-note:bug
storage/consul: fix a bug where an active node in a specific sort of network
partition could continue to write data to Consul after a new leader is elected
potentially causing data loss or corruption for keys with many concurrent
writers. For Enterprise clusters this could cause corruption of the merkle trees
leading to failure to complete merkle sync without a full re-index.
```
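The fix works by fencing every storage write with the active node's Consul session: each write transaction is prefixed with a `check-session` operation so it can only commit while the lock session is still held. A rough standalone sketch of that idea against the Consul API client follows; it is illustrative only and is not the Vault code from this commit (the helper name and wiring are assumptions).

```go
package main

import (
	"fmt"

	"github.com/hashicorp/consul/api"
)

// fencedPut sketches the fencing idea: every write runs inside a Consul
// transaction whose first operation verifies that lockSession still holds the
// lock key. If the session has lost (or never held) the lock, the whole
// transaction fails and nothing is written.
func fencedPut(kv *api.KV, lockKey, lockSession, key string, value []byte) error {
	ops := api.KVTxnOps{
		{
			// Fails the transaction unless lockSession currently holds lockKey.
			Verb:    api.KVCheckSession,
			Key:     lockKey,
			Session: lockSession,
		},
		{
			Verb:  api.KVSet,
			Key:   key,
			Value: value,
		},
	}
	ok, resp, _, err := kv.Txn(ops, nil)
	if err != nil {
		return err
	}
	if !ok {
		// An error at OpIndex 0 means the session check failed, i.e. this node
		// is no longer the active node and must stop writing.
		return fmt.Errorf("write fenced: %v", resp.Errors)
	}
	return nil
}
```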
helper/testhelpers/consul/cluster_storage.go (new file, 70 lines)

@@ -0,0 +1,70 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package consul

import (
	"context"
	"fmt"

	"github.com/hashicorp/vault/sdk/helper/testcluster"
)

type ClusterStorage struct {
	// Set these after calling `NewConsulClusterStorage` but before `Start` (or
	// passing in to NewDockerCluster) to control Consul version specifically in
	// your test. Leave empty for latest OSS (defined in consulhelper.go).
	ConsulVersion    string
	ConsulEnterprise bool

	cleanup func()
	config  *Config
}

var _ testcluster.ClusterStorage = &ClusterStorage{}

func NewClusterStorage() *ClusterStorage {
	return &ClusterStorage{}
}

func (s *ClusterStorage) Start(ctx context.Context, opts *testcluster.ClusterOptions) error {
	prefix := ""
	if opts != nil && opts.ClusterName != "" {
		prefix = fmt.Sprintf("%s-", opts.ClusterName)
	}
	cleanup, config, err := RunContainer(ctx, prefix, s.ConsulVersion, s.ConsulEnterprise, true)
	if err != nil {
		return err
	}
	s.cleanup = cleanup
	s.config = config

	return nil
}

func (s *ClusterStorage) Cleanup() error {
	if s.cleanup != nil {
		s.cleanup()
		s.cleanup = nil
	}
	return nil
}

func (s *ClusterStorage) Opts() map[string]interface{} {
	if s.config == nil {
		return nil
	}
	return map[string]interface{}{
		"address":      s.config.ContainerHTTPAddr,
		"token":        s.config.Token,
		"max_parallel": "32",
	}
}

func (s *ClusterStorage) Type() string {
	return "consul"
}

func (s *ClusterStorage) Config() *Config {
	return s.config
}
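The new `ClusterStorage` helper above plugs Consul in as the external storage for Docker-based Vault test clusters. A rough usage sketch follows; the exact test wiring and the `.../testcluster/docker` import path are assumptions, not part of this commit.

```go
package dockertests_example

import (
	"testing"

	"github.com/hashicorp/vault/helper/testhelpers/consul"
	"github.com/hashicorp/vault/sdk/helper/testcluster"
	"github.com/hashicorp/vault/sdk/helper/testcluster/docker" // assumed path
)

func TestClusterWithConsulStorage(t *testing.T) {
	storage := consul.NewClusterStorage()
	// Optionally pin a Consul version instead of the default LatestConsulVersion.
	storage.ConsulVersion = "1.15.3"

	cluster := docker.NewTestDockerCluster(t, &docker.DockerClusterOptions{
		ClusterOptions: testcluster.ClusterOptions{
			ClusterName: "test-consul-storage",
		},
		Storage: storage,
	})
	defer cluster.Cleanup()
	defer storage.Cleanup()

	// ... drive the cluster via cluster.Nodes()[0].APIClient() ...
}
```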
@@ -5,6 +5,7 @@ package consul

 import (
 	"context"
+	"fmt"
 	"os"
 	"strings"
 	"testing"

@@ -14,9 +15,16 @@ import (

 	"github.com/hashicorp/vault/sdk/helper/docker"
 )

+// LatestConsulVersion is the most recent version of Consul which is used unless
+// another version is specified in the test config or environment. This will
+// probably go stale as we don't always update it on every release but we rarely
+// rely on specific new Consul functionality so that's probably not a problem.
+const LatestConsulVersion = "1.15.3"
+
 type Config struct {
 	docker.ServiceHostPort
 	Token string
+	ContainerHTTPAddr string
 }

 func (c *Config) APIConfig() *consulapi.Config {

@@ -26,19 +34,39 @@ func (c *Config) APIConfig() *consulapi.Config {

 	return apiConfig
 }

-// PrepareTestContainer creates a Consul docker container. If version is empty,
-// the Consul version used will be given by the environment variable
-// CONSUL_DOCKER_VERSION, or if that's empty, whatever we've hardcoded as the
-// the latest Consul version.
+// PrepareTestContainer is a test helper that creates a Consul docker container
+// or fails the test if unsuccessful. See RunContainer for more details on the
+// configuration.
 func PrepareTestContainer(t *testing.T, version string, isEnterprise bool, doBootstrapSetup bool) (func(), *Config) {
 	t.Helper()

+	cleanup, config, err := RunContainer(context.Background(), "", version, isEnterprise, doBootstrapSetup)
+	if err != nil {
+		t.Fatalf("failed starting consul: %s", err)
+	}
+	return cleanup, config
+}
+
+// RunContainer runs Consul in a Docker container unless CONSUL_HTTP_ADDR is
+// already found in the environment. Consul version is determined by the version
+// argument. If version is empty string, the CONSUL_DOCKER_VERSION environment
+// variable is used and if that is empty too, LatestConsulVersion is used
+// (defined above). If namePrefix is provided we assume you have chosen a unique
+// enough prefix to avoid collision with other tests that may be running in
+// parallel and so _do not_ add an additional unique ID suffix. We will also
+// ensure previous instances are deleted and leave the container running for
+// debugging. This is useful for using Consul as part of at testcluster (i.e.
+// when Vault is in Docker too). If namePrefix is empty then a unique suffix is
+// added since many older tests rely on a uniq instance of the container. This
+// is used by `PrepareTestContainer` which is used typically in tests that rely
+// on Consul but run tested code within the test process.
+func RunContainer(ctx context.Context, namePrefix, version string, isEnterprise bool, doBootstrapSetup bool) (func(), *Config, error) {
 	if retAddress := os.Getenv("CONSUL_HTTP_ADDR"); retAddress != "" {
 		shp, err := docker.NewServiceHostPortParse(retAddress)
 		if err != nil {
-			t.Fatal(err)
+			return nil, nil, err
 		}
-		return func() {}, &Config{ServiceHostPort: *shp, Token: os.Getenv("CONSUL_HTTP_TOKEN")}
+		return func() {}, &Config{ServiceHostPort: *shp, Token: os.Getenv("CONSUL_HTTP_TOKEN")}, nil
 	}

 	config := `acl { enabled = true default_policy = "deny" }`

@@ -47,7 +75,7 @@ func PrepareTestContainer(t *testing.T, version string, isEnterprise bool, doBoo

 		if consulVersion != "" {
 			version = consulVersion
 		} else {
-			version = "1.11.3" // Latest Consul version, update as new releases come out
+			version = LatestConsulVersion
 		}
 	}
 	if strings.HasPrefix(version, "1.3") {

@@ -66,15 +94,18 @@ func PrepareTestContainer(t *testing.T, version string, isEnterprise bool, doBoo

 		envVars = append(envVars, "CONSUL_LICENSE="+license)

 		if !hasLicense {
-			t.Fatalf("Failed to find enterprise license")
+			return nil, nil, fmt.Errorf("Failed to find enterprise license")
 		}
 	}
+	if namePrefix != "" {
+		name = namePrefix + name
+	}

 	if dockerRepo, hasEnvRepo := os.LookupEnv("CONSUL_DOCKER_REPO"); hasEnvRepo {
 		repo = dockerRepo
 	}

-	runner, err := docker.NewServiceRunner(docker.RunOptions{
+	dockerOpts := docker.RunOptions{
 		ContainerName: name,
 		ImageRepo:     repo,
 		ImageTag:      version,

@@ -83,12 +114,25 @@ func PrepareTestContainer(t *testing.T, version string, isEnterprise bool, doBoo

 		Ports:         []string{"8500/tcp"},
 		AuthUsername:  os.Getenv("CONSUL_DOCKER_USERNAME"),
 		AuthPassword:  os.Getenv("CONSUL_DOCKER_PASSWORD"),
-	})
-	if err != nil {
-		t.Fatalf("Could not start docker Consul: %s", err)
 	}

-	svc, err := runner.StartService(context.Background(), func(ctx context.Context, host string, port int) (docker.ServiceConfig, error) {
+	// Add a unique suffix if there is no per-test prefix provided
+	addSuffix := true
+	if namePrefix != "" {
+		// Don't add a suffix if the caller already provided a prefix
+		addSuffix = false
+		// Also enable predelete and non-removal to make debugging easier for test
+		// cases with named containers).
+		dockerOpts.PreDelete = true
+		dockerOpts.DoNotAutoRemove = true
+	}
+
+	runner, err := docker.NewServiceRunner(dockerOpts)
+	if err != nil {
+		return nil, nil, fmt.Errorf("Could not start docker Consul: %s", err)
+	}
+
+	svc, _, err := runner.StartNewService(ctx, addSuffix, false, func(ctx context.Context, host string, port int) (docker.ServiceConfig, error) {
 		shp := docker.NewServiceHostPort(host, port)
 		apiConfig := consulapi.DefaultNonPooledConfig()
 		apiConfig.Address = shp.Address()

@@ -165,7 +209,7 @@ func PrepareTestContainer(t *testing.T, version string, isEnterprise bool, doBoo

 		}
 	}

-	// Configure a namespace and parition if testing enterprise Consul
+	// Configure a namespace and partition if testing enterprise Consul
 	if isEnterprise {
 		// Namespaces require Consul 1.7 or newer
 		namespaceVersion, _ := goversion.NewVersion("1.7")

@@ -229,8 +273,20 @@ func PrepareTestContainer(t *testing.T, version string, isEnterprise bool, doBoo

 		}, nil
 	})
 	if err != nil {
-		t.Fatalf("Could not start docker Consul: %s", err)
+		return nil, nil, err
 	}

-	return svc.Cleanup, svc.Config.(*Config)
+	// Find the container network info.
+	if len(svc.Container.NetworkSettings.Networks) < 1 {
+		svc.Cleanup()
+		return nil, nil, fmt.Errorf("failed to find any network settings for container")
+	}
+	cfg := svc.Config.(*Config)
+	for _, eps := range svc.Container.NetworkSettings.Networks {
+		// Just pick the first network, we assume only one for now.
+		// Pull out the real container IP and set that up
+		cfg.ContainerHTTPAddr = fmt.Sprintf("http://%s:8500", eps.IPAddress)
+		break
+	}
+	return svc.Cleanup, cfg, nil
 }
@@ -4,11 +4,11 @@

 package consul

 import (
+	"sync"
 	realtesting "testing"

 	"github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/vault/helper/testhelpers/consul"
-	"github.com/hashicorp/vault/helper/testhelpers/teststorage"
 	physConsul "github.com/hashicorp/vault/physical/consul"
 	"github.com/hashicorp/vault/vault"
 	"github.com/mitchellh/go-testing-interface"

@@ -33,5 +33,93 @@ func MakeConsulBackend(t testing.T, logger hclog.Logger) *vault.PhysicalBackendB

 }

 func ConsulBackendSetup(conf *vault.CoreConfig, opts *vault.TestClusterOptions) {
-	opts.PhysicalFactory = teststorage.SharedPhysicalFactory(MakeConsulBackend)
+	m := &consulContainerManager{}
+	opts.PhysicalFactory = m.Backend
+}
+
+// consulContainerManager exposes Backend which matches the PhysicalFactory func
+// type. When called, it will ensure that a separate Consul container is started
+// for each distinct vault cluster that calls it and ensures that each Vault
+// core gets a separate Consul backend instance since that contains state
+// related to lock sessions. The whole test framework doesn't have a concept of
+// "cluster names" outside of the prefix attached to the logger and other
+// backend factories, mostly via SharedPhysicalFactory currently implicitly rely
+// on being called in a sequence of core 0, 1, 2,... on one cluster and then
+// core 0, 1, 2... on the next and so on. Refactoring lots of things to make
+// first-class cluster identifiers a thing seems like a heavy lift given that we
+// already rely on sequence of calls everywhere else anyway so we do the same
+// here - each time the Backend method is called with coreIdx == 0 we create a
+// whole new Consul and assume subsequent non 0 index cores are in the same
+// cluster.
+type consulContainerManager struct {
+	mu      sync.Mutex
+	current *consulContainerBackendFactory
+}
+
+func (m *consulContainerManager) Backend(t testing.T, coreIdx int,
+	logger hclog.Logger, conf map[string]interface{},
+) *vault.PhysicalBackendBundle {
+	m.mu.Lock()
+	if coreIdx == 0 || m.current == nil {
+		// Create a new consul container factory
+		m.current = &consulContainerBackendFactory{}
+	}
+	f := m.current
+	m.mu.Unlock()
+
+	return f.Backend(t, coreIdx, logger, conf)
+}
+
+type consulContainerBackendFactory struct {
+	mu        sync.Mutex
+	refCount  int
+	cleanupFn func()
+	config    map[string]string
+}
+
+func (f *consulContainerBackendFactory) Backend(t testing.T, coreIdx int,
+	logger hclog.Logger, conf map[string]interface{},
+) *vault.PhysicalBackendBundle {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+
+	if f.refCount == 0 {
+		f.startContainerLocked(t)
+		logger.Debug("started consul container", "clusterID", conf["cluster_id"],
+			"address", f.config["address"])
+	}
+
+	f.refCount++
+	consulBackend, err := physConsul.NewConsulBackend(f.config, logger.Named("consul"))
+	if err != nil {
+		t.Fatal(err)
+	}
+	return &vault.PhysicalBackendBundle{
+		Backend: consulBackend,
+		Cleanup: f.cleanup,
+	}
+}
+
+func (f *consulContainerBackendFactory) startContainerLocked(t testing.T) {
+	cleanup, config := consul.PrepareTestContainer(t.(*realtesting.T), "", false, true)
+	f.config = map[string]string{
+		"address":      config.Address(),
+		"token":        config.Token,
+		"max_parallel": "32",
+	}
+	f.cleanupFn = cleanup
+}
+
+func (f *consulContainerBackendFactory) cleanup() {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+
+	if f.refCount < 1 || f.cleanupFn == nil {
+		return
+	}
+	f.refCount--
+	if f.refCount == 0 {
+		f.cleanupFn()
+		f.cleanupFn = nil
+	}
 }
@@ -10,6 +10,7 @@ import (

 	"net/http"
 	"strconv"
 	"strings"
+	"sync"
 	"sync/atomic"
 	"time"

@@ -41,7 +42,7 @@ const (

 // Verify ConsulBackend satisfies the correct interfaces
 var (
 	_ physical.Backend          = (*ConsulBackend)(nil)
-	_ physical.HABackend        = (*ConsulBackend)(nil)
+	_ physical.FencingHABackend = (*ConsulBackend)(nil)
 	_ physical.Lock             = (*ConsulLock)(nil)
 	_ physical.Transactional    = (*ConsulBackend)(nil)

@@ -53,6 +54,7 @@ var (

 // it allows Vault to run on multiple machines in a highly-available manner.
 // failGetInTxn is only used in tests.
 type ConsulBackend struct {
+	logger log.Logger
 	client *api.Client
 	path   string
 	kv     *api.KV

@@ -62,6 +64,7 @@ type ConsulBackend struct {

 	sessionTTL   string
 	lockWaitTime time.Duration
 	failGetInTxn *uint32
+	activeNodeLock atomic.Pointer[ConsulLock]
 }

 // NewConsulBackend constructs a Consul backend using the given API client

@@ -152,6 +155,7 @@ func NewConsulBackend(conf map[string]string, logger log.Logger) (physical.Backe

 	// Set up the backend
 	c := &ConsulBackend{
+		logger: logger,
 		path:   path,
 		client: client,
 		kv:     client.KV(),

@@ -262,12 +266,53 @@ func (c *ConsulBackend) ExpandedCapabilitiesAvailable(ctx context.Context) bool

 	return available
 }

+func (c *ConsulBackend) writeTxnOps(ctx context.Context, len int) ([]*api.TxnOp, string) {
+	if len < 1 {
+		len = 1
+	}
+	ops := make([]*api.TxnOp, 0, len+1)
+
+	// If we don't have a lock yet, return a transaction with no session check. We
+	// need to do this to allow writes during cluster initialization before there
+	// is an active node.
+	lock := c.activeNodeLock.Load()
+	if lock == nil {
+		return ops, ""
+	}
+
+	lockKey, lockSession := lock.Info()
+	if lockKey == "" || lockSession == "" {
+		return ops, ""
+	}
+
+	// If the context used to write has been marked as a special case write that
+	// happens outside of a lock then don't add the session check.
+	if physical.IsUnfencedWrite(ctx) {
+		return ops, ""
+	}
+
+	// Insert the session check operation at index 0. This will allow us later to
+	// work out easily if a write failure is because of the session check.
+	ops = append(ops, &api.TxnOp{
+		KV: &api.KVTxnOp{
+			Verb:    api.KVCheckSession,
+			Key:     lockKey,
+			Session: lockSession,
+		},
+	})
+	return ops, lockSession
+}
+
 // Transaction is used to run multiple entries via a transaction.
 func (c *ConsulBackend) Transaction(ctx context.Context, txns []*physical.TxnEntry) error {
+	return c.txnInternal(ctx, txns, "transaction")
+}
+
+func (c *ConsulBackend) txnInternal(ctx context.Context, txns []*physical.TxnEntry, apiOpName string) error {
 	if len(txns) == 0 {
 		return nil
 	}
-	defer metrics.MeasureSince([]string{"consul", "transaction"}, time.Now())
+	defer metrics.MeasureSince([]string{"consul", apiOpName}, time.Now())

 	failGetInTxn := atomic.LoadUint32(c.failGetInTxn)
 	for _, t := range txns {

@@ -276,7 +321,7 @@ func (c *ConsulBackend) Transaction(ctx context.Context, txns []*physical.TxnEnt

 		}
 	}

-	ops := make([]*api.TxnOp, 0, len(txns))
+	ops, sessionID := c.writeTxnOps(ctx, len(txns))
 	for _, t := range txns {
 		o, err := c.makeApiTxn(t)
 		if err != nil {

@@ -302,14 +347,15 @@ func (c *ConsulBackend) Transaction(ctx context.Context, txns []*physical.TxnEnt

 		}
 		return err
 	}

 	if ok && len(resp.Errors) == 0 {
-		// Loop over results and cache them in a map. Note that we're only caching the first time we see a key,
-		// which _should_ correspond to a Get operation, since we expect those come first in our txns slice.
+		// Loop over results and cache them in a map. Note that we're only caching
+		// the first time we see a key, which _should_ correspond to a Get
+		// operation, since we expect those come first in our txns slice (though
+		// after check-session).
 		for _, txnr := range resp.Results {
 			if len(txnr.KV.Value) > 0 {
-				// We need to trim the Consul kv path (typically "vault/") from the key otherwise it won't
-				// match the transaction entries we have.
+				// We need to trim the Consul kv path (typically "vault/") from the key
+				// otherwise it won't match the transaction entries we have.
 				key := strings.TrimPrefix(txnr.KV.Key, c.path)
 				if _, found := kvMap[key]; !found {
 					kvMap[key] = txnr.KV.Value

@@ -321,6 +367,31 @@ func (c *ConsulBackend) Transaction(ctx context.Context, txns []*physical.TxnEnt

 	if len(resp.Errors) > 0 {
 		for _, res := range resp.Errors {
 			retErr = multierror.Append(retErr, errors.New(res.What))
+			if res.OpIndex == 0 && sessionID != "" {
+				// We added a session check (sessionID not empty) so an error at OpIndex
+				// 0 means that we failed that session check. We don't attempt to string
+				// match because Consul can return at least three different errors here
+				// with no common string. In all cases though failing this check means
+				// we no longer hold the lock because it was released, modified or
+				// deleted. Rather than just continuing to try writing until the
+				// blocking query manages to notice we're no longer the lock holder
+				// (which can take 10s of seconds even in good network conditions in my
+				// testing) we can now Unlock directly here. Our ConsulLock now has a
+				// shortcut that will cause the lock to close the leaderCh immediately
+				// when we call without waiting for the blocking query to return (unlike
+				// Consul's current Lock implementation). But before we unlock, we
+				// should re-load the lock and ensure it's still the same instance we
+				// just tried to write with in case this goroutine is somehow really
+				// delayed and we actually acquired a whole new lock in the meantime!
+				lock := c.activeNodeLock.Load()
+				if lock != nil {
+					_, lockSessionID := lock.Info()
+					if sessionID == lockSessionID {
+						c.logger.Warn("session check failed on write, we lost active node lock, stepping down", "err", res.What)
+						lock.Unlock()
+					}
+				}
+			}
 		}
 	}

@@ -361,27 +432,13 @@ func (c *ConsulBackend) makeApiTxn(txn *physical.TxnEntry) (*api.TxnOp, error) {

 // Put is used to insert or update an entry
 func (c *ConsulBackend) Put(ctx context.Context, entry *physical.Entry) error {
-	defer metrics.MeasureSince([]string{"consul", "put"}, time.Now())
-
-	c.permitPool.Acquire()
-	defer c.permitPool.Release()
-
-	pair := &api.KVPair{
-		Key:   c.path + entry.Key,
-		Value: entry.Value,
-	}
-
-	writeOpts := &api.WriteOptions{}
-	writeOpts = writeOpts.WithContext(ctx)
-
-	_, err := c.kv.Put(pair, writeOpts)
-	if err != nil {
-		if strings.Contains(err.Error(), "Value exceeds") {
-			return fmt.Errorf("%s: %w", physical.ErrValueTooLarge, err)
-		}
-		return err
-	}
-	return nil
+	txns := []*physical.TxnEntry{
+		{
+			Operation: physical.PutOperation,
+			Entry:     entry,
+		},
+	}
+	return c.txnInternal(ctx, txns, "put")
 }

 // Get is used to fetch an entry

@@ -414,16 +471,15 @@ func (c *ConsulBackend) Get(ctx context.Context, key string) (*physical.Entry, e

 // Delete is used to permanently delete an entry
 func (c *ConsulBackend) Delete(ctx context.Context, key string) error {
-	defer metrics.MeasureSince([]string{"consul", "delete"}, time.Now())
-
-	c.permitPool.Acquire()
-	defer c.permitPool.Release()
-
-	writeOpts := &api.WriteOptions{}
-	writeOpts = writeOpts.WithContext(ctx)
-
-	_, err := c.kv.Delete(c.path+key, writeOpts)
-	return err
+	txns := []*physical.TxnEntry{
+		{
+			Operation: physical.DeleteOperation,
+			Entry: &physical.Entry{
+				Key: key,
+			},
+		},
+	}
+	return c.txnInternal(ctx, txns, "delete")
 }

 // List is used to list all the keys under a given

@@ -463,24 +519,14 @@ func (c *ConsulBackend) FailGetInTxn(fail bool) {

 // LockWith is used for mutual exclusion based on the given key.
 func (c *ConsulBackend) LockWith(key, value string) (physical.Lock, error) {
-	// Create the lock
-	opts := &api.LockOptions{
-		Key:            c.path + key,
-		Value:          []byte(value),
-		SessionName:    "Vault Lock",
-		MonitorRetries: 5,
-		SessionTTL:     c.sessionTTL,
-		LockWaitTime:   c.lockWaitTime,
-	}
-	lock, err := c.client.LockOpts(opts)
-	if err != nil {
-		return nil, fmt.Errorf("failed to create lock: %w", err)
-	}
 	cl := &ConsulLock{
+		logger:          c.logger,
 		client:          c.client,
 		key:             c.path + key,
-		lock:            lock,
+		value:           value,
 		consistencyMode: c.consistencyMode,
+		sessionTTL:      c.sessionTTL,
+		lockWaitTime:    c.lockWaitTime,
 	}
 	return cl, nil
 }

@@ -505,20 +551,203 @@ func (c *ConsulBackend) DetectHostAddr() (string, error) {

 	return addr, nil
 }

-// ConsulLock is used to provide the Lock interface backed by Consul
+// RegisterActiveNodeLock is called after active node lock is obtained to allow
+// us to fence future writes.
+func (c *ConsulBackend) RegisterActiveNodeLock(l physical.Lock) error {
+	cl, ok := l.(*ConsulLock)
+	if !ok {
+		return fmt.Errorf("invalid Lock type")
+	}
+	c.activeNodeLock.Store(cl)
+	key, sessionID := cl.Info()
+	c.logger.Info("registered active node lock", "key", key, "sessionID", sessionID)
+	return nil
+}
+
+// ConsulLock is used to provide the Lock interface backed by Consul. We work
+// around some limitations of Consuls api.Lock noted in
+// https://github.com/hashicorp/consul/issues/18271 by creating and managing the
+// session ourselves, while using Consul's Lock to do the heavy lifting.
 type ConsulLock struct {
+	logger          log.Logger
 	client          *api.Client
 	key             string
-	lock            *api.Lock
+	value           string
 	consistencyMode string
+	sessionTTL      string
+	lockWaitTime    time.Duration
+
+	mu      sync.Mutex // protects session state
+	session *lockSession
+	// sessionID is a copy of the value from session.id. We use a separate field
+	// because `Info` needs to keep returning the same sessionID after Unlock has
+	// cleaned up the session state so that we continue to fence any writes still
+	// in flight after the lock is Unlocked. It's easier to reason about that as a
+	// separate field rather than keeping an already-terminated session object
+	// around. Once Lock is called again this will be replaced (while mu is
+	// locked) with the new session ID. Must hold mu to read or write this.
+	sessionID string
+}
+
+type lockSession struct {
+	// id is immutable after the session is created so does not need mu held
+	id string
+
+	// mu protects the lock and unlockCh to ensure they are only cleaned up once
+	mu       sync.Mutex
+	lock     *api.Lock
+	unlockCh chan struct{}
+}
+
+func (s *lockSession) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	lockHeld := false
+	defer func() {
+		if !lockHeld {
+			s.cleanupLocked()
+		}
+	}()
+
+	consulLeaderCh, err := s.lock.Lock(stopCh)
+	if err != nil {
+		return nil, err
+	}
+	if consulLeaderCh == nil {
+		// If both leaderCh and err are nil from Consul's Lock then it means we
+		// waited for the lockWait without grabbing it.
+		return nil, nil
+	}
+	// We got the Lock, monitor it!
+	lockHeld = true
+	leaderCh := make(chan struct{})
+	go s.monitorLock(leaderCh, s.unlockCh, consulLeaderCh)
+	return leaderCh, nil
+}
+
+// monitorLock waits for either unlockCh or consulLeaderCh to close and then
+// closes leaderCh. It's designed to be run in a separate goroutine. Note that
+// we pass unlockCh rather than accessing it via the member variable because it
+// is mutated under the lock during Unlock so reading it from c could be racy.
+// We just need the chan created at the call site here so we pass it instead of
+// locking and unlocking in here.
+func (s *lockSession) monitorLock(leaderCh chan struct{}, unlockCh, consulLeaderCh <-chan struct{}) {
+	select {
+	case <-unlockCh:
+	case <-consulLeaderCh:
+	}
+	// We lost the lock. Close the leaderCh
+	close(leaderCh)
+
+	// Whichever chan closed, cleanup to unwind all the state. If we were
+	// triggered by a cleanup call this will be a no-op, but if not it ensures all
+	// state is cleaned up correctly.
+	s.cleanup()
+}
+
+func (s *lockSession) cleanup() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	s.cleanupLocked()
+}
+
+func (s *lockSession) cleanupLocked() {
+	if s.lock != nil {
+		s.lock.Unlock()
+		s.lock = nil
+	}
+	if s.unlockCh != nil {
+		close(s.unlockCh)
+		s.unlockCh = nil
+	}
+	// Don't bother destroying sessions as they will be destroyed after TTL
+	// anyway.
+}
+
+func (c *ConsulLock) createSession() (*lockSession, error) {
+	se := &api.SessionEntry{
+		Name: "Vault Lock",
+		TTL:  c.sessionTTL,
+		// We use Consul's default LockDelay of 15s by not specifying it
+	}
+	session, _, err := c.client.Session().Create(se, nil)
+	if err != nil {
+		return nil, err
+	}
+
+	opts := &api.LockOptions{
+		Key:            c.key,
+		Value:          []byte(c.value),
+		Session:        session,
+		MonitorRetries: 5,
+		LockWaitTime:   c.lockWaitTime,
+		SessionTTL:     c.sessionTTL,
+	}
+	lock, err := c.client.LockOpts(opts)
+	if err != nil {
+		// Don't bother destroying sessions as they will be destroyed after TTL
+		// anyway.
+		return nil, fmt.Errorf("failed to create lock: %w", err)
+	}
+
+	unlockCh := make(chan struct{})
+
+	s := &lockSession{
+		id:       session,
+		lock:     lock,
+		unlockCh: unlockCh,
+	}
+
+	// Start renewals of the session
+	go func() {
+		// Note we capture unlockCh here rather than s.unlockCh because s.unlockCh
+		// is mutated on cleanup which is racy since we don't hold a lock here.
+		// unlockCh will never be mutated though.
+		err := c.client.Session().RenewPeriodic(c.sessionTTL, session, nil, unlockCh)
+		if err != nil {
+			c.logger.Error("failed to renew consul session for more than the TTL, lock lost", "err", err)
+		}
+		// release other resources for this session only i.e. don't c.Unlock as that
+		// might now be locked under a different session).
+		s.cleanup()
+	}()
+	return s, nil
 }

 func (c *ConsulLock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
-	return c.lock.Lock(stopCh)
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if c.session != nil {
+		return nil, fmt.Errorf("lock instance already locked")
+	}
+
+	session, err := c.createSession()
+	if err != nil {
+		return nil, err
+	}
+	leaderCh, err := session.Lock(stopCh)
+	if leaderCh != nil && err == nil {
+		// We hold the lock, store the session
+		c.session = session
+		c.sessionID = session.id
+	}
+	return leaderCh, err
 }

 func (c *ConsulLock) Unlock() error {
-	return c.lock.Unlock()
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if c.session != nil {
+		c.session.cleanup()
+		c.session = nil
+		// Don't clear c.sessionID since we rely on returning the same old ID after
+		// Unlock until the next Lock.
+	}
+	return nil
 }

 func (c *ConsulLock) Value() (bool, string, error) {

@@ -538,7 +767,18 @@ func (c *ConsulLock) Value() (bool, string, error) {

 	if pair == nil {
 		return false, "", nil
 	}
+	// Note that held is expected to mean "does _any_ node hold the lock" not
+	// "does this current instance hold the lock" so although we know what our own
+	// session ID is, we don't check it matches here only that there is _some_
+	// session in Consul holding the lock right now.
 	held := pair.Session != ""
 	value := string(pair.Value)
 	return held, value, nil
 }
+
+func (c *ConsulLock) Info() (key, sessionid string) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	return c.key, c.sessionID
+}
@@ -19,6 +19,7 @@ import (

 	"github.com/hashicorp/vault/helper/testhelpers/consul"
 	"github.com/hashicorp/vault/sdk/helper/logging"
 	"github.com/hashicorp/vault/sdk/physical"
+	"github.com/stretchr/testify/require"
 )

 func TestConsul_newConsulBackend(t *testing.T) {

@@ -442,7 +443,9 @@ func TestConsulHABackend(t *testing.T) {

 		t.Fatalf("err: %v", err)
 	}

-	randPath := fmt.Sprintf("vault-%d/", time.Now().Unix())
+	// We used to use a timestamp here but then if you run multiple instances in
+	// parallel with one Consul they end up conflicting.
+	randPath := fmt.Sprintf("vault-%d/", rand.Int())
 	defer func() {
 		client.KV().DeleteTree(randPath, nil)
 	}()

@@ -453,6 +456,10 @@ func TestConsulHABackend(t *testing.T) {

 		"token":        config.Token,
 		"path":         randPath,
 		"max_parallel": "-1",
+		// We have to wait this out as part of the test so shorten it a little from
+		// the default 15 seconds helps with test run times, especially when running
+		// this in a loop to detect flakes!
+		"lock_wait_time": "3s",
 	}

 	b, err := NewConsulBackend(backendConfig, logger)

@@ -478,4 +485,44 @@ func TestConsulHABackend(t *testing.T) {

 	if host == "" {
 		t.Fatalf("bad addr: %v", host)
 	}
+
+	// Calling `Info` on a Lock that has been unlocked must still return the old
+	// sessionID (until it is locked again) otherwise we will fail to fence writes
+	// that are still in flight from before (e.g. queued WAL or Merkle flushes) as
+	// soon as the first one unlocks the session allowing corruption again.
+	l, err := b.(physical.HABackend).LockWith("test-lock-session-info", "bar")
+	require.NoError(t, err)
+
+	expectKey := randPath + "test-lock-session-info"
+
+	cl := l.(*ConsulLock)
+
+	stopCh := make(chan struct{})
+	time.AfterFunc(5*time.Second, func() {
+		close(stopCh)
+	})
+	leaderCh, err := cl.Lock(stopCh)
+	require.NoError(t, err)
+	require.NotNil(t, leaderCh)
+
+	key, sid := cl.Info()
+	require.Equal(t, expectKey, key)
+	require.NotEmpty(t, sid)
+
+	// Now Unlock the lock, sessionID should be reset to empty string
+	err = cl.Unlock()
+	require.NoError(t, err)
+	key2, sid2 := cl.Info()
+	require.Equal(t, key, key2)
+	require.Equal(t, sid, sid2)
+
+	// Lock it again, this should cause a new session to be created so SID should
+	// change.
+	leaderCh, err = cl.Lock(stopCh)
+	require.NoError(t, err)
+	require.NotNil(t, leaderCh)
+
+	key3, sid3 := cl.Info()
+	require.Equal(t, key, key3)
+	require.NotEqual(t, sid, sid3)
 }
@@ -55,7 +55,6 @@ const MaxClusterNameLength = 52

 type DockerCluster struct {
 	ClusterName string

-	RaftStorage bool
 	ClusterNodes []*DockerClusterNode

 	// Certificate fields

@@ -73,6 +72,8 @@ type DockerCluster struct {

 	ID        string
 	Logger    log.Logger
 	builtTags map[string]struct{}
+
+	storage testcluster.ClusterStorage
 }

 func (dc *DockerCluster) NamedLogger(s string) log.Logger {

@@ -407,9 +408,6 @@ func NewTestDockerCluster(t *testing.T, opts *DockerClusterOptions) *DockerClust

 	if opts.NetworkName == "" {
 		opts.NetworkName = os.Getenv("TEST_DOCKER_NETWORK_NAME")
 	}
-	if opts.VaultLicense == "" {
-		opts.VaultLicense = os.Getenv(testcluster.EnvVaultLicenseCI)
-	}

 	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
 	t.Cleanup(cancel)

@@ -434,14 +432,17 @@ func NewDockerCluster(ctx context.Context, opts *DockerClusterOptions) (*DockerC

 	if opts.Logger == nil {
 		opts.Logger = log.NewNullLogger()
 	}
+	if opts.VaultLicense == "" {
+		opts.VaultLicense = os.Getenv(testcluster.EnvVaultLicenseCI)
+	}

 	dc := &DockerCluster{
 		DockerAPI:   api,
-		RaftStorage: true,
 		ClusterName: opts.ClusterName,
 		Logger:      opts.Logger,
 		builtTags:   map[string]struct{}{},
 		CA:          opts.CA,
+		storage:     opts.Storage,
 	}

 	if err := dc.setupDockerCluster(ctx, opts); err != nil {

@@ -588,21 +589,31 @@ func (n *DockerClusterNode) Start(ctx context.Context, opts *DockerClusterOption

 	vaultCfg["telemetry"] = map[string]interface{}{
 		"disable_hostname": true,
 	}
-	raftOpts := map[string]interface{}{
+
+	// Setup storage. Default is raft.
+	storageType := "raft"
+	storageOpts := map[string]interface{}{
 		// TODO add options from vnc
 		"path":    "/vault/file",
 		"node_id": n.NodeID,
 	}
-	vaultCfg["storage"] = map[string]interface{}{
-		"raft": raftOpts,
+
+	if opts.Storage != nil {
+		storageType = opts.Storage.Type()
+		storageOpts = opts.Storage.Opts()
 	}
-	if opts != nil && opts.VaultNodeConfig != nil && len(opts.VaultNodeConfig.StorageOptions) > 0 {
+
+	if opts != nil && opts.VaultNodeConfig != nil {
 		for k, v := range opts.VaultNodeConfig.StorageOptions {
-			if _, ok := raftOpts[k].(string); !ok {
-				raftOpts[k] = v
+			if _, ok := storageOpts[k].(string); !ok {
+				storageOpts[k] = v
 			}
 		}
 	}
+	vaultCfg["storage"] = map[string]interface{}{
+		storageType: storageOpts,
+	}

 	//// disable_mlock is required for working in the Docker environment with
 	//// custom plugins
 	vaultCfg["disable_mlock"] = true

@@ -817,6 +828,72 @@ func (n *DockerClusterNode) AddNetworkDelay(ctx context.Context, delay time.Dura

 	return nil
 }

+// PartitionFromCluster will cause the node to be disconnected at the network
+// level from the rest of the docker cluster. It does so in a way that the node
+// will not see TCP RSTs and all packets it sends will be "black holed". It
+// attempts to keep packets to and from the host intact which allows docker
+// daemon to continue streaming logs and any test code to continue making
+// requests from the host to the partitioned node.
+func (n *DockerClusterNode) PartitionFromCluster(ctx context.Context) error {
+	stdout, stderr, exitCode, err := n.runner.RunCmdWithOutput(ctx, n.Container.ID, []string{
+		"/bin/sh",
+		"-xec", strings.Join([]string{
+			fmt.Sprintf("echo partitioning container from network"),
+			"apk add iproute2",
+			// Get the gateway address for the bridge so we can allow host to
+			// container traffic still.
+			"GW=$(ip r | grep default | grep eth0 | cut -f 3 -d' ')",
+			// First delete the rules in case this is called twice otherwise we'll add
+			// multiple copies and only remove one in Unpartition (yay iptables).
+			// Ignore the error if it didn't exist.
+			"iptables -D INPUT -i eth0 ! -s \"$GW\" -j DROP | true",
+			"iptables -D OUTPUT -o eth0 ! -d \"$GW\" -j DROP | true",
+			// Add rules to drop all packets in and out of the docker network
+			// connection.
+			"iptables -I INPUT -i eth0 ! -s \"$GW\" -j DROP",
+			"iptables -I OUTPUT -o eth0 ! -d \"$GW\" -j DROP",
+		}, "; "),
+	})
+	if err != nil {
+		return err
+	}
+
+	n.Logger.Trace(string(stdout))
+	n.Logger.Trace(string(stderr))
+	if exitCode != 0 {
+		return fmt.Errorf("got nonzero exit code from iptables: %d", exitCode)
+	}
+	return nil
+}
+
+// UnpartitionFromCluster reverses a previous call to PartitionFromCluster and
+// restores full connectivity. Currently assumes the default "bridge" network.
+func (n *DockerClusterNode) UnpartitionFromCluster(ctx context.Context) error {
+	stdout, stderr, exitCode, err := n.runner.RunCmdWithOutput(ctx, n.Container.ID, []string{
+		"/bin/sh",
+		"-xec", strings.Join([]string{
+			fmt.Sprintf("echo un-partitioning container from network"),
+			// Get the gateway address for the bridge so we can allow host to
+			// container traffic still.
+			"GW=$(ip r | grep default | grep eth0 | cut -f 3 -d' ')",
+			// Remove the rules, ignore if they are not present or iptables wasn't
+			// installed yet (i.e. no-one called PartitionFromCluster yet).
+			"iptables -D INPUT -i eth0 ! -s \"$GW\" -j DROP | true",
+			"iptables -D OUTPUT -o eth0 ! -d \"$GW\" -j DROP | true",
+		}, "; "),
+	})
+	if err != nil {
+		return err
+	}
+
+	n.Logger.Trace(string(stdout))
+	n.Logger.Trace(string(stderr))
+	if exitCode != 0 {
+		return fmt.Errorf("got nonzero exit code from iptables: %d", exitCode)
+	}
+	return nil
+}
+
 type LogConsumerWriter struct {
 	consumer func(string)
 }

@@ -844,6 +921,7 @@ type DockerClusterOptions struct {

 	VaultBinary string
 	Args        []string
 	StartProbe  func(*api.Client) error
+	Storage     testcluster.ClusterStorage
 }

 func ensureLeaderMatches(ctx context.Context, client *api.Client, ready func(response *api.LeaderResponse) error) error {

@@ -904,6 +982,12 @@ func (dc *DockerCluster) setupDockerCluster(ctx context.Context, opts *DockerClu

 	dc.RootCAs = x509.NewCertPool()
 	dc.RootCAs.AddCert(dc.CA.CACert)

+	if dc.storage != nil {
+		if err := dc.storage.Start(ctx, &opts.ClusterOptions); err != nil {
+			return err
+		}
+	}
+
 	for i := 0; i < numCores; i++ {
 		if err := dc.addNode(ctx, opts); err != nil {
 			return err

@@ -964,6 +1048,11 @@ func (dc *DockerCluster) addNode(ctx context.Context, opts *DockerClusterOptions

 }

 func (dc *DockerCluster) joinNode(ctx context.Context, nodeIdx int, leaderIdx int) error {
+	if dc.storage != nil && dc.storage.Type() != "raft" {
+		// Storage is not raft so nothing to do but unseal.
+		return testcluster.UnsealNode(ctx, dc, nodeIdx)
+	}
+
 	leader := dc.ClusterNodes[leaderIdx]

 	if nodeIdx >= len(dc.ClusterNodes) {
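The partition helpers above are what a test can use to reproduce the scenario from the changelog: isolate the active node, let a new leader be elected, then heal the partition and verify the old leader's writes were fenced. A rough sketch of such a test follows; the assertion details and surrounding wiring are assumptions, not code from this commit.

```go
package dockertests_example

import (
	"context"
	"testing"

	"github.com/hashicorp/vault/sdk/helper/testcluster"
	"github.com/hashicorp/vault/sdk/helper/testcluster/docker" // assumed path
)

func testActiveNodePartition(ctx context.Context, t *testing.T, cluster *docker.DockerCluster) {
	oldLeaderIdx, err := testcluster.LeaderNode(ctx, cluster)
	if err != nil {
		t.Fatal(err)
	}
	oldLeader := cluster.ClusterNodes[oldLeaderIdx]

	// Black-hole the old leader's traffic to the rest of the docker network
	// (including Consul), while keeping host access for the test and for logs.
	if err := oldLeader.PartitionFromCluster(ctx); err != nil {
		t.Fatal(err)
	}

	// Eventually another node should take over as active.
	if _, err := testcluster.WaitForActiveNode(ctx, cluster); err != nil {
		t.Fatal(err)
	}

	// Heal the partition; the old leader must not have been able to write to
	// storage after losing its lock session.
	if err := oldLeader.UnpartitionFromCluster(ctx); err != nil {
		t.Fatal(err)
	}
}
```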
@@ -4,6 +4,7 @@

 package testcluster

 import (
+	"context"
 	"crypto/ecdsa"
 	"crypto/tls"
 	"crypto/x509"

@@ -110,3 +111,10 @@ type CA struct {

 	CAKey    *ecdsa.PrivateKey
 	CAKeyPEM []byte
 }
+
+type ClusterStorage interface {
+	Start(context.Context, *ClusterOptions) error
+	Cleanup() error
+	Opts() map[string]interface{}
+	Type() string
+}
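Any external storage that wants to participate in Docker cluster tests only has to satisfy this small interface. A minimal sketch of an alternative implementation is shown below to illustrate the contract; it is assumed to live in the same `testcluster` package and is illustrative only, the Consul implementation earlier in this commit is the real in-tree example.

```go
// fileClusterStorage sketches a ClusterStorage for a backend that needs no
// external container (Vault's "file" backend): lifecycle methods are no-ops
// and Opts becomes the body of the generated `storage "file" { ... }` stanza.
type fileClusterStorage struct{}

func (fileClusterStorage) Start(_ context.Context, _ *ClusterOptions) error { return nil }

func (fileClusterStorage) Cleanup() error { return nil }

func (fileClusterStorage) Opts() map[string]interface{} {
	return map[string]interface{}{"path": "/vault/file"}
}

func (fileClusterStorage) Type() string { return "file" }

var _ ClusterStorage = fileClusterStorage{}
```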
|
|||||||
@@ -158,6 +158,20 @@ func NodeHealthy(ctx context.Context, cluster VaultCluster, nodeIdx int) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func LeaderNode(ctx context.Context, cluster VaultCluster) (int, error) {
|
func LeaderNode(ctx context.Context, cluster VaultCluster) (int, error) {
|
||||||
|
// Be robust to multiple nodes thinking they are active. This is possible in
|
||||||
|
// certain network partition situations where the old leader has not
|
||||||
|
// discovered it's lost leadership yet. In tests this is only likely to come
|
||||||
|
// up when we are specifically provoking it, but it's possible it could happen
|
||||||
|
// at any point if leadership flaps of connectivity suffers transient errors
|
||||||
|
// etc. so be robust against it. The best solution would be to have some sort
|
||||||
|
// of epoch like the raft term that is guaranteed to be monotonically
|
||||||
|
// increasing through elections, however we don't have that abstraction for
|
||||||
|
// all HABackends in general. The best we have is the ActiveTime. In a
|
||||||
|
// distributed systems text book this would be bad to rely on due to clock
|
||||||
|
// sync issues etc. but for our tests it's likely fine because even if we are
|
||||||
|
// running separate Vault containers, they are all using the same hardware
|
||||||
|
// clock in the system.
|
||||||
|
leaderActiveTimes := make(map[int]time.Time)
|
||||||
for i, node := range cluster.Nodes() {
|
for i, node := range cluster.Nodes() {
|
||||||
client := node.APIClient()
|
client := node.APIClient()
|
||||||
ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
|
ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
|
||||||
@@ -166,10 +180,24 @@ func LeaderNode(ctx context.Context, cluster VaultCluster) (int, error) {
|
|||||||
if err != nil || resp == nil || !resp.IsSelf {
|
if err != nil || resp == nil || !resp.IsSelf {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
return i, nil
|
leaderActiveTimes[i] = resp.ActiveTime
|
||||||
}
|
}
|
||||||
|
if len(leaderActiveTimes) == 0 {
|
||||||
return -1, fmt.Errorf("no leader found")
|
return -1, fmt.Errorf("no leader found")
|
||||||
}
|
}
|
||||||
|
// At least one node thinks it is active. If multiple, pick the one with the
|
||||||
|
// most recent ActiveTime. Note if there is only one then this just returns
|
||||||
|
// it.
|
||||||
|
var newestLeaderIdx int
|
||||||
|
var newestActiveTime time.Time
|
||||||
|
for i, at := range leaderActiveTimes {
|
||||||
|
if at.After(newestActiveTime) {
|
||||||
|
newestActiveTime = at
|
||||||
|
newestLeaderIdx = i
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return newestLeaderIdx, nil
|
||||||
|
}
|
||||||
|
|
||||||
func WaitForActiveNode(ctx context.Context, cluster VaultCluster) (int, error) {
|
func WaitForActiveNode(ctx context.Context, cluster VaultCluster) (int, error) {
|
||||||
for ctx.Err() == nil {
|
for ctx.Err() == nil {
|
||||||
@@ -189,7 +217,8 @@ func WaitForActiveNodeAndPerfStandbys(ctx context.Context, cluster VaultCluster)
|
|||||||
// A sleep before calling WaitForActiveNodeAndPerfStandbys seems to sort
|
// A sleep before calling WaitForActiveNodeAndPerfStandbys seems to sort
|
||||||
// things out, but so apparently does this. We should be able to eliminate
|
// things out, but so apparently does this. We should be able to eliminate
|
||||||
// this call to WaitForActiveNode by reworking the logic in this method.
|
// this call to WaitForActiveNode by reworking the logic in this method.
|
||||||
if _, err := WaitForActiveNode(ctx, cluster); err != nil {
|
leaderIdx, err := WaitForActiveNode(ctx, cluster)
|
||||||
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -203,7 +232,7 @@ func WaitForActiveNodeAndPerfStandbys(ctx context.Context, cluster VaultCluster)
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
leaderClient := cluster.Nodes()[0].APIClient()
|
leaderClient := cluster.Nodes()[leaderIdx].APIClient()
|
||||||
|
|
||||||
for ctx.Err() == nil {
|
for ctx.Err() == nil {
|
||||||
err = leaderClient.Sys().MountWithContext(ctx, mountPoint, &api.MountInput{
|
err = leaderClient.Sys().MountWithContext(ctx, mountPoint, &api.MountInput{
|
||||||
@@ -244,6 +273,7 @@ func WaitForActiveNodeAndPerfStandbys(ctx context.Context, cluster VaultCluster)
|
|||||||
var leader *api.LeaderResponse
|
var leader *api.LeaderResponse
|
||||||
leader, err = client.Sys().LeaderWithContext(ctx)
|
leader, err = client.Sys().LeaderWithContext(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
logger.Trace("waiting for core", "core", coreNo, "err", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
switch {
|
switch {
|
||||||
@@ -261,6 +291,12 @@ func WaitForActiveNodeAndPerfStandbys(ctx context.Context, cluster VaultCluster)
|
|||||||
atomic.AddInt64(&standbys, 1)
|
atomic.AddInt64(&standbys, 1)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
default:
|
||||||
|
logger.Trace("waiting for core", "core", coreNo,
|
||||||
|
"ha_enabled", leader.HAEnabled,
|
||||||
|
"is_self", leader.IsSelf,
|
||||||
|
"perf_standby", leader.PerfStandby,
|
||||||
|
"perf_standby_remote_wal", leader.PerfStandbyLastRemoteWAL)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}(i)
|
}(i)
|
||||||
|
|||||||
@@ -60,6 +60,69 @@ type HABackend interface {
HAEnabled() bool
}

// FencingHABackend is an HABackend which provides the additional guarantee that
// each Lock it returns from LockWith is also a FencingLock. A FencingLock
// provides a mechanism to retrieve a fencing token that can be included by
// future writes by the backend to ensure that it is still the current lock
// holder at the time the write commits. Without this, timing might allow a lock
// holder not to notice it's no longer the active node for long enough for it to
// write data to storage even while a new active node is writing, causing
// corruption. For the Consul backend the fencing token is the session id, which
// is submitted with a `check-session` operation on each write to ensure the
// write only completes if the session is still holding the lock. For the raft
// backend this isn't needed because our in-process raft library is unable to
// write if it's not the leader anyway.
//
// If you implement this, Vault will call RegisterActiveNodeLock with the Lock
// instance returned by LockWith after it successfully locks it. This keeps the
// backend oblivious to the specific key we use for active node locks and allows
// potential usage of locks for other purposes in the future.
//
// Note that all implementations must support writing to storage before
// RegisterActiveNodeLock is called to support initialization of a new cluster.
// They must also skip fencing writes if the write's Context contains a special
// value. This is necessary to allow Vault to clear and re-initialise secondary
// clusters even though there is already an active node with a specific lock
// session, since we clear the cluster while Vault is sealed and clearing the
// data might remove the lock in some storages (e.g. Consul). As noted above,
// it's not generally safe to allow unfenced writes after a lock, so instead we
// special-case just a few types of writes that only happen rarely while the
// cluster is sealed. See the IsUnfencedWrite helper function.
type FencingHABackend interface {
HABackend

RegisterActiveNodeLock(l Lock) error
}
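The `check-session` fencing described in the comment above can be sketched directly against the Consul API. This is a minimal illustration, not Vault's implementation: the lock path, session handling, and key names are assumptions made for the example. The point is that the data write and the session check commit atomically in one transaction, so a deposed active node whose session has been released can no longer write.

```go
package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/consul/api"
)

// fencedPut writes key/value only if sessionID still holds lockKey, by pairing
// the KV set with a check-session operation in a single Consul transaction.
func fencedPut(client *api.Client, lockKey, sessionID, key string, value []byte) error {
	ops := api.KVTxnOps{
		&api.KVTxnOp{
			Verb:    api.KVCheckSession,
			Key:     lockKey,
			Session: sessionID,
		},
		&api.KVTxnOp{
			Verb:  api.KVSet,
			Key:   key,
			Value: value,
		},
	}
	ok, resp, _, err := client.KV().Txn(ops, nil)
	if err != nil {
		return err
	}
	if !ok {
		// The check-session op failed: we no longer hold the lock, so the
		// write was rejected rather than silently corrupting data.
		return fmt.Errorf("fencing check failed: %v", resp.Errors)
	}
	return nil
}

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}
	// "lock/path" and "my-session-id" are placeholders; in Vault the session is
	// the one that acquired the active-node lock returned by LockWith.
	if err := fencedPut(client, "lock/path", "my-session-id", "data/example", []byte("v1")); err != nil {
		log.Fatal(err)
	}
}
```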

// unfencedWriteContextKeyType is a special type to identify context values to
// disable fencing. It's a separate type per the best practice in the
// Context.Value docs to avoid collisions even if the key might match.
type unfencedWriteContextKeyType string

const (
// unfencedWriteContextKey is the context key we use to pass the option to
// bypass fencing through to a FencingHABackend. Note that this is not an
// ideal use of context values and violates the "do not use it for optional
// arguments" guidance but has been agreed as a pragmatic option for this case
// rather than needing to specialize every physical.Backend to understand this
// option.
unfencedWriteContextKey unfencedWriteContextKeyType = "vault-disable-fencing"
)

// UnfencedWriteCtx adds metadata to a ctx such that any writes performed
// directly on a FencingHABackend using that context will _not_ add a fencing
// token.
func UnfencedWriteCtx(ctx context.Context) context.Context {
return context.WithValue(ctx, unfencedWriteContextKey, true)
}

// IsUnfencedWrite returns whether or not the context passed has the unfenced
// flag value set.
func IsUnfencedWrite(ctx context.Context) bool {
isUnfenced, ok := ctx.Value(unfencedWriteContextKey).(bool)
return ok && isUnfenced
}

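As a quick illustration of how the flag round-trips (a minimal sketch; it assumes these helpers live in the `github.com/hashicorp/vault/sdk/physical` package as added by this change):

```go
package main

import (
	"context"
	"fmt"

	"github.com/hashicorp/vault/sdk/physical"
)

func main() {
	base := context.Background()
	// A plain context is fenced by default.
	fmt.Println(physical.IsUnfencedWrite(base)) // false

	// Wrapping the context marks direct backend writes as intentionally
	// unfenced, e.g. for the rare seal-time initialization paths noted above.
	unfenced := physical.UnfencedWriteCtx(base)
	fmt.Println(physical.IsUnfencedWrite(unfenced)) // true
}
```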
// ToggleablePurgemonster is an interface for backends that can toggle on or
// off special functionality and/or support purging. This is only used for the
// cache, don't use it for other things.
@@ -86,7 +149,7 @@ type Lock interface {
// Unlock is used to release the lock
Unlock() error

// Returns the value of the lock and if it is held
// Returns the value of the lock and if it is held by _any_ node
Value() (bool, string, error)
}

@@ -9,6 +9,8 @@ import (
"sort"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func ExerciseBackend(t testing.TB, b Backend) {
@@ -330,12 +332,25 @@ func ExerciseHABackend(t testing.TB, b HABackend, b2 HABackend) {
t.Errorf("expected value bar: %v", err)
}

// Check if it's fencing that we can register the lock
if fba, ok := b.(FencingHABackend); ok {
require.NoError(t, fba.RegisterActiveNodeLock(lock))
}

// Second acquisition should fail
lock2, err := b2.LockWith("foo", "baz")
if err != nil {
t.Fatalf("lock 2: %v", err)
}

// Checking the lock from b2 should discover that the lock is held since held
// implies only that there is _some_ leader not that b2 is leader (this was
// not clear before so we make it explicit with this assertion).
held2, val2, err := lock2.Value()
require.NoError(t, err)
require.Equal(t, "bar", val2)
require.True(t, held2)

// Cancel attempt in 50 msec
stopCh := make(chan struct{})
time.AfterFunc(50*time.Millisecond, func() {
@@ -363,6 +378,11 @@ func ExerciseHABackend(t testing.TB, b HABackend, b2 HABackend) {
t.Errorf("should get leaderCh")
}

// Check if it's fencing that we can register the lock
if fba2, ok := b2.(FencingHABackend); ok {
require.NoError(t, fba2.RegisterActiveNodeLock(lock))
}

// Check the value
held, val, err = lock2.Value()
if err != nil {
@@ -0,0 +1,295 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package consul_fencing

import (
"context"
"fmt"
"sort"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/vault/api"
"github.com/hashicorp/vault/helper/testhelpers/consul"
"github.com/hashicorp/vault/sdk/helper/testcluster"
"github.com/hashicorp/vault/sdk/helper/testcluster/docker"
"github.com/stretchr/testify/require"
)

// TestConsulFencing_PartitionedLeaderCantWrite attempts to create an active
// node split-brain when using Consul storage to ensure the old leader doesn't
// continue to write data, potentially corrupting storage. It is naturally
// non-deterministic since it relies heavily on timing of the different
// container processes, however it pretty reliably failed before the fencing fix
// (and Consul lock improvements) and should _never_ fail now that we correctly
// fence writes.
func TestConsulFencing_PartitionedLeaderCantWrite(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()

consulStorage := consul.NewClusterStorage()

// Create cluster logger that will dump cluster logs to stdout for debugging.
logger := hclog.NewInterceptLogger(hclog.DefaultOptions)
logger.SetLevel(hclog.Trace)

clusterOpts := docker.DefaultOptions(t)
clusterOpts.ImageRepo = "hashicorp/vault-enterprise"
clusterOpts.ClusterOptions.Logger = logger

clusterOpts.Storage = consulStorage

logger.Info("==> starting cluster")
c, err := docker.NewDockerCluster(ctx, clusterOpts)
require.NoError(t, err)
logger.Info(" ✅ done.", "root_token", c.GetRootToken(),
"consul_token", consulStorage.Config().Token)

logger.Info("==> waiting for leader")
leaderIdx, err := testcluster.WaitForActiveNode(ctx, c)
require.NoError(t, err)

leader := c.Nodes()[leaderIdx]
leaderClient := leader.APIClient()

notLeader := c.Nodes()[1] // Assumes the leader is usually node zero; corrected below if not.
if leaderIdx == 1 {
notLeader = c.Nodes()[0]
}

// Mount a KV v2 backend
logger.Info("==> mounting KV")
err = leaderClient.Sys().Mount("/test", &api.MountInput{
Type: "kv-v2",
})
require.NoError(t, err)

// Start two background workers that will cause writes to Consul in the
// background. KV v2 relies on a single active node for correctness.
// Specifically its patch operation does a read-modify-write under a
// key-specific lock which is correct for concurrent writes to one process,
// but which by nature of our storage API is not going to be atomic if another
// active node is also writing the same KV. It's made worse because the cache
// backend means the active node will not actually read from Consul after the
// initial read and will be modifying its own in-memory version and writing
// that back. So we should be able to detect multiple active nodes writing
// reliably with the following setup:
// 1. Two separate "client" goroutines each connected to different Vault
// servers.
// 2. Both write to the same kv-v2 key, each one modifies only its own set
// of subkeys c0-X or c1-X.
// 3. Each request adds the next sequential X value for that client. We use a
// Patch operation so we don't need to read the version or use CAS. On an
// error each client will retry the same key until it gets a success.
// 4. In a correct system with a single active node despite a partition, we
// expect a complete set of consecutive X values for both clients (i.e.
// no writes lost). If an old leader is still allowed to write to Consul
// then it will continue to patch against its own last-known version from
// cache and so will overwrite any concurrent updates from the other
// client and we should see that as lost updates at the end.
var wg sync.WaitGroup
errCh := make(chan error, 10)
var writeCount uint64

// Initialise the key once
kv := leaderClient.KVv2("/test")
_, err = kv.Put(ctx, "data", map[string]interface{}{
"c0-00000000": 1, // values don't matter here, only keys in this set.
"c1-00000000": 1,
})
require.NoError(t, err)

const interval = 500 * time.Millisecond

runWriter := func(i int, targetServer testcluster.VaultClusterNode, ctr *uint64) {
wg.Add(1)
defer wg.Done()
kv := targetServer.APIClient().KVv2("/test")

for {
key := fmt.Sprintf("c%d-%08d", i, atomic.LoadUint64(ctr))

// Use a short timeout. If we don't then the one goroutine writing to the
// partitioned active node can get stuck here until the 60 second request
// timeout kicks in without issuing another request.
reqCtx, cancel := context.WithTimeout(ctx, interval)
logger.Debug("sending patch", "client", i, "key", key)
_, err = kv.Patch(reqCtx, "data", map[string]interface{}{
key: 1,
})
cancel()
// Deliver errors to test, don't block if we get too many before context
// is cancelled, otherwise client 0 can end up blocked as it has so many
// errors during the partition it doesn't actually start writing again
// ever and so the test never sees split-brain writes.
if err != nil {
select {
case <-ctx.Done():
return
case errCh <- fmt.Errorf("client %d error: %w", i, err):
default:
// errCh is blocked, carry on anyway
}
} else {
// Only increment our set counter here now we've had an ack that the
// update was successful.
atomic.AddUint64(ctr, 1)
atomic.AddUint64(&writeCount, 1)
}
select {
case <-ctx.Done():
return
case <-time.After(interval):
}
}
}

logger.Info("==> starting writers")
client0Ctr, client1Ctr := uint64(1), uint64(1)
go runWriter(0, leader, &client0Ctr)
go runWriter(1, notLeader, &client1Ctr)

// Wait for some writes to have started
var writesBeforePartition uint64
logger.Info("==> waiting for writes")
for {
time.Sleep(1 * time.Millisecond)
writesBeforePartition = atomic.LoadUint64(&writeCount)
if writesBeforePartition >= 5 {
break
}
// Also check for any write errors
select {
case err := <-errCh:
require.NoError(t, err)
default:
}
require.NoError(t, ctx.Err())
}

val, err := kv.Get(ctx, "data")
require.NoError(t, err)

logger.Info("==> partitioning leader")
// Now partition the leader from everything else (including Consul)
err = leader.(*docker.DockerClusterNode).PartitionFromCluster(ctx)
require.NoError(t, err)

// Reload this in case more writes occurred before the partition completed.
writesBeforePartition = atomic.LoadUint64(&writeCount)

// Wait for some more writes to have happened (the client writing to leader
// will probably have sent one and hung waiting for a response but the other
// one should eventually start committing again when a new active node is
// elected).

logger.Info("==> waiting for writes to new leader")
for {
time.Sleep(1 * time.Millisecond)
writesAfterPartition := atomic.LoadUint64(&writeCount)
if (writesAfterPartition - writesBeforePartition) >= 20 {
break
}
// Also check for any write errors or timeouts
select {
case err := <-errCh:
// Don't fail here because we expect writes to the old leader to fail
// eventually or if they need a new connection etc.
logger.Info("failed write", "write_count", writesAfterPartition, "err", err)
default:
}
require.NoError(t, ctx.Err())
}

// Heal partition
logger.Info("==> healing partition")
err = leader.(*docker.DockerClusterNode).UnpartitionFromCluster(ctx)
require.NoError(t, err)

// Wait for old leader to rejoin as a standby and get healthy.
logger.Info("==> wait for old leader to rejoin")

require.NoError(t, waitUntilNotLeader(ctx, leaderClient, logger))

// Stop the writers and wait for them to shut down nicely
logger.Info("==> stopping writers")
cancel()
wg.Wait()

// Now verify that all Consul data is consistent with only one leader writing.
// Use a new context since we just cancelled the general one
reqCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
val, err = kv.Get(reqCtx, "data")
require.NoError(t, err)

// Ensure we have every consecutive key for both clients
sets := [][]int{make([]int, 0, 128), make([]int, 0, 128)}
for k := range val.Data {
var cNum, x int
_, err := fmt.Sscanf(k, "c%d-%08d", &cNum, &x)
require.NoError(t, err)
sets[cNum] = append(sets[cNum], x)
}

// Sort both sets
sort.Ints(sets[0])
sort.Ints(sets[1])

// Ensure they are both complete by creating an expected set and comparing to
// get nice output to debug. Note that makeSet is an exclusive bound since we
// don't know if the write for the current counter value completed or not yet,
// so we'll only create sets up to one less than that value, which we know for
// sure should be present.
c0Writes := int(atomic.LoadUint64(&client0Ctr))
c1Writes := int(atomic.LoadUint64(&client1Ctr))
expect0 := makeSet(c0Writes)
expect1 := makeSet(c1Writes)

// Trim the sets to only the writes we know completed since that's all the
// expected arrays contain. But only if they are longer so we don't change the
// slice length if they are shorter than the expected number.
if len(sets[0]) > c0Writes {
sets[0] = sets[0][0:c0Writes]
}
if len(sets[1]) > c1Writes {
sets[1] = sets[1][0:c1Writes]
}
require.Equal(t, expect0, sets[0], "Client 0 writes lost")
require.Equal(t, expect1, sets[1], "Client 1 writes lost")
}

func makeSet(n int) []int {
a := make([]int, n)
for i := 0; i < n; i++ {
a[i] = i
}
return a
}

func waitUntilNotLeader(ctx context.Context, oldLeaderClient *api.Client, logger hclog.Logger) error {
for {
// Wait for the original leader to acknowledge it's not active any more.
resp, err := oldLeaderClient.Sys().LeaderWithContext(ctx)
// Tolerate errors as the old leader is in a difficult place right now.
if err == nil {
if !resp.IsSelf {
// We are not leader!
return nil
}
logger.Info("old leader not ready yet", "IsSelf", resp.IsSelf)
} else {
logger.Info("failed to read old leader status", "err", err)
}
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(time.Second):
// Loop again
}
}
}
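To make the verification step concrete, here is a small standalone sketch (not part of the test; the sample keys and the `missingIndices` helper are illustrative only) showing how gaps in a client's acknowledged sequence expose lost updates under the `c<client>-<sequence>` key scheme used above:

```go
package main

import (
	"fmt"
	"sort"
)

// missingIndices reports which acknowledged sequence numbers for a client are
// absent from the final key set; any gap means a concurrent active node
// overwrote the shared key and a write was lost.
func missingIndices(keys []string, client int, acked int) []int {
	seen := make(map[int]bool)
	for _, k := range keys {
		var c, x int
		if _, err := fmt.Sscanf(k, "c%d-%08d", &c, &x); err == nil && c == client {
			seen[x] = true
		}
	}
	var missing []int
	for i := 0; i < acked; i++ {
		if !seen[i] {
			missing = append(missing, i)
		}
	}
	sort.Ints(missing)
	return missing
}

func main() {
	keys := []string{"c0-00000000", "c0-00000001", "c0-00000003", "c1-00000000"}
	// Client 0 had four acknowledged writes but index 2 is missing: a lost update.
	fmt.Println(missingIndices(keys, 0, 4)) // [2]
}
```

The test expresses the same idea by comparing the sorted, trimmed key sets against makeSet.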
27
vault/ha.go
@@ -520,6 +520,20 @@ func (c *Core) waitForLeadership(newLeaderCh chan func(), manualStepDownCh, stop
continue
}

// If the backend is a FencingHABackend, register the lock with it so it can
// correctly fence all writes from now on (i.e. assert that we still hold
// the lock atomically with each write).
if fba, ok := c.ha.(physical.FencingHABackend); ok {
err := fba.RegisterActiveNodeLock(lock)
if err != nil {
// Can't register lock, bail out
c.heldHALock = nil
lock.Unlock()
c.logger.Error("failed registering lock with fencing backend, giving up active state")
continue
}
}

c.logger.Info("acquired lock, enabling active operation")

// This is used later to log a metrics event; this can be helpful to
@@ -825,7 +839,18 @@ func (c *Core) periodicLeaderRefresh(newLeaderCh chan func(), stopCh chan struct
go func() {
// Bind locally, as the race detector is tripping here
lopCount := opCount
isLeader, _, newClusterAddr, _ := c.Leader()
isLeader, _, newClusterAddr, err := c.Leader()
if err != nil {
// This is debug level because it's not really something the user
// needs to see typically. This will only really fail if we are sealed
// or the HALock fails (e.g. can't connect to Consul or elect raft
// leader) and other things in logs should make those kinds of
// conditions obvious. However, when debugging, it is useful to know
// for sure why a standby is not seeing the leadership update, which
// could be due to errors being returned or could be due to some other
// bug.
c.logger.Debug("periodicLeaderRefresh fail to fetch leader info", "err", err)
}

// If we are the leader reset the clusterAddr since the next
// failover might go to the node that was previously active.