Update locking components from DR replication changes (#3283)

* Update locking components from DR replication changes

* Fix plugin backend test

* Add a comment about needing the statelock:
This commit is contained in:
Brian Kassouf
2017-09-04 16:38:37 -07:00
committed by Jeff Mitchell
parent 00de6d0599
commit de7f39e064
8 changed files with 46 additions and 44 deletions

View File

@@ -3,18 +3,37 @@ package consts
type ReplicationState uint32 type ReplicationState uint32
const ( const (
ReplicationDisabled ReplicationState = iota _ ReplicationState = iota
ReplicationPrimary OldReplicationPrimary
ReplicationSecondary OldReplicationSecondary
OldReplicationBootstrapping
ReplicationDisabled ReplicationState = 0
ReplicationPerformancePrimary ReplicationState = 1 << iota
ReplicationPerformanceSecondary
ReplicationBootstrapping
ReplicationDRPrimary
ReplicationDRSecondary
) )
func (r ReplicationState) String() string { func (r ReplicationState) String() string {
switch r { switch r {
case ReplicationSecondary: case ReplicationPerformanceSecondary:
return "secondary" return "perf-secondary"
case ReplicationPrimary: case ReplicationPerformancePrimary:
return "primary" return "perf-primary"
case ReplicationBootstrapping:
return "bootstrapping"
case ReplicationDRPrimary:
return "dr-primary"
case ReplicationDRSecondary:
return "dr-secondary"
} }
return "disabled" return "disabled"
} }
func (r ReplicationState) HasState(flag ReplicationState) bool { return r&flag != 0 }
func (r *ReplicationState) AddState(flag ReplicationState) { *r |= flag }
func (r *ReplicationState) ClearState(flag ReplicationState) { *r &= ^flag }
func (r *ReplicationState) ToggleState(flag ReplicationState) { *r ^= flag }

View File

@@ -21,7 +21,7 @@ func handleSysRekeyInit(core *vault.Core, recovery bool) http.Handler {
} }
repState := core.ReplicationState() repState := core.ReplicationState()
if repState == consts.ReplicationSecondary { if repState.HasState(consts.ReplicationPerformanceSecondary) {
respondError(w, http.StatusBadRequest, respondError(w, http.StatusBadRequest,
fmt.Errorf("rekeying can only be performed on the primary cluster when replication is activated")) fmt.Errorf("rekeying can only be performed on the primary cluster when replication is activated"))
return return

View File

@@ -117,7 +117,7 @@ func TestSystem_replicationState(t *testing.T) {
defer client.Close() defer client.Close()
sys := logical.TestSystemView() sys := logical.TestSystemView()
sys.ReplicationStateVal = consts.ReplicationPrimary sys.ReplicationStateVal = consts.ReplicationPerformancePrimary
server.RegisterName("Plugin", &SystemViewServer{ server.RegisterName("Plugin", &SystemViewServer{
impl: sys, impl: sys,

View File

@@ -1891,11 +1891,9 @@ func (c *Core) emitMetrics(stopCh chan struct{}) {
} }
func (c *Core) ReplicationState() consts.ReplicationState { func (c *Core) ReplicationState() consts.ReplicationState {
var state consts.ReplicationState c.stateLock.RLock()
c.clusterParamsLock.RLock() defer c.stateLock.RUnlock()
state = c.replicationState return c.replicationState
c.clusterParamsLock.RUnlock()
return state
} }
func (c *Core) SealAccess() *SealAccess { func (c *Core) SealAccess() *SealAccess {

View File

@@ -84,13 +84,10 @@ func (d dynamicSystemView) CachingDisabled() bool {
return d.core.cachingDisabled || (d.mountEntry != nil && d.mountEntry.Config.ForceNoCache) return d.core.cachingDisabled || (d.mountEntry != nil && d.mountEntry.Config.ForceNoCache)
} }
// Checks if this is a primary Vault instance. // Checks if this is a primary Vault instance. Caller should hold the stateLock
// in read mode.
func (d dynamicSystemView) ReplicationState() consts.ReplicationState { func (d dynamicSystemView) ReplicationState() consts.ReplicationState {
var state consts.ReplicationState return d.core.replicationState
d.core.clusterParamsLock.RLock()
state = d.core.replicationState
d.core.clusterParamsLock.RUnlock()
return state
} }
// ResponseWrapData wraps the given data in a cubbyhole and returns the // ResponseWrapData wraps the given data in a cubbyhole and returns the

View File

@@ -1251,12 +1251,10 @@ func (b *SystemBackend) handleMountTable(
// handleMount is used to mount a new path // handleMount is used to mount a new path
func (b *SystemBackend) handleMount( func (b *SystemBackend) handleMount(
req *logical.Request, data *framework.FieldData) (*logical.Response, error) { req *logical.Request, data *framework.FieldData) (*logical.Response, error) {
b.Core.clusterParamsLock.RLock()
repState := b.Core.replicationState repState := b.Core.replicationState
b.Core.clusterParamsLock.RUnlock()
local := data.Get("local").(bool) local := data.Get("local").(bool)
if !local && repState == consts.ReplicationSecondary { if !local && repState.HasState(consts.ReplicationPerformanceSecondary) {
return logical.ErrorResponse("cannot add a non-local mount to a replication secondary"), nil return logical.ErrorResponse("cannot add a non-local mount to a replication secondary"), nil
} }
@@ -1381,9 +1379,9 @@ func (b *SystemBackend) handleUnmount(
path := data.Get("path").(string) path := data.Get("path").(string)
path = sanitizeMountPath(path) path = sanitizeMountPath(path)
repState := b.Core.ReplicationState() repState := b.Core.replicationState
entry := b.Core.router.MatchingMountEntry(path) entry := b.Core.router.MatchingMountEntry(path)
if entry != nil && !entry.Local && repState == consts.ReplicationSecondary { if entry != nil && !entry.Local && repState.HasState(consts.ReplicationPerformanceSecondary) {
return logical.ErrorResponse("cannot unmount a non-local mount on a replication secondary"), nil return logical.ErrorResponse("cannot unmount a non-local mount on a replication secondary"), nil
} }
@@ -1406,9 +1404,7 @@ func (b *SystemBackend) handleUnmount(
// handleRemount is used to remount a path // handleRemount is used to remount a path
func (b *SystemBackend) handleRemount( func (b *SystemBackend) handleRemount(
req *logical.Request, data *framework.FieldData) (*logical.Response, error) { req *logical.Request, data *framework.FieldData) (*logical.Response, error) {
b.Core.clusterParamsLock.RLock()
repState := b.Core.replicationState repState := b.Core.replicationState
b.Core.clusterParamsLock.RUnlock()
// Get the paths // Get the paths
fromPath := data.Get("from").(string) fromPath := data.Get("from").(string)
@@ -1423,7 +1419,7 @@ func (b *SystemBackend) handleRemount(
toPath = sanitizeMountPath(toPath) toPath = sanitizeMountPath(toPath)
entry := b.Core.router.MatchingMountEntry(fromPath) entry := b.Core.router.MatchingMountEntry(fromPath)
if entry != nil && !entry.Local && repState == consts.ReplicationSecondary { if entry != nil && !entry.Local && repState.HasState(consts.ReplicationPerformanceSecondary) {
return logical.ErrorResponse("cannot remount a non-local mount on a replication secondary"), nil return logical.ErrorResponse("cannot remount a non-local mount on a replication secondary"), nil
} }
@@ -1519,9 +1515,7 @@ func (b *SystemBackend) handleMountTuneWrite(
// handleTuneWriteCommon is used to set config settings on a path // handleTuneWriteCommon is used to set config settings on a path
func (b *SystemBackend) handleTuneWriteCommon( func (b *SystemBackend) handleTuneWriteCommon(
path string, data *framework.FieldData) (*logical.Response, error) { path string, data *framework.FieldData) (*logical.Response, error) {
b.Core.clusterParamsLock.RLock()
repState := b.Core.replicationState repState := b.Core.replicationState
b.Core.clusterParamsLock.RUnlock()
path = sanitizeMountPath(path) path = sanitizeMountPath(path)
@@ -1538,7 +1532,7 @@ func (b *SystemBackend) handleTuneWriteCommon(
b.Backend.Logger().Error("sys: tune failed: no mount entry found", "path", path) b.Backend.Logger().Error("sys: tune failed: no mount entry found", "path", path)
return handleError(fmt.Errorf("sys: tune of path '%s' failed: no mount entry found", path)) return handleError(fmt.Errorf("sys: tune of path '%s' failed: no mount entry found", path))
} }
if mountEntry != nil && !mountEntry.Local && repState == consts.ReplicationSecondary { if mountEntry != nil && !mountEntry.Local && repState.HasState(consts.ReplicationPerformanceSecondary) {
return logical.ErrorResponse("cannot tune a non-local mount on a replication secondary"), nil return logical.ErrorResponse("cannot tune a non-local mount on a replication secondary"), nil
} }
@@ -1757,12 +1751,10 @@ func (b *SystemBackend) handleAuthTable(
// handleEnableAuth is used to enable a new credential backend // handleEnableAuth is used to enable a new credential backend
func (b *SystemBackend) handleEnableAuth( func (b *SystemBackend) handleEnableAuth(
req *logical.Request, data *framework.FieldData) (*logical.Response, error) { req *logical.Request, data *framework.FieldData) (*logical.Response, error) {
b.Core.clusterParamsLock.RLock()
repState := b.Core.replicationState repState := b.Core.replicationState
b.Core.clusterParamsLock.RUnlock()
local := data.Get("local").(bool) local := data.Get("local").(bool)
if !local && repState == consts.ReplicationSecondary { if !local && repState.HasState(consts.ReplicationPerformanceSecondary) {
return logical.ErrorResponse("cannot add a non-local mount to a replication secondary"), nil return logical.ErrorResponse("cannot add a non-local mount to a replication secondary"), nil
} }
@@ -1833,9 +1825,9 @@ func (b *SystemBackend) handleDisableAuth(
path = sanitizeMountPath(path) path = sanitizeMountPath(path)
fullPath := credentialRoutePrefix + path fullPath := credentialRoutePrefix + path
repState := b.Core.ReplicationState() repState := b.Core.replicationState
entry := b.Core.router.MatchingMountEntry(fullPath) entry := b.Core.router.MatchingMountEntry(fullPath)
if entry != nil && !entry.Local && repState == consts.ReplicationSecondary { if entry != nil && !entry.Local && repState.HasState(consts.ReplicationPerformanceSecondary) {
return logical.ErrorResponse("cannot unmount a non-local mount on a replication secondary"), nil return logical.ErrorResponse("cannot unmount a non-local mount on a replication secondary"), nil
} }
@@ -1984,12 +1976,10 @@ func (b *SystemBackend) handleAuditHash(
// handleEnableAudit is used to enable a new audit backend // handleEnableAudit is used to enable a new audit backend
func (b *SystemBackend) handleEnableAudit( func (b *SystemBackend) handleEnableAudit(
req *logical.Request, data *framework.FieldData) (*logical.Response, error) { req *logical.Request, data *framework.FieldData) (*logical.Response, error) {
b.Core.clusterParamsLock.RLock()
repState := b.Core.replicationState repState := b.Core.replicationState
b.Core.clusterParamsLock.RUnlock()
local := data.Get("local").(bool) local := data.Get("local").(bool)
if !local && repState == consts.ReplicationSecondary { if !local && repState.HasState(consts.ReplicationPerformanceSecondary) {
return logical.ErrorResponse("cannot add a non-local mount to a replication secondary"), nil return logical.ErrorResponse("cannot add a non-local mount to a replication secondary"), nil
} }
@@ -2132,10 +2122,8 @@ func (b *SystemBackend) handleKeyStatus(
// handleRotate is used to trigger a key rotation // handleRotate is used to trigger a key rotation
func (b *SystemBackend) handleRotate( func (b *SystemBackend) handleRotate(
req *logical.Request, data *framework.FieldData) (*logical.Response, error) { req *logical.Request, data *framework.FieldData) (*logical.Response, error) {
b.Core.clusterParamsLock.RLock()
repState := b.Core.replicationState repState := b.Core.replicationState
b.Core.clusterParamsLock.RUnlock() if repState.HasState(consts.ReplicationPerformanceSecondary) {
if repState == consts.ReplicationSecondary {
return logical.ErrorResponse("cannot rotate on a replication secondary"), nil return logical.ErrorResponse("cannot rotate on a replication secondary"), nil
} }

View File

@@ -544,7 +544,7 @@ func (c *Core) loadMounts() error {
// ensure this comes over. If we upgrade first, we simply don't // ensure this comes over. If we upgrade first, we simply don't
// create the mount, so we won't conflict when we sync. If this is // create the mount, so we won't conflict when we sync. If this is
// local (e.g. cubbyhole) we do still add it. // local (e.g. cubbyhole) we do still add it.
if !foundRequired && (c.replicationState != consts.ReplicationSecondary || requiredMount.Local) { if !foundRequired && (c.replicationState.HasState(consts.ReplicationPerformanceSecondary) || requiredMount.Local) {
c.mounts.Entries = append(c.mounts.Entries, requiredMount) c.mounts.Entries = append(c.mounts.Entries, requiredMount)
needPersist = true needPersist = true
} }

View File

@@ -145,7 +145,7 @@ func (c *Core) setupPolicyStore() error {
sysView := &dynamicSystemView{core: c} sysView := &dynamicSystemView{core: c}
c.policyStore = NewPolicyStore(view, sysView) c.policyStore = NewPolicyStore(view, sysView)
if sysView.ReplicationState() == consts.ReplicationSecondary { if c.replicationState.HasState(consts.ReplicationPerformanceSecondary) {
// Policies will sync from the primary // Policies will sync from the primary
return nil return nil
} }