backport of commit db71fdb087 (#17444)

Co-authored-by: Josh Black <raskchanky@users.noreply.github.com>
hc-github-team-secure-vault-core authored 2022-10-06 14:44:02 -04:00, committed by GitHub
parent 67a2ce5d50, commit 7fcce03e71
12 changed files with 248 additions and 50 deletions

View File

@@ -37,11 +37,13 @@ import (
bolt "go.etcd.io/bbolt"
)
-// EnvVaultRaftNodeID is used to fetch the Raft node ID from the environment.
-const EnvVaultRaftNodeID = "VAULT_RAFT_NODE_ID"
+const (
+// EnvVaultRaftNodeID is used to fetch the Raft node ID from the environment.
+EnvVaultRaftNodeID = "VAULT_RAFT_NODE_ID"

-// EnvVaultRaftPath is used to fetch the path where Raft data is stored from the environment.
-const EnvVaultRaftPath = "VAULT_RAFT_PATH"
+// EnvVaultRaftPath is used to fetch the path where Raft data is stored from the environment.
+EnvVaultRaftPath = "VAULT_RAFT_PATH"
+)
var getMmapFlags = func(string) int { return 0 }
@@ -169,6 +171,8 @@ type RaftBackend struct {
// redundancyZone specifies a redundancy zone for autopilot.
redundancyZone string
effectiveSDKVersion string
}
// LeaderJoinInfo contains information required by a node to join itself as a
@@ -537,6 +541,12 @@ func (b *RaftBackend) Close() error {
return nil
}
func (b *RaftBackend) SetEffectiveSDKVersion(sdkVersion string) {
b.l.Lock()
b.effectiveSDKVersion = sdkVersion
b.l.Unlock()
}
func (b *RaftBackend) RedundancyZone() string {
b.l.RLock()
defer b.l.RUnlock()

View File

@@ -16,7 +16,6 @@ import (
"github.com/hashicorp/go-secure-stdlib/strutil"
"github.com/hashicorp/raft"
autopilot "github.com/hashicorp/raft-autopilot"
"github.com/hashicorp/vault/sdk/version"
"github.com/mitchellh/mapstructure"
"go.uber.org/atomic"
)
@@ -287,12 +286,14 @@ type Delegate struct {
// dl is a lock dedicated for guarding delegate's fields
dl sync.RWMutex
inflightRemovals map[raft.ServerID]bool
emptyVersionLogs map[raft.ServerID]struct{}
}
func newDelegate(b *RaftBackend) *Delegate {
return &Delegate{
RaftBackend: b,
inflightRemovals: make(map[raft.ServerID]bool),
emptyVersionLogs: make(map[raft.ServerID]struct{}),
}
}
@@ -398,12 +399,29 @@ func (d *Delegate) KnownServers() map[raft.ServerID]*autopilot.Server {
continue
}
+// If version isn't found in the state, fake it using the version from the leader so that autopilot
+// doesn't demote the node to a non-voter, just because of a missed heartbeat.
+currentServerID := raft.ServerID(id)
+followerVersion := state.Version
+leaderVersion := d.effectiveSDKVersion
+d.dl.Lock()
+if followerVersion == "" {
+if _, ok := d.emptyVersionLogs[currentServerID]; !ok {
+d.logger.Trace("received empty Vault version in heartbeat state. faking it with the leader version for now", "id", id, "leader version", leaderVersion)
+d.emptyVersionLogs[currentServerID] = struct{}{}
+}
+followerVersion = leaderVersion
+} else {
+delete(d.emptyVersionLogs, currentServerID)
+}
+d.dl.Unlock()
server := &autopilot.Server{
-ID: raft.ServerID(id),
+ID: currentServerID,
Name: id,
RaftVersion: raft.ProtocolVersionMax,
Meta: d.meta(state),
-Version: state.Version,
+Version: followerVersion,
Ext: d.autopilotServerExt(state),
}
@@ -428,7 +446,7 @@ func (d *Delegate) KnownServers() map[raft.ServerID]*autopilot.Server {
UpgradeVersion: d.EffectiveVersion(),
RedundancyZone: d.RedundancyZone(),
}),
-Version: version.GetVersion().Version,
+Version: d.effectiveSDKVersion,
Ext: d.autopilotServerExt(nil),
IsLeader: true,
}

View File

@@ -96,6 +96,10 @@ const (
ForwardSSCTokenToActive = "new_token"
WrapperTypeHsmAutoDeprecated = wrapping.WrapperType("hsm-auto")
// undoLogsAreSafeStoragePath is a storage path that we write once we know undo logs are
// safe, so we don't have to keep checking all the time.
undoLogsAreSafeStoragePath = "core/raft/undo_logs_are_safe"
)
var (
@@ -629,6 +633,10 @@ type Core struct {
// only the active node will actually write the new version timestamp, a perf
// standby shouldn't rely on the stored version timestamps being present.
versionHistory map[string]VaultVersion
// effectiveSDKVersion contains the SDK version that standby nodes should use when
// heartbeating with the active node. Default to the current SDK version.
effectiveSDKVersion string
}
func (c *Core) HAState() consts.HAState {
@@ -759,6 +767,8 @@ type CoreConfig struct {
// DisableSSCTokens is used to disable the use of server side consistent tokens
DisableSSCTokens bool
EffectiveSDKVersion string
}
// GetServiceRegistration returns the config's ServiceRegistration, or nil if it does
@@ -840,6 +850,11 @@ func CreateCore(conf *CoreConfig) (*Core, error) {
conf.NumExpirationWorkers = numExpirationWorkersDefault
}
effectiveSDKVersion := conf.EffectiveSDKVersion
if effectiveSDKVersion == "" {
effectiveSDKVersion = version.GetVersion().Version
}
// Setup the core
c := &Core{
entCore: entCore{},
@@ -905,6 +920,7 @@ func CreateCore(conf *CoreConfig) (*Core, error) {
enableResponseHeaderRaftNodeID: conf.EnableResponseHeaderRaftNodeID,
mountMigrationTracker: &sync.Map{},
disableSSCTokens: conf.DisableSSCTokens,
effectiveSDKVersion: effectiveSDKVersion,
}
c.standbyStopCh.Store(make(chan struct{}))

View File

@@ -69,6 +69,12 @@ func (c *Core) metricsLoop(stopCh chan struct{}) {
c.metricSink.SetGaugeWithLabels([]string{"core", "unsealed"}, 1, nil)
}
if c.UndoLogsEnabled() {
c.metricSink.SetGaugeWithLabels([]string{"core", "replication", "write_undo_logs"}, 1, nil)
} else {
c.metricSink.SetGaugeWithLabels([]string{"core", "replication", "write_undo_logs"}, 0, nil)
}
// Refresh the standby gauge, on all nodes
if haState != consts.Active {
c.metricSink.SetGaugeWithLabels([]string{"core", "active"}, 0, nil)
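This new gauge is what typically surfaces in telemetry as vault.core.replication.write_undo_logs. As a minimal standalone sketch (not Vault's actual test wiring; the service name and sink setup here are illustrative), this is how a gauge written through SetGaugeWithLabels can be read back with the go-metrics library that Vault's metricSink builds on:

package main

import (
	"fmt"
	"time"

	"github.com/armon/go-metrics"
)

func main() {
	// In-memory sink so the gauge can be inspected directly.
	sink := metrics.NewInmemSink(10*time.Second, time.Minute)
	cfg := metrics.DefaultConfig("vault")
	cfg.EnableHostname = false
	m, err := metrics.New(cfg, sink)
	if err != nil {
		panic(err)
	}

	// Mirrors the call in metricsLoop when undo logs are enabled.
	m.SetGaugeWithLabels([]string{"core", "replication", "write_undo_logs"}, 1, nil)

	// The most recent interval holds the gauge under its flattened name.
	intervals := sink.Data()
	for name, g := range intervals[len(intervals)-1].Gauges {
		fmt.Printf("%s = %v\n", name, g.Value)
	}
}

Because metricsLoop re-sets the value on every pass, the gauge drops back to 0 as soon as UndoLogsEnabled() reports false.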

View File

@@ -70,6 +70,10 @@ func (c *Core) barrierViewForNamespace(namespaceId string) (*BarrierView, error)
return c.systemBarrierView, nil
}
func (c *Core) UndoLogsEnabled() bool { return false }
func (c *Core) UndoLogsPersisted() (bool, error) { return false, nil }
func (c *Core) PersistUndoLogs() error { return nil }
func (c *Core) teardownReplicationResolverHandler() {}
func createSecondaries(*Core, *CoreConfig) {}
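These community-edition stubs always report undo logs as disabled and unpersisted; the real logic lives elsewhere. Purely to illustrate the write-once marker pattern implied by the undoLogsAreSafeStoragePath constant added earlier in this diff (a hypothetical sketch against the SDK's in-memory storage, not the enterprise implementation):

package main

import (
	"context"
	"fmt"

	"github.com/hashicorp/vault/sdk/logical"
)

// Mirrors the constant added in this commit.
const undoLogsAreSafeStoragePath = "core/raft/undo_logs_are_safe"

// persisted reports whether the marker entry exists; only its presence matters.
func persisted(ctx context.Context, s logical.Storage) (bool, error) {
	entry, err := s.Get(ctx, undoLogsAreSafeStoragePath)
	if err != nil {
		return false, err
	}
	return entry != nil, nil
}

// persist writes the marker once so later checks can skip the version scan.
func persist(ctx context.Context, s logical.Storage) error {
	return s.Put(ctx, &logical.StorageEntry{Key: undoLogsAreSafeStoragePath, Value: []byte("true")})
}

func main() {
	ctx := context.Background()
	s := &logical.InmemStorage{}

	ok, _ := persisted(ctx, s)
	fmt.Println("before:", ok) // false

	_ = persist(ctx, s)
	ok, _ = persisted(ctx, s)
	fmt.Println("after:", ok) // true
}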

View File

@@ -422,7 +422,9 @@ func TestRaft_VotersStayVoters(t *testing.T) {
InmemCluster: true,
EnableAutopilot: true,
PhysicalFactoryConfig: map[string]interface{}{
"performance_multiplier": "5",
"performance_multiplier": "5",
"autopilot_reconcile_interval": "300ms",
"autopilot_update_interval": "100ms",
},
VersionMap: map[int]string{
0: version.Version,

View File

@@ -45,6 +45,7 @@ type RaftClusterOpts struct {
Seal vault.Seal
VersionMap map[int]string
RedundancyZoneMap map[int]string
EffectiveSDKVersionMap map[int]string
}
func raftCluster(t testing.TB, ropts *RaftClusterOpts) *vault.TestCluster {
@@ -70,6 +71,7 @@ func raftCluster(t testing.TB, ropts *RaftClusterOpts) *vault.TestCluster {
opts.NumCores = ropts.NumCores
opts.VersionMap = ropts.VersionMap
opts.RedundancyZoneMap = ropts.RedundancyZoneMap
opts.EffectiveSDKVersionMap = ropts.EffectiveSDKVersionMap
teststorage.RaftBackendSetup(conf, &opts)
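EffectiveSDKVersionMap plumbs a per-core SDK version into the test cluster, which is what lets a test simulate a mixed-version cluster without building separate binaries. A hypothetical test using it might look like this (the test name, version strings, and assertions are illustrative):

func TestRaft_UndoLogs_MixedVersions(t *testing.T) {
	t.Parallel()
	cluster := raftCluster(t, &RaftClusterOpts{
		InmemCluster:    true,
		EnableAutopilot: true,
		EffectiveSDKVersionMap: map[int]string{
			0: "1.12.0",
			1: "1.11.4",
			2: "1.11.4",
		},
	})
	defer cluster.Cleanup()

	// With two nodes reporting a pre-1.12 version, the undo log monitor on the
	// active node should keep undo logs disabled; assertions would go here.
}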

View File

@@ -6,6 +6,8 @@ import (
"crypto/x509"
"errors"
"fmt"
"os"
"sort"
"strings"
"sync"
"sync/atomic"
@@ -88,6 +90,49 @@ func (c *Core) StandbyStates() (standby, perfStandby bool) {
return
}
// getHAMembers retrieves cluster membership that doesn't depend on raft. This should only ever be called by the
// active node.
func (c *Core) getHAMembers() ([]HAStatusNode, error) {
hostname, err := os.Hostname()
if err != nil {
return nil, err
}
leader := HAStatusNode{
Hostname: hostname,
APIAddress: c.redirectAddr,
ClusterAddress: c.ClusterAddr(),
ActiveNode: true,
Version: c.effectiveSDKVersion,
}
if rb := c.getRaftBackend(); rb != nil {
leader.UpgradeVersion = rb.EffectiveVersion()
leader.RedundancyZone = rb.RedundancyZone()
}
nodes := []HAStatusNode{leader}
for _, peerNode := range c.GetHAPeerNodesCached() {
lastEcho := peerNode.LastEcho
nodes = append(nodes, HAStatusNode{
Hostname: peerNode.Hostname,
APIAddress: peerNode.APIAddress,
ClusterAddress: peerNode.ClusterAddress,
LastEcho: &lastEcho,
Version: peerNode.Version,
UpgradeVersion: peerNode.UpgradeVersion,
RedundancyZone: peerNode.RedundancyZone,
})
}
sort.Slice(nodes, func(i, j int) bool {
return nodes[i].APIAddress < nodes[j].APIAddress
})
return nodes, nil
}
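getHAMembers now backs both the sys/ha-status handler later in this diff and the undo log monitor, so every caller sees the same per-node version data. As a rough client-side sketch (it assumes VAULT_ADDR and VAULT_TOKEN are set and uses the documented sys/ha-status field names), the reported versions can be inspected with the API client:

package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/vault/api"
)

func main() {
	// DefaultConfig picks up VAULT_ADDR; the token comes from VAULT_TOKEN.
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	resp, err := client.Logical().Read("sys/ha-status")
	if err != nil {
		log.Fatal(err)
	}

	nodes, _ := resp.Data["nodes"].([]interface{})
	for _, raw := range nodes {
		node, _ := raw.(map[string]interface{})
		fmt.Printf("%v active=%v version=%v\n",
			node["api_address"], node["active_node"], node["version"])
	}
}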
// Leader is used to get the current active leader
func (c *Core) Leader() (isLeader bool, leaderAddr, clusterAddr string, err error) {
// Check if HA enabled. We don't need the lock for this check as it's set

View File

@@ -11,7 +11,6 @@ import (
"fmt"
"hash"
"net/http"
"os"
"path"
"path/filepath"
"sort"
@@ -4615,43 +4614,11 @@ func (b *SystemBackend) rotateBarrierKey(ctx context.Context) error {
func (b *SystemBackend) handleHAStatus(ctx context.Context, req *logical.Request, d *framework.FieldData) (*logical.Response, error) {
// We're always the leader if we're handling this request.
-hostname, err := os.Hostname()
+nodes, err := b.Core.getHAMembers()
if err != nil {
return nil, err
}
-leader := HAStatusNode{
-Hostname: hostname,
-APIAddress: b.Core.redirectAddr,
-ClusterAddress: b.Core.ClusterAddr(),
-ActiveNode: true,
-Version: version.GetVersion().Version,
-}
-if rb := b.Core.getRaftBackend(); rb != nil {
-leader.UpgradeVersion = rb.EffectiveVersion()
-leader.RedundancyZone = rb.RedundancyZone()
-}
-nodes := []HAStatusNode{leader}
-for _, peerNode := range b.Core.GetHAPeerNodesCached() {
-lastEcho := peerNode.LastEcho
-nodes = append(nodes, HAStatusNode{
-Hostname: peerNode.Hostname,
-APIAddress: peerNode.APIAddress,
-ClusterAddress: peerNode.ClusterAddress,
-LastEcho: &lastEcho,
-Version: peerNode.Version,
-UpgradeVersion: peerNode.UpgradeVersion,
-RedundancyZone: peerNode.RedundancyZone,
-})
-}
-sort.Slice(nodes, func(i, j int) bool {
-return nodes[i].APIAddress < nodes[j].APIAddress
-})
return &logical.Response{
Data: map[string]interface{}{
"nodes": nodes,

View File

@@ -20,16 +20,27 @@ import (
wrapping "github.com/hashicorp/go-kms-wrapping/v2"
"github.com/hashicorp/go-secure-stdlib/tlsutil"
"github.com/hashicorp/go-uuid"
goversion "github.com/hashicorp/go-version"
"github.com/hashicorp/vault/api"
"github.com/hashicorp/vault/physical/raft"
"github.com/hashicorp/vault/sdk/helper/jsonutil"
"github.com/hashicorp/vault/sdk/logical"
"github.com/hashicorp/vault/sdk/version"
"github.com/hashicorp/vault/vault/seal"
"github.com/mitchellh/mapstructure"
"golang.org/x/net/http2"
)
const (
// undoLogMonitorInterval is how often the leader checks to see
// if all the cluster members it knows about are new enough to support
// undo logs.
undoLogMonitorInterval = time.Second
// undoLogSafeVersion is the minimum version Vault must be at in order
// for undo logs to be turned on.
undoLogSafeVersion = "1.12.0-rc1"
)
var (
raftTLSStoragePath = "core/raft/tls"
raftTLSRotationPeriod = 24 * time.Hour
@@ -171,6 +182,115 @@ func (c *Core) startRaftBackend(ctx context.Context) (retErr error) {
return nil
}
func (c *Core) monitorUndoLogs() error {
logger := c.logger.Named("undo-log-watcher")
logger.Debug("starting undo log watcher")
ctx := c.activeContext
raftBackend := c.getRaftBackend()
// First check storage and bail early if we already know undo logs are safe
persisted, err := c.UndoLogsPersisted()
if err != nil {
return fmt.Errorf("error checking for undo logs persistence: %w", err)
}
if persisted {
logger.Debug("undo logs are safe, no need to check any more")
return nil
}
minimumVersion, err := goversion.NewSemver(undoLogSafeVersion)
if err != nil {
return fmt.Errorf("minimum undo log version (%q) won't parse: %w", undoLogSafeVersion, err)
}
go func() {
ticker := time.NewTicker(undoLogMonitorInterval)
defer ticker.Stop()
logger.Debug("undo logs have not been enabled yet, possibly due to a recent upgrade. starting a periodic check")
for {
select {
case <-ticker.C:
case <-ctx.Done():
return
}
// Check the raft configuration for expected servers
config, err := raftBackend.GetConfiguration(ctx)
if err != nil {
logger.Error("couldn't read raft config", "error", err)
continue
}
// This tracks which servers we expect to find in the cluster, from the raft config.
expectedServers := make(map[string]struct{})
for _, server := range config.Servers {
expectedServers[server.Address] = struct{}{}
}
// Retrieve all the nodes in our cluster
nodes, err := c.getHAMembers()
if err != nil {
logger.Error("error getting HA members", "error", err)
continue
}
// Check the versions of all of the cluster members. If they're all >= 1.12, undo logs are safe to enable.
// If any are < 1.12, undo logs should remain off, regardless of how it was initially configured.
enable := true
for _, node := range nodes {
nodeVersion, err := goversion.NewSemver(node.Version)
if err != nil {
logger.Error("error parsing node version", "node version", node.Version, "node", node.ClusterAddress, "error", err)
break
}
if nodeVersion.LessThan(minimumVersion) {
logger.Debug("node version is less than the minimum, disabling undo logs", "node", node.ClusterAddress, "version", node.Version)
enable = false
break
} else {
// Raft nodes have their address listed without a scheme, e.g. 127.0.0.1:8201. HA nodes that we get from
// getHAMembers() have their address listed with a scheme, e.g. https://127.0.0.1:8201. So we need to
// parse the HA node address and reconstruct it ourselves to get a matching hash key.
clusterAddr, err := url.Parse(node.ClusterAddress)
if err != nil {
logger.Error("error parsing node cluster address", "node", node.ClusterAddress, "error", err)
break
}
// Deleting from expectedServers means the node in question is running a Vault version greater than
// or equal to the minimum.
delete(expectedServers, clusterAddr.Host)
}
}
// If expectedServers still has nodes in it, that means either we broke from the above loop because some
// node's version was too low or because some member of the cluster hasn't sent an echo yet. Either way,
// it means we can't enable undo logs.
if len(expectedServers) != 0 {
enable = false
}
if enable {
logger.Debug("undo logs can be safely enabled now")
err := c.PersistUndoLogs()
if err != nil {
logger.Error("error persisting undo logs safety", "error", err)
continue
}
logger.Debug("undo logs have been enabled and this has been persisted to storage. shutting down the checker loop.")
return
}
}
}()
return nil
}
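Two details in this loop are easy to miss: the semver comparison accepts release candidates as well as final 1.12+ builds, and the address bookkeeping only works because url.Parse strips the scheme, so the HA cluster address matches raft's scheme-less server address. A small standalone sketch of both behaviors (all values illustrative):

package main

import (
	"fmt"
	"net/url"

	goversion "github.com/hashicorp/go-version"
)

func main() {
	minimum, err := goversion.NewSemver("1.12.0-rc1")
	if err != nil {
		panic(err)
	}

	// Pre-release versions sort below their final release, so 1.12.0-rc1 and
	// anything newer passes the check, while every 1.11.x build fails it.
	for _, v := range []string{"1.11.5", "1.12.0-rc1", "1.12.0", "1.13.0-beta1"} {
		node, _ := goversion.NewSemver(v)
		fmt.Printf("%-12s undo-log safe: %v\n", v, !node.LessThan(minimum))
	}

	// HA members report scheme-prefixed cluster addresses; raft's configuration
	// lists bare host:port. Parsing and taking Host makes the map keys line up.
	u, _ := url.Parse("https://127.0.0.1:8201")
	fmt.Println(u.Host) // 127.0.0.1:8201
}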
func (c *Core) setupRaftActiveNode(ctx context.Context) error {
raftBackend := c.getRaftBackend()
if raftBackend == nil {
@@ -178,6 +298,7 @@ func (c *Core) setupRaftActiveNode(ctx context.Context) error {
}
c.logger.Info("starting raft active node")
raftBackend.SetEffectiveSDKVersion(c.effectiveSDKVersion)
autopilotConfig, err := c.loadAutopilotConfiguration(ctx)
if err != nil {
@@ -194,6 +315,9 @@ func (c *Core) setupRaftActiveNode(ctx context.Context) error {
return err
}
if err := c.monitorUndoLogs(); err != nil {
return err
}
return c.startPeriodicRaftTLSRotate(ctx)
}
@@ -356,7 +480,6 @@ func (c *Core) raftTLSRotatePhased(ctx context.Context, logger hclog.Logger, raf
AppliedIndex: 0,
Term: 0,
DesiredSuffrage: "voter",
-SDKVersion: version.GetVersion().Version,
})
}
}

View File

@@ -11,7 +11,6 @@ import (
"github.com/hashicorp/vault/helper/forwarding"
"github.com/hashicorp/vault/physical/raft"
"github.com/hashicorp/vault/sdk/helper/consts"
"github.com/hashicorp/vault/sdk/version"
"github.com/hashicorp/vault/vault/replication"
)
@@ -140,7 +139,7 @@ func (c *forwardingClient) startHeartbeat() {
Message: "ping",
ClusterAddr: clusterAddr,
NodeInfo: &ni,
-SdkVersion: version.GetVersion().Version,
+SdkVersion: c.core.effectiveSDKVersion,
}
if raftBackend := c.core.getRaftBackend(); raftBackend != nil {

View File

@@ -1217,9 +1217,10 @@ type TestClusterOptions struct {
LicensePrivateKey ed25519.PrivateKey
// this stores the vault version that should be used for each core config
-VersionMap map[int]string
-RedundancyZoneMap map[int]string
-KVVersion string
+VersionMap map[int]string
+RedundancyZoneMap map[int]string
+KVVersion string
+EffectiveSDKVersionMap map[int]string
}
var DefaultNumCores = 3
@@ -1907,6 +1908,11 @@ func (testCluster *TestCluster) newCore(t testing.T, idx int, coreConfig *CoreCo
if coreConfig.Logger == nil || (opts != nil && opts.Logger != nil) {
localConfig.Logger = testCluster.Logger.Named(fmt.Sprintf("core%d", idx))
}
if opts != nil && opts.EffectiveSDKVersionMap != nil {
localConfig.EffectiveSDKVersion = opts.EffectiveSDKVersionMap[idx]
}
if opts != nil && opts.PhysicalFactory != nil {
pfc := opts.PhysicalFactoryConfig
if pfc == nil {