Make identity store loading and alias merging deterministic (#28867)

* Make identity store loading and alias merging deterministic

* Add CHANGELOG

* Refactor out Ent-only logic from determinism test

* Use stub-maker

* Add test godoc
Paul Banks
2024-11-11 15:53:16 +00:00
committed by GitHub
parent c09ca8c124
commit 1aa9a7a138
5 changed files with 363 additions and 49 deletions

changelog/28867.txt
View File

@@ -0,0 +1,4 @@
```release-note:bug
core: Fix an issue where duplicate identity aliases in storage could be merged
inconsistently during different unseal events or on different servers.
```
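For context, the inconsistency comes from how identity buckets were loaded: a pool of goroutines pushed results into a single shared channel, so the order in which buckets were processed, and duplicates merged, depended on goroutine scheduling. A minimal, self-contained Go sketch of that failure mode (illustrative only, not Vault code):

```go
// Illustrative sketch (not Vault code): results from parallel workers are
// drained from one shared channel, so the consumer's processing order depends
// on goroutine scheduling. If duplicates are merged in arrival order, two
// unseals (or two servers) can pick different merge "winners".
package main

import (
	"fmt"
	"sync"
)

func arrivalOrder(keys []string) []string {
	results := make(chan string, len(keys))
	var wg sync.WaitGroup
	for _, k := range keys {
		wg.Add(1)
		go func(k string) {
			defer wg.Done()
			results <- k // send order depends on scheduling
		}(k)
	}
	wg.Wait()
	close(results)

	order := make([]string, 0, len(keys))
	for k := range results {
		order = append(order, k)
	}
	return order
}

func main() {
	keys := []string{"bucket-0", "bucket-1", "bucket-2", "bucket-3"}
	// The two printed orders frequently differ between runs.
	fmt.Println(arrivalOrder(keys))
	fmt.Println(arrivalOrder(keys))
}
```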

View File

@@ -5,6 +5,8 @@ package vault
import (
"context"
"fmt"
"math/rand"
"strings"
"testing"
"time"
@@ -13,11 +15,15 @@ import (
"github.com/go-test/deep"
uuid "github.com/hashicorp/go-uuid"
credGithub "github.com/hashicorp/vault/builtin/credential/github"
"github.com/hashicorp/vault/builtin/credential/userpass"
credUserpass "github.com/hashicorp/vault/builtin/credential/userpass"
"github.com/hashicorp/vault/helper/identity"
"github.com/hashicorp/vault/helper/namespace"
"github.com/hashicorp/vault/helper/storagepacker"
"github.com/hashicorp/vault/helper/testhelpers/corehelpers"
"github.com/hashicorp/vault/sdk/logical"
"github.com/hashicorp/vault/sdk/physical"
"github.com/hashicorp/vault/sdk/physical/inmem"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
@@ -1400,3 +1406,250 @@ func TestIdentityStoreInvalidate_TemporaryEntity(t *testing.T) {
assert.NoError(t, err)
assert.Nil(t, item)
}
// TestEntityStoreLoadingIsDeterministic is a property-based test that ensures
// the loading logic of the entity store is deterministic. This is important
// because we perform certain merges and corrections of duplicates on load and
// non-deterministic order can cause divergence between different nodes or even
// after seal/unseal cycles on one node. Loading _should_ be deterministic
// anyway if all data in storage was correct; see comments inline for examples of
// ways storage can be corrupt with respect to the expected schema invariants.
func TestEntityStoreLoadingIsDeterministic(t *testing.T) {
// Create some state in store that could trigger non-deterministic behavior.
// The nature of the identity store schema is such that the order of loading
// entities etc shouldn't matter even if it was non-deterministic, however due
// to many and varied historical (and possibly current/future) bugs, we have
// seen many cases where storage ends up with duplicates persisted. This is
// not ideal of course and our code attempts to "fix" on the fly with merges
// on load. But it's hampered by the fact that the current implementation does
// not load entities in a deterministic order, which means that different
// nodes potentially resolve merges differently. This test proves that that
// happens and should hopefully provide some confidence that we don't
// introduce non-determinism in the future somehow. It's a bit odd we have to
// inject essentially invalid data into storage to trigger the issue but
// that's what we get in real life sometimes!
logger := corehelpers.NewTestLogger(t)
ims, err := inmem.NewTransactionalInmemHA(nil, logger)
require.NoError(t, err)
cfg := &CoreConfig{
Physical: ims,
HAPhysical: ims.(physical.HABackend),
Logger: logger,
BuiltinRegistry: corehelpers.NewMockBuiltinRegistry(),
CredentialBackends: map[string]logical.Factory{
"userpass": userpass.Factory,
},
}
c, sealKeys, rootToken := TestCoreUnsealedWithConfig(t, cfg)
// Inject values into storage
upme, err := TestUserpassMount(c, false)
require.NoError(t, err)
localMe, err := TestUserpassMount(c, true)
require.NoError(t, err)
ctx := context.Background()
// We create 100 entities each with 1 non-local alias and 1 local alias. We
// then randomly create duplicate alias or local alias entries with a
// probability that is unrealistic but ensures we have duplicates on every
// test run with high probability and more than 1 duplicate often.
for i := 0; i <= 100; i++ {
id := fmt.Sprintf("entity-%d", i)
alias := fmt.Sprintf("alias-%d", i)
localAlias := fmt.Sprintf("localalias-%d", i)
e := makeEntityForPacker(t, id, c.identityStore.entityPacker)
attachAlias(t, e, alias, upme)
attachAlias(t, e, localAlias, localMe)
err = TestHelperWriteToStoragePacker(ctx, c.identityStore.entityPacker, e.ID, e)
require.NoError(t, err)
// A subset of entities gets a duplicate alias and/or a duplicate local alias.
// We'll use a probability of 0.3 for each dup so that we expect at least a
// few double and maybe triple duplicates of each type every few test runs
// and may have duplicates of both types or neither etc.
pDup := 0.3
rnd := rand.Float64()
dupeNum := 1
for rnd < pDup && dupeNum < 10 {
e := makeEntityForPacker(t, fmt.Sprintf("entity-%d-dup-%d", i, dupeNum), c.identityStore.entityPacker)
attachAlias(t, e, alias, upme)
err = TestHelperWriteToStoragePacker(ctx, c.identityStore.entityPacker, e.ID, e)
require.NoError(t, err)
// Toss again to see if we continue
rnd = rand.Float64()
dupeNum++
}
// Toss the coin again to see if there are any local dupes
dupeNum = 1
rnd = rand.Float64()
for rnd < pDup && dupeNum < 10 {
e := makeEntityForPacker(t, fmt.Sprintf("entity-%d-localdup-%d", i, dupeNum), c.identityStore.entityPacker)
attachAlias(t, e, localAlias, localMe)
err = TestHelperWriteToStoragePacker(ctx, c.identityStore.entityPacker, e.ID, e)
require.NoError(t, err)
rnd = rand.Float64()
dupeNum++
}
// One more edge case is that it's currently possible as of the time of
// writing for a failure during entity invalidation to result in a permanent
// "cached" entity in the local alias packer even though we do have the
// replicated entity in the entity packer too. This is a bug and will
// hopefully be fixed at some point soon, but even after it is it's
// important that we still test for it since existing clusters may still
// have this persistent state. Pick a low probability but one we're very
// likely to hit in 100 iterations and write the entity to the local alias
// table too (this mimics the behavior of cacheTemporaryEntity).
pFailedLocalAliasInvalidation := 0.02
if rand.Float64() < pFailedLocalAliasInvalidation {
err = TestHelperWriteToStoragePacker(ctx, c.identityStore.localAliasPacker, e.ID+tmpSuffix, e)
require.NoError(t, err)
}
}
// Create some groups
for i := 0; i <= 100; i++ {
id := fmt.Sprintf("group-%d", i)
bucketKey := c.identityStore.groupPacker.BucketKey(id)
// Add an alias to every other group
alias := ""
if i%2 == 0 {
alias = fmt.Sprintf("groupalias-%d", i)
}
e := makeGroupWithIDAndAlias(t, id, alias, bucketKey, upme)
err = TestHelperWriteToStoragePacker(ctx, c.identityStore.groupPacker, e.ID, e)
require.NoError(t, err)
}
// Now add 10 groups with the same alias to ensure duplicates don't cause
// non-deterministic behavior.
for i := 0; i <= 10; i++ {
id := fmt.Sprintf("group-dup-%d", i)
bucketKey := c.identityStore.groupPacker.BucketKey(id)
e := makeGroupWithIDAndAlias(t, id, "groupalias-dup", bucketKey, upme)
err = TestHelperWriteToStoragePacker(ctx, c.identityStore.groupPacker, e.ID, e)
require.NoError(t, err)
}
entIdentityStoreDeterminismTestSetup(t, ctx, c, upme, localMe)
// Storage is now primed for the test.
// To test that this is deterministic we need to load from storage a bunch of
// times and make sure we get the same result. For easier debugging we'll
// build a list of human readable ids that we can compare.
lastIDs := []string{}
for i := 0; i < 10; i++ {
// Seal and unseal to reload the identity store
require.NoError(t, c.Seal(rootToken))
require.True(t, c.Sealed())
for _, key := range sealKeys {
unsealed, err := c.Unseal(key)
require.NoError(t, err)
if unsealed {
break
}
}
require.False(t, c.Sealed())
// Identity store should be loaded now. Check its contents.
loadedIDs := []string{}
tx := c.identityStore.db.Txn(false)
// Entities + their aliases
iter, err := tx.LowerBound(entitiesTable, "id", "")
require.NoError(t, err)
for item := iter.Next(); item != nil; item = iter.Next() {
// We already added "type" prefixes to the IDs when creating them so just
// append here.
e := item.(*identity.Entity)
loadedIDs = append(loadedIDs, e.ID)
for _, a := range e.Aliases {
loadedIDs = append(loadedIDs, a.ID)
}
}
// This is a non-triviality check to make sure we actually loaded stuff and
// are not just passing because of a bug in the test.
numLoaded := len(loadedIDs)
require.Greater(t, numLoaded, 300, "not enough entities and aliases loaded on attempt %d", i)
// Groups
iter, err = tx.LowerBound(groupsTable, "id", "")
require.NoError(t, err)
for item := iter.Next(); item != nil; item = iter.Next() {
g := item.(*identity.Group)
loadedIDs = append(loadedIDs, g.ID)
if g.Alias != nil {
loadedIDs = append(loadedIDs, g.Alias.ID)
}
}
// This is a non-triviality check to make sure we actually loaded stuff and
// are not just passing because of a bug in the test.
groupsLoaded := len(loadedIDs) - numLoaded
require.Greater(t, groupsLoaded, 140, "not enough groups and aliases loaded on attempt %d", i)
entIdentityStoreDeterminismAssert(t, i, loadedIDs, lastIDs)
if i > 0 {
// Should be in the same order if we are deterministic since MemDB has strong ordering.
require.Equal(t, lastIDs, loadedIDs, "different result on attempt %d", i)
}
lastIDs = loadedIDs
}
}
func makeEntityForPacker(t *testing.T, id string, p *storagepacker.StoragePacker) *identity.Entity {
return &identity.Entity{
ID: id,
Name: id,
NamespaceID: namespace.RootNamespaceID,
BucketKey: p.BucketKey(id),
}
}
func attachAlias(t *testing.T, e *identity.Entity, name string, me *MountEntry) *identity.Alias {
a := &identity.Alias{
ID: name,
Name: name,
CanonicalID: e.ID,
MountType: me.Type,
MountAccessor: me.Accessor,
}
e.UpsertAlias(a)
return a
}
func makeGroupWithIDAndAlias(t *testing.T, id, alias, bucketKey string, me *MountEntry) *identity.Group {
g := &identity.Group{
ID: id,
Name: id,
NamespaceID: namespace.RootNamespaceID,
BucketKey: bucketKey,
}
if alias != "" {
g.Alias = &identity.Alias{
ID: id,
Name: alias,
CanonicalID: id,
MountType: me.Type,
MountAccessor: me.Accessor,
}
}
return g
}
func makeLocalAliasWithID(t *testing.T, id, entityID string, bucketKey string, me *MountEntry) *identity.LocalAliases {
return &identity.LocalAliases{
Aliases: []*identity.Alias{
{
ID: id,
Name: id,
CanonicalID: entityID,
MountType: me.Type,
MountAccessor: me.Accessor,
},
},
}
}

View File

@@ -218,12 +218,20 @@ func (i *IdentityStore) loadCachedEntitiesOfLocalAliases(ctx context.Context) er
i.logger.Debug("cached entities of local alias entries", "num_buckets", len(existing))
// Make the channels used for the worker pool
broker := make(chan string)
broker := make(chan int)
quit := make(chan bool)
// Buffer these channels to prevent deadlocks
errs := make(chan error, len(existing))
result := make(chan *storagepacker.Bucket, len(existing))
// We want to process the buckets in deterministic order so that duplicate
// merging is deterministic. We still want to load in parallel though so
// create a slice of result channels, one for each bucket. We need each result
// and err chan to be 1 buffered so we can leave a result there even if the
// processing loop is blocking on an earlier bucket still.
results := make([]chan *storagepacker.Bucket, len(existing))
errs := make([]chan error, len(existing))
for j := range existing {
results[j] = make(chan *storagepacker.Bucket, 1)
errs[j] = make(chan error, 1)
}
// Use a wait group
wg := &sync.WaitGroup{}
@@ -236,20 +244,21 @@ func (i *IdentityStore) loadCachedEntitiesOfLocalAliases(ctx context.Context) er
for {
select {
case key, ok := <-broker:
case idx, ok := <-broker:
// broker has been closed, we are done
if !ok {
return
}
key := existing[idx]
bucket, err := i.localAliasPacker.GetBucket(ctx, localAliasesBucketsPrefix+key)
if err != nil {
errs <- err
errs[idx] <- err
continue
}
// Write results out to the result channel
result <- bucket
results[idx] <- bucket
// quit early
case <-quit:
@@ -263,7 +272,7 @@ func (i *IdentityStore) loadCachedEntitiesOfLocalAliases(ctx context.Context) er
wg.Add(1)
go func() {
defer wg.Done()
for j, key := range existing {
for j := range existing {
if j%500 == 0 {
i.logger.Debug("cached entities of local aliases loading", "progress", j)
}
@@ -273,7 +282,7 @@ func (i *IdentityStore) loadCachedEntitiesOfLocalAliases(ctx context.Context) er
return
default:
broker <- key
broker <- j
}
}
@@ -288,16 +297,16 @@ func (i *IdentityStore) loadCachedEntitiesOfLocalAliases(ctx context.Context) er
i.logger.Info("cached entities of local aliases restored")
}()
// Restore each key by pulling from the result chan
for j := 0; j < len(existing); j++ {
// Restore each key by pulling from the slice of result chans
for j := range existing {
select {
case err := <-errs:
case err := <-errs[j]:
// Close all go routines
close(quit)
return err
case bucket := <-result:
case bucket := <-results[j]:
// If there is no entry, nothing to restore
if bucket == nil {
continue
@@ -338,13 +347,24 @@ func (i *IdentityStore) loadEntities(ctx context.Context) error {
i.logger.Debug("entities collected", "num_existing", len(existing))
duplicatedAccessors := make(map[string]struct{})
// Make the channels used for the worker pool
broker := make(chan string)
// Make the channels used for the worker pool. We send the index into existing
// so that we can keep results in the same order as inputs. Note that this is
// goroutine safe as long as we never mutate existing again in this method
// which we don't.
broker := make(chan int)
quit := make(chan bool)
// Buffer these channels to prevent deadlocks
errs := make(chan error, len(existing))
result := make(chan *storagepacker.Bucket, len(existing))
// We want to process the buckets in deterministic order so that duplicate
// merging is deterministic. We still want to load in parallel though so
// create a slice of result channels, one for each bucket. We need each result
// and err chan to be 1 buffered so we can leave a result there even if the
// processing loop is blocking on an earlier bucket still.
results := make([]chan *storagepacker.Bucket, len(existing))
errs := make([]chan error, len(existing))
for j := range existing {
results[j] = make(chan *storagepacker.Bucket, 1)
errs[j] = make(chan error, 1)
}
// Use a wait group
wg := &sync.WaitGroup{}
@@ -357,20 +377,21 @@ func (i *IdentityStore) loadEntities(ctx context.Context) error {
for {
select {
case key, ok := <-broker:
case idx, ok := <-broker:
// broker has been closed, we are done
if !ok {
return
}
key := existing[idx]
bucket, err := i.entityPacker.GetBucket(ctx, storagepacker.StoragePackerBucketsPrefix+key)
if err != nil {
errs <- err
errs[idx] <- err
continue
}
// Write results out to the result channel
result <- bucket
results[idx] <- bucket
// quit early
case <-quit:
@@ -384,17 +405,13 @@ func (i *IdentityStore) loadEntities(ctx context.Context) error {
wg.Add(1)
go func() {
defer wg.Done()
for j, key := range existing {
if j%500 == 0 {
i.logger.Debug("entities loading", "progress", j)
}
for j := range existing {
select {
case <-quit:
return
default:
broker <- key
broker <- j
}
}
@@ -404,14 +421,14 @@ func (i *IdentityStore) loadEntities(ctx context.Context) error {
// Restore each key by pulling from the result chan
LOOP:
for j := 0; j < len(existing); j++ {
for j := range existing {
select {
case err = <-errs:
case err = <-errs[j]:
// Close all go routines
close(quit)
break LOOP
case bucket := <-result:
case bucket := <-results[j]:
// If there is no entry, nothing to restore
if bucket == nil {
continue
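
The replacement keeps the parallel workers but gives each bucket index its own 1-buffered result and error channel, and the restore loop drains them strictly in index order. A condensed, self-contained sketch of that fan-out/fan-in pattern (illustrative only; names like loadAll are not from the Vault codebase):

```go
// Illustrative sketch (not Vault code) of the deterministic fan-out/fan-in
// pattern used above: workers still run in parallel, but each input index gets
// its own 1-buffered result channel and the consumer drains them in input
// order, so downstream merging always sees buckets in the same order.
package main

import "fmt"

func loadAll(keys []string, workers int) []string {
	broker := make(chan int)
	results := make([]chan string, len(keys))
	for i := range keys {
		results[i] = make(chan string, 1) // 1-buffered: a worker never blocks on a slow consumer
	}

	// Worker pool: each worker receives an index and writes to that index's channel.
	for w := 0; w < workers; w++ {
		go func() {
			for idx := range broker {
				results[idx] <- fmt.Sprintf("loaded %s", keys[idx]) // stand-in for GetBucket
			}
		}()
	}

	// Feed indices, then close the broker so workers exit.
	go func() {
		for i := range keys {
			broker <- i
		}
		close(broker)
	}()

	// Drain strictly in input order: this is what makes processing deterministic.
	out := make([]string, 0, len(keys))
	for i := range keys {
		out = append(out, <-results[i])
	}
	return out
}

func main() {
	fmt.Println(loadAll([]string{"b0", "b1", "b2", "b3"}, 3)) // same order every run
}
```

The 1-element buffers matter: a worker can deposit its result and move on even while the consumer is still blocked on an earlier index, so ordering is regained without serializing the loads.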

View File

@@ -0,0 +1,21 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
//go:build !enterprise
package vault
import (
"context"
"testing"
)
//go:generate go run github.com/hashicorp/vault/tools/stubmaker
func entIdentityStoreDeterminismTestSetup(t *testing.T, ctx context.Context, c *Core, upme, localme *MountEntry) {
// no op
}
func entIdentityStoreDeterminismAssert(t *testing.T, i int, loadedIDs, lastIDs []string) {
// no op
}

View File

@@ -31,7 +31,6 @@ import (
"time"
"github.com/armon/go-metrics"
"github.com/golang/protobuf/ptypes"
"github.com/hashicorp/go-cleanhttp"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-secure-stdlib/reloadutil"
@@ -60,6 +59,7 @@ import (
"github.com/mitchellh/copystructure"
"golang.org/x/crypto/ed25519"
"golang.org/x/net/http2"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/known/anypb"
)
@@ -2215,17 +2215,29 @@ var (
_ testcluster.VaultClusterNode = &TestClusterCore{}
)
func TestUserpassMount(c *Core, local bool) (*MountEntry, error) {
name := "userpass"
if local {
name += "-local"
}
userpassMe := &MountEntry{
Table: credentialTableType,
Path: name + "/",
Type: "userpass",
Description: name,
Accessor: name,
Local: local,
}
if err := c.enableCredential(namespace.RootContext(nil), userpassMe); err != nil {
return nil, err
}
return userpassMe, nil
}
// TestCreateDuplicateEntityAliasesInStorage creates n entities with a duplicate alias in storage
// This should only be used in testing
func TestCreateDuplicateEntityAliasesInStorage(ctx context.Context, c *Core, n int) ([]string, error) {
userpassMe := &MountEntry{
Table: credentialTableType,
Path: "userpass/",
Type: "userpass",
Description: "userpass",
Accessor: "userpass1",
}
err := c.enableCredential(namespace.RootContext(nil), userpassMe)
userpassMe, err := TestUserpassMount(c, false)
if err != nil {
return nil, err
}
@@ -2250,16 +2262,7 @@ func TestCreateDuplicateEntityAliasesInStorage(ctx context.Context, c *Core, n i
NamespaceID: namespace.RootNamespaceID,
BucketKey: c.identityStore.entityPacker.BucketKey(entityID),
}
entity, err := ptypes.MarshalAny(e)
if err != nil {
return nil, err
}
item := &storagepacker.Item{
ID: e.ID,
Message: entity,
}
if err = c.identityStore.entityPacker.PutItem(ctx, item); err != nil {
if err := TestHelperWriteToStoragePacker(ctx, c.identityStore.entityPacker, e.ID, e); err != nil {
return nil, err
}
}
@@ -2267,6 +2270,22 @@ func TestCreateDuplicateEntityAliasesInStorage(ctx context.Context, c *Core, n i
return entityIDs, nil
}
// TestHelperWriteToStoragePacker takes care of the boilerplate to insert into a
// storage packer. Just provide the raw protobuf object e.g. &identity.Entity{}
// and it is wrapped and inserted for you. You still need to populate BucketKey
// in the object if applicable before passing it.
func TestHelperWriteToStoragePacker(ctx context.Context, p *storagepacker.StoragePacker, id string, m protoreflect.ProtoMessage) error {
a, err := anypb.New(m)
if err != nil {
return err
}
i := &storagepacker.Item{
ID: id,
Message: a,
}
return p.PutItem(ctx, i)
}
// TestCreateStorageGroup creates a group in storage only to bypass checks that the entities exist in memdb
// Should only be used in testing
func TestCreateStorageGroup(ctx context.Context, c *Core, entityIDs []string) error {