From 1aa9a7a138e870806598388bf445afe5c9979e82 Mon Sep 17 00:00:00 2001 From: Paul Banks Date: Mon, 11 Nov 2024 15:53:16 +0000 Subject: [PATCH] Make identity store loading and alias merging deterministic (#28867) * Make identity store loading and alias merging deterministic * Add CHANGELOG * Refactor our Ent-only logic from determinism test * Use stub-maker * Add test godoc --- changelog/28867.txt | 4 + vault/identity_store_test.go | 253 ++++++++++++++++++++++++++ vault/identity_store_util.go | 77 +++++--- vault/identiy_store_test_stubs_oss.go | 21 +++ vault/testing.go | 57 ++++-- 5 files changed, 363 insertions(+), 49 deletions(-) create mode 100644 changelog/28867.txt create mode 100644 vault/identiy_store_test_stubs_oss.go diff --git a/changelog/28867.txt b/changelog/28867.txt new file mode 100644 index 0000000000..a0e541bce6 --- /dev/null +++ b/changelog/28867.txt @@ -0,0 +1,4 @@ +```release-note:bug +core: Fix an issue where duplicate identity aliases in storage could be merged +inconsistently during different unseal events or on different servers. +``` diff --git a/vault/identity_store_test.go b/vault/identity_store_test.go index da16ae6fac..ed3b1208a6 100644 --- a/vault/identity_store_test.go +++ b/vault/identity_store_test.go @@ -5,6 +5,8 @@ package vault import ( "context" + "fmt" + "math/rand" "strings" "testing" "time" @@ -13,11 +15,15 @@ import ( "github.com/go-test/deep" uuid "github.com/hashicorp/go-uuid" credGithub "github.com/hashicorp/vault/builtin/credential/github" + "github.com/hashicorp/vault/builtin/credential/userpass" credUserpass "github.com/hashicorp/vault/builtin/credential/userpass" "github.com/hashicorp/vault/helper/identity" "github.com/hashicorp/vault/helper/namespace" "github.com/hashicorp/vault/helper/storagepacker" + "github.com/hashicorp/vault/helper/testhelpers/corehelpers" "github.com/hashicorp/vault/sdk/logical" + "github.com/hashicorp/vault/sdk/physical" + "github.com/hashicorp/vault/sdk/physical/inmem" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" @@ -1400,3 +1406,250 @@ func TestIdentityStoreInvalidate_TemporaryEntity(t *testing.T) { assert.NoError(t, err) assert.Nil(t, item) } + +// TestEntityStoreLoadingIsDeterministic is a property-based test that ensures +// the loading logic of the entity store is deterministic. This is important +// because we perform certain merges and corrections of duplicates on load and +// non-deterministic order can cause divergence between different nodes or even +// after seal/unseal cycles on one node. Loading _should_ be deterministic +// anyway if all data in storage was correct see comments inline for examples of +// ways storage can be corrupt with respect to the expected schema invariants. +func TestEntityStoreLoadingIsDeterministic(t *testing.T) { + // Create some state in store that could trigger non-deterministic behavior. + // The nature of the identity store schema is such that the order of loading + // entities etc shouldn't matter even if it was non-deterministic, however due + // to many and varied historical (and possibly current/future) bugs, we have + // seen many cases where storage ends up with duplicates persisted. This is + // not ideal of course and our code attempts to "fix" on the fly with merges + // on load. But it's hampered by the fact that the current implementation does + // not load entities in a deterministic order. which means that different + // nodes potentially resolve merges differently. This test proves that that + // happens and should hopefully provide some confidence that we don't + // introduce non-determinism in the future somehow. It's a bit odd we have to + // inject essentially invalid data into storage to trigger the issue but + // that's what we get in real life sometimes! + logger := corehelpers.NewTestLogger(t) + ims, err := inmem.NewTransactionalInmemHA(nil, logger) + require.NoError(t, err) + + cfg := &CoreConfig{ + Physical: ims, + HAPhysical: ims.(physical.HABackend), + Logger: logger, + BuiltinRegistry: corehelpers.NewMockBuiltinRegistry(), + CredentialBackends: map[string]logical.Factory{ + "userpass": userpass.Factory, + }, + } + + c, sealKeys, rootToken := TestCoreUnsealedWithConfig(t, cfg) + + // Inject values into storage + upme, err := TestUserpassMount(c, false) + require.NoError(t, err) + localMe, err := TestUserpassMount(c, true) + require.NoError(t, err) + + ctx := context.Background() + + // We create 100 entities each with 1 non-local alias and 1 local alias. We + // then randomly create duplicate alias or local alias entries with a + // probability that is unrealistic but ensures we have duplicates on every + // test run with high probability and more than 1 duplicate often. + for i := 0; i <= 100; i++ { + id := fmt.Sprintf("entity-%d", i) + alias := fmt.Sprintf("alias-%d", i) + localAlias := fmt.Sprintf("localalias-%d", i) + e := makeEntityForPacker(t, id, c.identityStore.entityPacker) + attachAlias(t, e, alias, upme) + attachAlias(t, e, localAlias, localMe) + err = TestHelperWriteToStoragePacker(ctx, c.identityStore.entityPacker, e.ID, e) + require.NoError(t, err) + + // Subset of entities get a duplicate alias and/or duplicate local alias. + // We'll use a probability of 0.3 for each dup so that we expect at least a + // few double and maybe triple duplicates of each type every few test runs + // and may have duplicates of both types or neither etc. + pDup := 0.3 + rnd := rand.Float64() + dupeNum := 1 + for rnd < pDup && dupeNum < 10 { + e := makeEntityForPacker(t, fmt.Sprintf("entity-%d-dup-%d", i, dupeNum), c.identityStore.entityPacker) + attachAlias(t, e, alias, upme) + err = TestHelperWriteToStoragePacker(ctx, c.identityStore.entityPacker, e.ID, e) + require.NoError(t, err) + // Toss again to see if we continue + rnd = rand.Float64() + dupeNum++ + } + // Toss the coin again to see if there are any local dupes + dupeNum = 1 + rnd = rand.Float64() + for rnd < pDup && dupeNum < 10 { + e := makeEntityForPacker(t, fmt.Sprintf("entity-%d-localdup-%d", i, dupeNum), c.identityStore.entityPacker) + attachAlias(t, e, localAlias, localMe) + err = TestHelperWriteToStoragePacker(ctx, c.identityStore.entityPacker, e.ID, e) + require.NoError(t, err) + rnd = rand.Float64() + dupeNum++ + } + // One more edge case is that it's currently possible as of the time of + // writing for a failure during entity invalidation to result in a permanent + // "cached" entity in the local alias packer even though we do have the + // replicated entity in the entity packer too. This is a bug and will + // hopefully be fixed at some point soon, but even after it is it's + // important that we still test for it since existing clusters may still + // have this persistent state. Pick a low probability but one we're very + // likely to hit in 100 iterations and write the entity to the local alias + // table too (this mimics the behavior of cacheTemporaryEntity). + pFailedLocalAliasInvalidation := 0.02 + if rand.Float64() < pFailedLocalAliasInvalidation { + err = TestHelperWriteToStoragePacker(ctx, c.identityStore.localAliasPacker, e.ID+tmpSuffix, e) + require.NoError(t, err) + } + } + + // Create some groups + for i := 0; i <= 100; i++ { + id := fmt.Sprintf("group-%d", i) + bucketKey := c.identityStore.groupPacker.BucketKey(id) + // Add an alias to every other group + alias := "" + if i%2 == 0 { + alias = fmt.Sprintf("groupalias-%d", i) + } + e := makeGroupWithIDAndAlias(t, id, alias, bucketKey, upme) + err = TestHelperWriteToStoragePacker(ctx, c.identityStore.groupPacker, e.ID, e) + require.NoError(t, err) + } + // Now add 10 groups with the same alias to ensure duplicates don't cause + // non-deterministic behavior. + for i := 0; i <= 10; i++ { + id := fmt.Sprintf("group-dup-%d", i) + bucketKey := c.identityStore.groupPacker.BucketKey(id) + e := makeGroupWithIDAndAlias(t, id, "groupalias-dup", bucketKey, upme) + err = TestHelperWriteToStoragePacker(ctx, c.identityStore.groupPacker, e.ID, e) + require.NoError(t, err) + } + + entIdentityStoreDeterminismTestSetup(t, ctx, c, upme, localMe) + + // Storage is now primed for the test. + + // To test that this is deterministic we need to load from storage a bunch of + // times and make sure we get the same result. For easier debugging we'll + // build a list of human readable ids that we can compare. + lastIDs := []string{} + for i := 0; i < 10; i++ { + // Seal and unseal to reload the identity store + require.NoError(t, c.Seal(rootToken)) + require.True(t, c.Sealed()) + for _, key := range sealKeys { + unsealed, err := c.Unseal(key) + require.NoError(t, err) + if unsealed { + break + } + } + require.False(t, c.Sealed()) + + // Identity store should be loaded now. Check it's contents. + loadedIDs := []string{} + + tx := c.identityStore.db.Txn(false) + + // Entities + their aliases + iter, err := tx.LowerBound(entitiesTable, "id", "") + require.NoError(t, err) + for item := iter.Next(); item != nil; item = iter.Next() { + // We already added "type" prefixes to the IDs when creating them so just + // append here. + e := item.(*identity.Entity) + loadedIDs = append(loadedIDs, e.ID) + for _, a := range e.Aliases { + loadedIDs = append(loadedIDs, a.ID) + } + } + // This is a non-triviality check to make sure we actually loaded stuff and + // are not just passing because of a bug in the test. + numLoaded := len(loadedIDs) + require.Greater(t, numLoaded, 300, "not enough entities and aliases loaded on attempt %d", i) + + // Groups + iter, err = tx.LowerBound(groupsTable, "id", "") + require.NoError(t, err) + for item := iter.Next(); item != nil; item = iter.Next() { + g := item.(*identity.Group) + loadedIDs = append(loadedIDs, g.ID) + if g.Alias != nil { + loadedIDs = append(loadedIDs, g.Alias.ID) + } + } + // This is a non-triviality check to make sure we actually loaded stuff and + // are not just passing because of a bug in the test. + groupsLoaded := len(loadedIDs) - numLoaded + require.Greater(t, groupsLoaded, 140, "not enough groups and aliases loaded on attempt %d", i) + + entIdentityStoreDeterminismAssert(t, i, loadedIDs, lastIDs) + + if i > 0 { + // Should be in the same order if we are deterministic since MemDB has strong ordering. + require.Equal(t, lastIDs, loadedIDs, "different result on attempt %d", i) + } + lastIDs = loadedIDs + } +} + +func makeEntityForPacker(t *testing.T, id string, p *storagepacker.StoragePacker) *identity.Entity { + return &identity.Entity{ + ID: id, + Name: id, + NamespaceID: namespace.RootNamespaceID, + BucketKey: p.BucketKey(id), + } +} + +func attachAlias(t *testing.T, e *identity.Entity, name string, me *MountEntry) *identity.Alias { + a := &identity.Alias{ + ID: name, + Name: name, + CanonicalID: e.ID, + MountType: me.Type, + MountAccessor: me.Accessor, + } + e.UpsertAlias(a) + return a +} + +func makeGroupWithIDAndAlias(t *testing.T, id, alias, bucketKey string, me *MountEntry) *identity.Group { + g := &identity.Group{ + ID: id, + Name: id, + NamespaceID: namespace.RootNamespaceID, + BucketKey: bucketKey, + } + if alias != "" { + g.Alias = &identity.Alias{ + ID: id, + Name: alias, + CanonicalID: id, + MountType: me.Type, + MountAccessor: me.Accessor, + } + } + return g +} + +func makeLocalAliasWithID(t *testing.T, id, entityID string, bucketKey string, me *MountEntry) *identity.LocalAliases { + return &identity.LocalAliases{ + Aliases: []*identity.Alias{ + { + ID: id, + Name: id, + CanonicalID: entityID, + MountType: me.Type, + MountAccessor: me.Accessor, + }, + }, + } +} diff --git a/vault/identity_store_util.go b/vault/identity_store_util.go index 4e963e48e7..b3a6d47748 100644 --- a/vault/identity_store_util.go +++ b/vault/identity_store_util.go @@ -218,12 +218,20 @@ func (i *IdentityStore) loadCachedEntitiesOfLocalAliases(ctx context.Context) er i.logger.Debug("cached entities of local alias entries", "num_buckets", len(existing)) // Make the channels used for the worker pool - broker := make(chan string) + broker := make(chan int) quit := make(chan bool) - // Buffer these channels to prevent deadlocks - errs := make(chan error, len(existing)) - result := make(chan *storagepacker.Bucket, len(existing)) + // We want to process the buckets in deterministic order so that duplicate + // merging is deterministic. We still want to load in parallel though so + // create a slice of result channels, one for each bucket. We need each result + // and err chan to be 1 buffered so we can leave a result there even if the + // processing loop is blocking on an earlier bucket still. + results := make([]chan *storagepacker.Bucket, len(existing)) + errs := make([]chan error, len(existing)) + for j := range existing { + results[j] = make(chan *storagepacker.Bucket, 1) + errs[j] = make(chan error, 1) + } // Use a wait group wg := &sync.WaitGroup{} @@ -236,20 +244,21 @@ func (i *IdentityStore) loadCachedEntitiesOfLocalAliases(ctx context.Context) er for { select { - case key, ok := <-broker: + case idx, ok := <-broker: // broker has been closed, we are done if !ok { return } + key := existing[idx] bucket, err := i.localAliasPacker.GetBucket(ctx, localAliasesBucketsPrefix+key) if err != nil { - errs <- err + errs[idx] <- err continue } // Write results out to the result channel - result <- bucket + results[idx] <- bucket // quit early case <-quit: @@ -263,7 +272,7 @@ func (i *IdentityStore) loadCachedEntitiesOfLocalAliases(ctx context.Context) er wg.Add(1) go func() { defer wg.Done() - for j, key := range existing { + for j := range existing { if j%500 == 0 { i.logger.Debug("cached entities of local aliases loading", "progress", j) } @@ -273,7 +282,7 @@ func (i *IdentityStore) loadCachedEntitiesOfLocalAliases(ctx context.Context) er return default: - broker <- key + broker <- j } } @@ -288,16 +297,16 @@ func (i *IdentityStore) loadCachedEntitiesOfLocalAliases(ctx context.Context) er i.logger.Info("cached entities of local aliases restored") }() - // Restore each key by pulling from the result chan - for j := 0; j < len(existing); j++ { + // Restore each key by pulling from the slice of result chans + for j := range existing { select { - case err := <-errs: + case err := <-errs[j]: // Close all go routines close(quit) return err - case bucket := <-result: + case bucket := <-results[j]: // If there is no entry, nothing to restore if bucket == nil { continue @@ -338,13 +347,24 @@ func (i *IdentityStore) loadEntities(ctx context.Context) error { i.logger.Debug("entities collected", "num_existing", len(existing)) duplicatedAccessors := make(map[string]struct{}) - // Make the channels used for the worker pool - broker := make(chan string) + // Make the channels used for the worker pool. We send the index into existing + // so that we can keep results in the same order as inputs. Note that this is + // goroutine safe as long as we never mutate existing again in this method + // which we don't. + broker := make(chan int) quit := make(chan bool) - // Buffer these channels to prevent deadlocks - errs := make(chan error, len(existing)) - result := make(chan *storagepacker.Bucket, len(existing)) + // We want to process the buckets in deterministic order so that duplicate + // merging is deterministic. We still want to load in parallel though so + // create a slice of result channels, one for each bucket. We need each result + // and err chan to be 1 buffered so we can leave a result there even if the + // processing loop is blocking on an earlier bucket still. + results := make([]chan *storagepacker.Bucket, len(existing)) + errs := make([]chan error, len(existing)) + for j := range existing { + results[j] = make(chan *storagepacker.Bucket, 1) + errs[j] = make(chan error, 1) + } // Use a wait group wg := &sync.WaitGroup{} @@ -357,20 +377,21 @@ func (i *IdentityStore) loadEntities(ctx context.Context) error { for { select { - case key, ok := <-broker: + case idx, ok := <-broker: // broker has been closed, we are done if !ok { return } + key := existing[idx] bucket, err := i.entityPacker.GetBucket(ctx, storagepacker.StoragePackerBucketsPrefix+key) if err != nil { - errs <- err + errs[idx] <- err continue } // Write results out to the result channel - result <- bucket + results[idx] <- bucket // quit early case <-quit: @@ -384,17 +405,13 @@ func (i *IdentityStore) loadEntities(ctx context.Context) error { wg.Add(1) go func() { defer wg.Done() - for j, key := range existing { - if j%500 == 0 { - i.logger.Debug("entities loading", "progress", j) - } - + for j := range existing { select { case <-quit: return default: - broker <- key + broker <- j } } @@ -404,14 +421,14 @@ func (i *IdentityStore) loadEntities(ctx context.Context) error { // Restore each key by pulling from the result chan LOOP: - for j := 0; j < len(existing); j++ { + for j := range existing { select { - case err = <-errs: + case err = <-errs[j]: // Close all go routines close(quit) break LOOP - case bucket := <-result: + case bucket := <-results[j]: // If there is no entry, nothing to restore if bucket == nil { continue diff --git a/vault/identiy_store_test_stubs_oss.go b/vault/identiy_store_test_stubs_oss.go new file mode 100644 index 0000000000..e3a5703885 --- /dev/null +++ b/vault/identiy_store_test_stubs_oss.go @@ -0,0 +1,21 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +//go:build !enterprise + +package vault + +import ( + "context" + "testing" +) + +//go:generate go run github.com/hashicorp/vault/tools/stubmaker + +func entIdentityStoreDeterminismTestSetup(t *testing.T, ctx context.Context, c *Core, upme, localme *MountEntry) { + // no op +} + +func entIdentityStoreDeterminismAssert(t *testing.T, i int, loadedIDs, lastIDs []string) { + // no op +} diff --git a/vault/testing.go b/vault/testing.go index 0360870323..a3d952a76d 100644 --- a/vault/testing.go +++ b/vault/testing.go @@ -31,7 +31,6 @@ import ( "time" "github.com/armon/go-metrics" - "github.com/golang/protobuf/ptypes" "github.com/hashicorp/go-cleanhttp" log "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-secure-stdlib/reloadutil" @@ -60,6 +59,7 @@ import ( "github.com/mitchellh/copystructure" "golang.org/x/crypto/ed25519" "golang.org/x/net/http2" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" "google.golang.org/protobuf/types/known/anypb" ) @@ -2215,17 +2215,29 @@ var ( _ testcluster.VaultClusterNode = &TestClusterCore{} ) +func TestUserpassMount(c *Core, local bool) (*MountEntry, error) { + name := "userpass" + if local { + name += "-local" + } + userpassMe := &MountEntry{ + Table: credentialTableType, + Path: name + "/", + Type: "userpass", + Description: name, + Accessor: name, + Local: local, + } + if err := c.enableCredential(namespace.RootContext(nil), userpassMe); err != nil { + return nil, err + } + return userpassMe, nil +} + // TestCreateDuplicateEntityAliasesInStorage creates n entities with a duplicate alias in storage // This should only be used in testing func TestCreateDuplicateEntityAliasesInStorage(ctx context.Context, c *Core, n int) ([]string, error) { - userpassMe := &MountEntry{ - Table: credentialTableType, - Path: "userpass/", - Type: "userpass", - Description: "userpass", - Accessor: "userpass1", - } - err := c.enableCredential(namespace.RootContext(nil), userpassMe) + userpassMe, err := TestUserpassMount(c, false) if err != nil { return nil, err } @@ -2250,16 +2262,7 @@ func TestCreateDuplicateEntityAliasesInStorage(ctx context.Context, c *Core, n i NamespaceID: namespace.RootNamespaceID, BucketKey: c.identityStore.entityPacker.BucketKey(entityID), } - - entity, err := ptypes.MarshalAny(e) - if err != nil { - return nil, err - } - item := &storagepacker.Item{ - ID: e.ID, - Message: entity, - } - if err = c.identityStore.entityPacker.PutItem(ctx, item); err != nil { + if err := TestHelperWriteToStoragePacker(ctx, c.identityStore.entityPacker, e.ID, e); err != nil { return nil, err } } @@ -2267,6 +2270,22 @@ func TestCreateDuplicateEntityAliasesInStorage(ctx context.Context, c *Core, n i return entityIDs, nil } +// TestHelperWriteToStoragePacker takes care of boiler place to insert into a +// storage packer. Just provide the raw protobuf object e.g. &identity.Entity{} +// and it is wrapped and inserted for you. You still need to populate BucketKey +// in the object if applicable before passing it. +func TestHelperWriteToStoragePacker(ctx context.Context, p *storagepacker.StoragePacker, id string, m protoreflect.ProtoMessage) error { + a, err := anypb.New(m) + if err != nil { + return err + } + i := &storagepacker.Item{ + ID: id, + Message: a, + } + return p.PutItem(context.Background(), i) +} + // TestCreateStorageGroup creates a group in storage only to bypass checks that the entities exist in memdb // Should only be used in testing func TestCreateStorageGroup(ctx context.Context, c *Core, entityIDs []string) error {