mirror of
https://github.com/optim-enterprises-bv/vault.git
synced 2025-10-29 17:52:32 +00:00
Fix OSS sealunwrapper adding extra get + put request to all storage get requests (#29050)
* fix OSS sealunwrapper adding extra get + put request to all storage requests * Add changelog entry
This commit is contained in:
4
changelog/29050.txt
Normal file
4
changelog/29050.txt
Normal file
@@ -0,0 +1,4 @@
|
||||
```release-note:bug
|
||||
core: fix bug in seal unwrapper that caused high storage latency in Vault CE. For every storage read request, the
|
||||
seal unwrapper was performing the read twice, and would also issue an unnecessary storage write.
|
||||
```
|
||||
@@ -83,6 +83,13 @@ func (i *InmemHABackend) HAEnabled() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (i *InmemHABackend) Underlying() *InmemBackend {
|
||||
if txBackend, ok := i.Backend.(*TransactionalInmemBackend); ok {
|
||||
return &txBackend.InmemBackend
|
||||
}
|
||||
return i.Backend.(*InmemBackend)
|
||||
}
|
||||
|
||||
// InmemLock is an in-memory Lock implementation for the HABackend
|
||||
type InmemLock struct {
|
||||
in *InmemHABackend
|
||||
|
||||
@@ -18,10 +18,9 @@ import (
|
||||
// NewSealUnwrapper creates a new seal unwrapper
|
||||
func NewSealUnwrapper(underlying physical.Backend, logger log.Logger) physical.Backend {
|
||||
ret := &sealUnwrapper{
|
||||
underlying: underlying,
|
||||
logger: logger,
|
||||
locks: locksutil.CreateLocks(),
|
||||
allowUnwraps: new(uint32),
|
||||
underlying: underlying,
|
||||
logger: logger,
|
||||
locks: locksutil.CreateLocks(),
|
||||
}
|
||||
|
||||
if underTxn, ok := underlying.(physical.Transactional); ok {
|
||||
@@ -43,7 +42,7 @@ type sealUnwrapper struct {
|
||||
underlying physical.Backend
|
||||
logger log.Logger
|
||||
locks []*locksutil.LockEntry
|
||||
allowUnwraps *uint32
|
||||
allowUnwraps atomic.Bool
|
||||
}
|
||||
|
||||
// transactionalSealUnwrapper is a seal unwrapper that wraps a physical that is transactional
|
||||
@@ -63,63 +62,70 @@ func (d *sealUnwrapper) Put(ctx context.Context, entry *physical.Entry) error {
|
||||
return d.underlying.Put(ctx, entry)
|
||||
}
|
||||
|
||||
// unwrap gets an entry from underlying storage and tries to unwrap it. If the entry was not wrapped, return
|
||||
// value unwrappedEntry will be nil. If the entry is wrapped and encrypted, an error is returned.
|
||||
func (d *sealUnwrapper) unwrap(ctx context.Context, key string) (entry, unwrappedEntry *physical.Entry, err error) {
|
||||
entry, err = d.underlying.Get(ctx, key)
|
||||
// unwrap gets an entry from underlying storage and tries to unwrap it.
|
||||
// - If the entry is not wrapped: the entry will be returned unchanged and wasWrapped will be false
|
||||
// - If the entry is wrapped and encrypted: an error is returned.
|
||||
// - If the entry is wrapped but not encrypted: the entry will be unwrapped and returned. wasWrapped will be true.
|
||||
func (d *sealUnwrapper) unwrap(ctx context.Context, key string) (unwrappedEntry *physical.Entry, wasWrapped bool, err error) {
|
||||
entry, err := d.underlying.Get(ctx, key)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
return nil, false, err
|
||||
}
|
||||
if entry == nil {
|
||||
return nil, nil, err
|
||||
return nil, false, nil
|
||||
}
|
||||
|
||||
wrappedEntryValue, unmarshaled := UnmarshalSealWrappedValueWithCanary(entry.Value)
|
||||
switch {
|
||||
case !unmarshaled:
|
||||
unwrappedEntry = entry
|
||||
// Entry is not wrapped
|
||||
return entry, false, nil
|
||||
case wrappedEntryValue.isEncrypted():
|
||||
return nil, nil, fmt.Errorf("cannot decode sealwrapped storage entry %q", entry.Key)
|
||||
// Entry is wrapped and encrypted
|
||||
return nil, true, fmt.Errorf("cannot decode sealwrapped storage entry %q", entry.Key)
|
||||
default:
|
||||
// Entry is wrapped and not encrypted
|
||||
pt, err := wrappedEntryValue.getPlaintextValue()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
return nil, true, err
|
||||
}
|
||||
unwrappedEntry = &physical.Entry{
|
||||
return &physical.Entry{
|
||||
Key: entry.Key,
|
||||
Value: pt,
|
||||
}
|
||||
}, true, nil
|
||||
}
|
||||
|
||||
return entry, unwrappedEntry, nil
|
||||
}
|
||||
|
||||
func (d *sealUnwrapper) Get(ctx context.Context, key string) (*physical.Entry, error) {
|
||||
entry, unwrappedEntry, err := d.unwrap(ctx, key)
|
||||
entry, wasWrapped, err := d.unwrap(ctx, key)
|
||||
switch {
|
||||
case err != nil:
|
||||
case err != nil: // Failed to get entry
|
||||
return nil, err
|
||||
case entry == nil:
|
||||
case entry == nil: // Entry doesn't exist
|
||||
return nil, nil
|
||||
case atomic.LoadUint32(d.allowUnwraps) != 1:
|
||||
return unwrappedEntry, nil
|
||||
case !wasWrapped || !d.allowUnwraps.Load(): // Entry was not wrapped or unwrapping not allowed
|
||||
return entry, nil
|
||||
}
|
||||
|
||||
// Entry was wrapped, we need to replace it with the unwrapped value
|
||||
|
||||
// Grab locks because we are performing a write
|
||||
locksutil.LockForKey(d.locks, key).Lock()
|
||||
defer locksutil.LockForKey(d.locks, key).Unlock()
|
||||
|
||||
// At this point we need to re-read and re-check
|
||||
entry, unwrappedEntry, err = d.unwrap(ctx, key)
|
||||
// Read entry again in case it was changed while we were waiting for the lock
|
||||
entry, wasWrapped, err = d.unwrap(ctx, key)
|
||||
switch {
|
||||
case err != nil:
|
||||
case err != nil: // Failed to get entry
|
||||
return nil, err
|
||||
case entry == nil:
|
||||
case entry == nil: // Entry doesn't exist
|
||||
return nil, nil
|
||||
case atomic.LoadUint32(d.allowUnwraps) != 1:
|
||||
return unwrappedEntry, nil
|
||||
case !wasWrapped || !d.allowUnwraps.Load(): // Entry was not wrapped or unwrapping not allowed
|
||||
return entry, nil
|
||||
}
|
||||
|
||||
return unwrappedEntry, d.underlying.Put(ctx, unwrappedEntry)
|
||||
// Write out the unwrapped value
|
||||
return entry, d.underlying.Put(ctx, entry)
|
||||
}
|
||||
|
||||
func (d *sealUnwrapper) Delete(ctx context.Context, key string) error {
|
||||
@@ -155,12 +161,12 @@ func (d *transactionalSealUnwrapper) Transaction(ctx context.Context, txns []*ph
|
||||
// This should only run during preSeal which ensures that it can't be run
|
||||
// concurrently and that it will be run only by the active node
|
||||
func (d *sealUnwrapper) stopUnwraps() {
|
||||
atomic.StoreUint32(d.allowUnwraps, 0)
|
||||
d.allowUnwraps.Store(false)
|
||||
}
|
||||
|
||||
func (d *sealUnwrapper) runUnwraps() {
|
||||
// Allow key unwraps on key gets. This gets set only when running on the
|
||||
// active node to prevent standbys from changing data underneath the
|
||||
// primary
|
||||
atomic.StoreUint32(d.allowUnwraps, 1)
|
||||
d.allowUnwraps.Store(true)
|
||||
}
|
||||
|
||||
@@ -21,25 +21,29 @@ import (
|
||||
func TestSealUnwrapper(t *testing.T) {
|
||||
logger := corehelpers.NewTestLogger(t)
|
||||
|
||||
// Test without transactions
|
||||
phys, err := inmem.NewInmemHA(nil, logger)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
performTestSealUnwrapper(t, phys, logger)
|
||||
// Test with both cache enabled and disabled
|
||||
for _, disableCache := range []bool{true, false} {
|
||||
// Test without transactions
|
||||
phys, err := inmem.NewInmemHA(nil, logger)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
performTestSealUnwrapper(t, phys, logger, disableCache)
|
||||
|
||||
// Test with transactions
|
||||
tPhys, err := inmem.NewTransactionalInmemHA(nil, logger)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
// Test with transactions
|
||||
tPhys, err := inmem.NewTransactionalInmemHA(nil, logger)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
performTestSealUnwrapper(t, tPhys, logger, disableCache)
|
||||
}
|
||||
performTestSealUnwrapper(t, tPhys, logger)
|
||||
}
|
||||
|
||||
func performTestSealUnwrapper(t *testing.T, phys physical.Backend, logger log.Logger) {
|
||||
func performTestSealUnwrapper(t *testing.T, phys physical.Backend, logger log.Logger, disableCache bool) {
|
||||
ctx := context.Background()
|
||||
base := &CoreConfig{
|
||||
Physical: phys,
|
||||
Physical: phys,
|
||||
DisableCache: disableCache,
|
||||
}
|
||||
cluster := NewTestCluster(t, base, &TestClusterOptions{
|
||||
Logger: logger,
|
||||
@@ -47,6 +51,8 @@ func performTestSealUnwrapper(t *testing.T, phys physical.Backend, logger log.Lo
|
||||
cluster.Start()
|
||||
defer cluster.Cleanup()
|
||||
|
||||
physImem := phys.(interface{ Underlying() *inmem.InmemBackend }).Underlying()
|
||||
|
||||
// Read a value and then save it back in a proto message
|
||||
entry, err := phys.Get(ctx, "core/master")
|
||||
if err != nil {
|
||||
@@ -78,7 +84,15 @@ func performTestSealUnwrapper(t *testing.T, phys physical.Backend, logger log.Lo
|
||||
// successfully decode it, but be able to unmarshal it when read back from
|
||||
// the underlying physical store. When we read from active, it should both
|
||||
// successfully decode it and persist it back.
|
||||
checkValue := func(core *Core, wrapped bool) {
|
||||
checkValue := func(core *Core, wrapped bool, ro bool) {
|
||||
if ro {
|
||||
physImem.FailPut(true)
|
||||
physImem.FailDelete(true)
|
||||
defer func() {
|
||||
physImem.FailPut(false)
|
||||
physImem.FailDelete(false)
|
||||
}()
|
||||
}
|
||||
entry, err := core.physical.Get(ctx, "core/master")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@@ -106,7 +120,12 @@ func performTestSealUnwrapper(t *testing.T, phys physical.Backend, logger log.Lo
|
||||
}
|
||||
|
||||
TestWaitActive(t, cluster.Cores[0].Core)
|
||||
checkValue(cluster.Cores[2].Core, true)
|
||||
checkValue(cluster.Cores[1].Core, true)
|
||||
checkValue(cluster.Cores[0].Core, false)
|
||||
checkValue(cluster.Cores[2].Core, true, true)
|
||||
checkValue(cluster.Cores[1].Core, true, true)
|
||||
checkValue(cluster.Cores[0].Core, false, false)
|
||||
|
||||
// The storage entry should now be unwrapped, so there should be no more writes to storage when we read it
|
||||
checkValue(cluster.Cores[2].Core, false, true)
|
||||
checkValue(cluster.Cores[1].Core, false, true)
|
||||
checkValue(cluster.Cores[0].Core, false, true)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user