Improve IdentityStore Invalidate performance (#27184)

* improve identitystore invalidate performance

* add changelog

* adding test to cover invalidation of entity bucket keys within IdentityStore

* minor clean ups

* adding tests

* add missing godoc for tests
This commit is contained in:
Marc Boudreau
2024-05-24 13:48:40 -04:00
committed by GitHub
parent 53ec4d5f7b
commit d30917692a
4 changed files with 501 additions and 149 deletions

3
changelog/27184.txt Normal file
View File

@@ -0,0 +1,3 @@
```release-note:change
core/identity: improve performance for secondary nodes receiving identity related updates through replication
```

View File

@@ -6,6 +6,7 @@ package vault
import (
"context"
"fmt"
"reflect"
"strings"
"time"
@@ -24,6 +25,7 @@ import (
"github.com/hashicorp/vault/sdk/helper/locksutil"
"github.com/hashicorp/vault/sdk/logical"
"github.com/patrickmn/go-cache"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/timestamppb"
)
@@ -621,145 +623,164 @@ func (i *IdentityStore) Invalidate(ctx context.Context, key string) {
defer i.lock.Unlock()
switch {
// Check if the key is a storage entry key for an entity bucket
case strings.HasPrefix(key, storagepacker.StoragePackerBucketsPrefix):
// key is for a entity bucket in storage.
i.invalidateEntityBucket(ctx, key)
return
// Check if the key is a storage entry key for an group bucket
// For those entities that are deleted, clear up the local alias entries
case strings.HasPrefix(key, groupBucketsPrefix):
// key is for a group bucket in storage.
i.invalidateGroupBucket(ctx, key)
return
case strings.HasPrefix(key, oidcTokensPrefix):
// key is for oidc tokens in storage.
i.invalidateOIDCToken(ctx)
return
case strings.HasPrefix(key, clientPath):
// key is for a client in storage.
i.invalidateClientPath(ctx, key)
return
case strings.HasPrefix(key, localAliasesBucketsPrefix):
i.invalidateLocalAliasBucket(ctx, key)
return
// key is for a local alias bucket in storage.
i.invalidateLocalAliasesBucket(ctx, key)
}
}
// invalidateEntityBucket is called by the Invalidate function to handle the
// invalidation of an Entity bucket storage entry.
func (i *IdentityStore) invalidateEntityBucket(ctx context.Context, key string) {
// Create a MemDB transaction
txn := i.db.Txn(true)
defer txn.Abort()
// Each entity object in MemDB holds the MD5 hash of the storage
// entry key of the entity bucket. Fetch all the entities that
// belong to this bucket using the hash value. Remove these entities
// from MemDB along with all the aliases of each entity.
entitiesFetched, err := i.MemDBEntitiesByBucketKeyInTxn(txn, key)
// The handling of entities has the added quirk of dealing with a temporary
// copy of the entity written in storage on the active node of performance
// secondary clusters. These temporary entity entries in storage must be
// removed once the actual entity appears in the storage bucket (as
// replicated from the primary cluster).
//
// This function retrieves all entities from MemDB that have a corresponding
// storage key that matches the provided key to invalidate. This is the set
// of entities that need to be updated, removed, or left alone in MemDB.
//
// The logic iterates over every entity stored in the invalidated storage
// bucket. For each entity read from the storage bucket, the set of entities
// read from MemDB is searched for the same entity. If it can't be found,
// it means that it needs to be inserted into MemDB. On the other hand, if
// the entity is found, it the storage bucket entity is compared to the
// MemDB entity. If they do not match, then the storage entity state needs
// to be used to update the MemDB entity; if they did match, then it means
// that the MemDB entity can be left alone. As each MemDB entity is
// processed in the loop, it is removed from the set of MemDB entities.
//
// Once all entities from the storage bucket have been compared to those
// retrieved from MemDB, the remaining entities from the set retrieved from
// MemDB are those that have been deleted from storage and must be removed
// from MemDB (because as MemDB entities that matches a storage bucket
// entity were processed, they were removed from the set).
memDBEntities, err := i.MemDBEntitiesByBucketKeyInTxn(txn, key)
if err != nil {
i.logger.Error("failed to fetch entities using the bucket key", "key", key)
return
}
for _, entity := range entitiesFetched {
// Delete all the aliases in the entity. This function will also remove
// the corresponding alias indexes too.
err = i.deleteAliasesInEntityInTxn(txn, entity, entity.Aliases)
if err != nil {
i.logger.Error("failed to delete aliases in entity", "entity_id", entity.ID, "error", err)
return
}
// Delete the entity using the same transaction
err = i.MemDBDeleteEntityByIDInTxn(txn, entity.ID)
if err != nil {
i.logger.Error("failed to delete entity from MemDB", "entity_id", entity.ID, "error", err)
return
}
}
// Get the storage bucket entry
bucket, err := i.entityPacker.GetBucket(ctx, key)
if err != nil {
i.logger.Error("failed to refresh entities", "key", key, "error", err)
return
}
// If the underlying entry is nil, it means that this invalidation
// notification is for the deletion of the underlying storage entry. At
// this point, since all the entities belonging to this bucket are
// already removed, there is nothing else to be done. But, if the
// storage entry is non-nil, its an indication of an update. In this
// case, entities in the updated bucket needs to be reinserted into
// MemDB.
var entityIDs []string
if bucket != nil {
entityIDs = make([]string, 0, len(bucket.Items))
// The storage entry for the entity bucket exists, so we need to compare
// the entities in that bucket with those in MemDB and only update those
// that are different. The entities in the bucket storage entry are the
// source of truth.
// Iterate over each entity item from the bucket
for _, item := range bucket.Items {
entity, err := i.parseEntityFromBucketItem(ctx, item)
bucketEntity, err := i.parseEntityFromBucketItem(ctx, item)
if err != nil {
i.logger.Error("failed to parse entity from bucket entry item", "error", err)
return
}
localAliases, err := i.parseLocalAliases(entity.ID)
localAliases, err := i.parseLocalAliases(bucketEntity.ID)
if err != nil {
i.logger.Error("failed to load local aliases from storage", "error", err)
return
}
if localAliases != nil {
for _, alias := range localAliases.Aliases {
entity.UpsertAlias(alias)
bucketEntity.UpsertAlias(alias)
}
}
// Only update MemDB and don't touch the storage
err = i.upsertEntityInTxn(ctx, txn, entity, nil, false)
var memDBEntity *identity.Entity
for i, entity := range memDBEntities {
if entity.ID == bucketEntity.ID {
memDBEntity = entity
// Remove this processed entity from the slice, so that
// all tht will be left are unprocessed entities.
copy(memDBEntities[i:], memDBEntities[i+1:])
memDBEntities = memDBEntities[:len(memDBEntities)-1]
break
}
}
// If the entity is not in MemDB or if it is but differs from the
// state that's in the bucket storage entry, upsert it into MemDB.
// We've considered the use of github.com/google/go-cmp here,
// but opted for sticking with reflect.DeepEqual because go-cmp
// is intended for testing and is able to panic in some
// situations.
if memDBEntity == nil || !reflect.DeepEqual(memDBEntity, bucketEntity) {
// The entity is not in MemDB, it's a new entity. Add it to MemDB.
err = i.upsertEntityInTxn(ctx, txn, bucketEntity, nil, false)
if err != nil {
i.logger.Error("failed to update entity in MemDB", "error", err)
i.logger.Error("failed to update entity in MemDB", "entity_id", bucketEntity.ID, "error", err)
return
}
// If we are a secondary, the entity created by the secondary
// via the CreateEntity RPC would have been cached. Now that the
// invalidation of the same has hit, there is no need of the
// cache. Clearing the cache. Writing to storage can't be
// performed by perf standbys. So only doing this in the active
// node of the secondary.
// If this is a performance secondary, the entity created on
// this node would have been cached in a local cache based on
// the result of the CreateEntity RPC call to the primary
// cluster. Since this invalidation is signaling that the
// entity is now in the primary cluster's storage, the locally
// cached entry can be removed.
if i.localNode.ReplicationState().HasState(consts.ReplicationPerformanceSecondary) && i.localNode.HAState() == consts.Active {
if err := i.localAliasPacker.DeleteItem(ctx, entity.ID+tmpSuffix); err != nil {
i.logger.Error("failed to clear local alias entity cache", "error", err, "entity_id", entity.ID)
if err := i.localAliasPacker.DeleteItem(ctx, bucketEntity.ID+tmpSuffix); err != nil {
i.logger.Error("failed to clear local alias entity cache", "error", err, "entity_id", bucketEntity.ID)
return
}
}
entityIDs = append(entityIDs, entity.ID)
}
}
}
// entitiesFetched are the entities before invalidation. entityIDs
// represent entities that are valid after invalidation. Clear the
// storage entries of local aliases for those entities that are
// indicated deleted by this invalidation.
// Any entities that are still in the memDBEntities slice are ones that do
// not exist in the bucket storage entry. These entities have to be removed
// from MemDB.
for _, memDBEntity := range memDBEntities {
err = i.deleteAliasesInEntityInTxn(txn, memDBEntity, memDBEntity.Aliases)
if err != nil {
i.logger.Error("failed to delete aliases in entity", "entity_id", memDBEntity.ID, "error", err)
return
}
err = i.MemDBDeleteEntityByIDInTxn(txn, memDBEntity.ID)
if err != nil {
i.logger.Error("failed to delete entity from MemDB", "entity_id", memDBEntity.ID, "error", err)
return
}
// In addition, if this is an active node of a performance secondary
// cluster, remove the local alias storage entry for this deleted entity.
if i.localNode.ReplicationState().HasState(consts.ReplicationPerformanceSecondary) && i.localNode.HAState() == consts.Active {
for _, entity := range entitiesFetched {
if !strutil.StrListContains(entityIDs, entity.ID) {
if err := i.localAliasPacker.DeleteItem(ctx, entity.ID); err != nil {
i.logger.Error("failed to clear local alias for entity", "error", err, "entity_id", entity.ID)
if err := i.localAliasPacker.DeleteItem(ctx, memDBEntity.ID); err != nil {
i.logger.Error("failed to clear local alias for entity", "error", err, "entity_id", memDBEntity.ID)
return
}
}
}
}
txn.Commit()
}
// invalidateGroupBucket is called by the Invalidate function to handle the
// invalidation of a Group bucket storage entry.
func (i *IdentityStore) invalidateGroupBucket(ctx context.Context, key string) {
// Create a MemDB transaction
txn := i.db.Txn(true)
@@ -846,6 +867,7 @@ func (i *IdentityStore) invalidateOIDCToken(ctx context.Context) {
// the shared namespace as well.
if err := i.oidcCache.Flush(ns); err != nil {
i.logger.Error("error flushing oidc cache", "error", err)
return
}
}
@@ -863,108 +885,190 @@ func (i *IdentityStore) invalidateClientPath(ctx context.Context, key string) {
// invalidateLocalAliasBucket is called by the Invalidate function to handle the
// invalidation of a local alias bucket storage entry.
func (i *IdentityStore) invalidateLocalAliasBucket(ctx context.Context, key string) {
//
// This invalidation only happens on perf standbys
//
func (i *IdentityStore) invalidateLocalAliasesBucket(ctx context.Context, key string) {
// This invalidation only happens on performance standby servers
// Create a MemDB transaction and abort it once this function returns
txn := i.db.Txn(true)
defer txn.Abort()
// Find all the local aliases belonging to this bucket and remove it
// both from aliases table and entities table. We will add the local
// aliases back by parsing the storage key. This way the deletion
// invalidation gets handled.
aliases, err := i.MemDBLocalAliasesByBucketKeyInTxn(txn, key)
// Local aliases have the added complexity of being associated with
// entities. Whenever a local alias is updated or inserted into MemDB, its
// associated MemDB-stored entity must also be updated.
//
// This function retrieves all local aliases that have a corresponding
// storage key that matches the provided key to invalidate. This is the
// set of local aliases that need to be updated, removed, or left
// alone in MemDB. Each of these operations is done as its own MemDB
// operation, but the corresponding changes that need to be made to the
// associated entities can be batched together to cut down on the number of
// MemDB operations.
//
// The logic iterates over every local alias stored at the invalidated key.
// For each local alias read from the storage entry, the set of local
// aliases read from MemDB is searched for the same local alias. If it can't
// be found, it means that it needs to be inserted into MemDB. However, if
// it's found, it must be compared with the local alias from the storage. If
// they don't match, it means that the local alias in MemDB needs to be
// updated. If they did match, it means that this particular local alias did
// not change in storage, so nothing further needs to be done. Each local
// alias processed in this loop is removed from the set of retrieved local
// aliases. The local alias is also added to the map tracking local aliases
// that need to be upserted in their associated entities in MemDB.
//
// Once the code is done iterating over all of the local aliases from
// storage, any local aliases still in the set retrieved from MemDB
// corresponds to a local alias that is no longer in storage and must be
// removed from MemDB. These local aliases are added to the map tracking
// local aliases to remove from their entities in MemDB. The actual removal
// of the local aliases themselves is done as part of the tidying up of the
// associated entities, described below.
//
// In order to batch the changes to the associated entities, a map of entity
// to local aliases (slice of local alias) is built up in the loop that
// iterates over the local aliases from storage. Similarly, the code that
// detects which local aliases to remove from MemDB also builds a separate
// map of entity to local aliases (slice of local alias). Each element in
// the map of local aliases to update in their entity is processed as
// follows: the mapped slice of local aliases is iterated over and each
// local alias is upserted into the entity and then the entity itself is
// upserted. Then, each element in the map of local aliases to remove from
// their entity is processed as follows: the
// Get all cached local aliases to compare with invalidated bucket
memDBLocalAliases, err := i.MemDBLocalAliasesByBucketKeyInTxn(txn, key)
if err != nil {
i.logger.Error("failed to fetch entities using the bucket key", "key", key)
i.logger.Error("failed to fetch local aliases using the bucket key", "key", key, "error", err)
return
}
for _, alias := range aliases {
entity, err := i.MemDBEntityByIDInTxn(txn, alias.CanonicalID, true)
if err != nil {
i.logger.Error("failed to fetch entity during local alias invalidation", "entity_id", alias.CanonicalID, "error", err)
return
}
if entity == nil {
i.logger.Error("failed to fetch entity during local alias invalidation, missing entity", "entity_id", alias.CanonicalID, "error", err)
continue
}
// Delete local aliases from the entity.
err = i.deleteAliasesInEntityInTxn(txn, entity, []*identity.Alias{alias})
if err != nil {
i.logger.Error("failed to delete aliases in entity", "entity_id", entity.ID, "error", err)
return
}
// Update the entity with removed alias.
if err := i.MemDBUpsertEntityInTxn(txn, entity); err != nil {
i.logger.Error("failed to delete entity from MemDB", "entity_id", entity.ID, "error", err)
return
}
}
// Now read the invalidated storage key
// Get local aliases from the invalidated bucket
bucket, err := i.localAliasPacker.GetBucket(ctx, key)
if err != nil {
i.logger.Error("failed to refresh local aliases", "key", key, "error", err)
return
}
// This map tracks the set of local aliases that need to be updated in each
// affected entity in MemDB.
entityLocalAliasesToUpsert := map[*identity.Entity][]*identity.Alias{}
// This map tracks the set of local aliases that need to be removed from
// their affected entity in MemDB, as well as removing the local alias
// themselves.
entityLocalAliasesToRemove := map[*identity.Entity][]*identity.Alias{}
if bucket != nil {
// The storage entry for the local alias bucket exists, so we need to
// compare the local aliases in that bucket with those in MemDB and only
// update those that are different. The local aliases in the bucket are
// the source of truth.
// Iterate over each local alias item from the bucket
for _, item := range bucket.Items {
if strings.HasSuffix(item.ID, tmpSuffix) {
continue
}
var localAliases identity.LocalAliases
err = ptypes.UnmarshalAny(item.Message, &localAliases)
var bucketLocalAliases identity.LocalAliases
err = anypb.UnmarshalTo(item.Message, &bucketLocalAliases, proto.UnmarshalOptions{})
if err != nil {
i.logger.Error("failed to parse local aliases during invalidation", "error", err)
return
}
for _, alias := range localAliases.Aliases {
// Add to the aliases table
if err := i.MemDBUpsertAliasInTxn(txn, alias, false); err != nil {
i.logger.Error("failed to insert local alias to memdb during invalidation", "error", err)
i.logger.Error("failed to parse local aliases during invalidation", "item_id", item.ID, "error", err)
return
}
// Fetch the associated entity and add the alias to that too.
entity, err := i.MemDBEntityByIDInTxn(txn, alias.CanonicalID, false)
if err != nil {
i.logger.Error("failed to fetch entity during local alias invalidation", "error", err)
for _, bucketLocalAlias := range bucketLocalAliases.Aliases {
// Find the entity related to bucketLocalAlias in MemDB in order
// to track any local aliases modifications that must be made in
// this entity.
memDBEntity := i.FetchEntityForLocalAliasInTxn(txn, bucketLocalAlias)
if memDBEntity == nil {
// FetchEntityForLocalAliasInTxn already logs any error
return
}
if entity == nil {
cachedEntityItem, err := i.localAliasPacker.GetItem(alias.CanonicalID + tmpSuffix)
if err != nil {
i.logger.Error("failed to fetch cached entity", "key", key, "error", err)
return
}
if cachedEntityItem != nil {
entity, err = i.parseCachedEntity(cachedEntityItem)
if err != nil {
i.logger.Error("failed to parse cached entity", "key", key, "error", err)
return
}
}
}
if entity == nil {
i.logger.Error("received local alias invalidation for an invalid entity", "item.ID", item.ID)
return
}
entity.UpsertAlias(alias)
// Update the entities table
if err := i.MemDBUpsertEntityInTxn(txn, entity); err != nil {
i.logger.Error("failed to upsert entity during local alias invalidation", "error", err)
// memDBLocalAlias starts off nil but gets set to the local
// alias from memDBLocalAliases whose ID matches the ID of
// bucketLocalAlias.
var memDBLocalAlias *identity.Alias
for i, localAlias := range memDBLocalAliases {
if localAlias.ID == bucketLocalAlias.ID {
memDBLocalAlias = localAlias
// Remove this processed local alias from the
// memDBLocalAliases slice, so that all that
// will be left are unprocessed local aliases.
copy(memDBLocalAliases[i:], memDBLocalAliases[i+1:])
memDBLocalAliases = memDBLocalAliases[:len(memDBLocalAliases)-1]
break
}
}
// We've considered the use of github.com/google/go-cmp here,
// but opted for sticking with reflect.DeepEqual because go-cmp
// is intended for testing and is able to panic in some
// situations.
if memDBLocalAlias == nil || !reflect.DeepEqual(memDBLocalAlias, bucketLocalAlias) {
// The bucketLocalAlias is not in MemDB or it has changed in
// storage.
err = i.MemDBUpsertAliasInTxn(txn, bucketLocalAlias, false)
if err != nil {
i.logger.Error("failed to update local alias in MemDB", "alias_id", bucketLocalAlias.ID, "error", err)
return
}
// Add this local alias to the set of local aliases that
// need to be updated for memDBEntity.
entityLocalAliasesToUpsert[memDBEntity] = append(entityLocalAliasesToUpsert[memDBEntity], bucketLocalAlias)
}
}
}
}
// Any local aliases still remaining in memDBLocalAliases do not exist in
// storage and should be removed from MemDB.
for _, memDBLocalAlias := range memDBLocalAliases {
memDBEntity := i.FetchEntityForLocalAliasInTxn(txn, memDBLocalAlias)
if memDBEntity == nil {
// FetchEntityForLocalAliasInTxn already logs any error
return
}
entityLocalAliasesToRemove[memDBEntity] = append(entityLocalAliasesToRemove[memDBEntity], memDBLocalAlias)
}
// Now process the entityLocalAliasesToUpsert map.
for entity, localAliases := range entityLocalAliasesToUpsert {
for _, localAlias := range localAliases {
entity.UpsertAlias(localAlias)
}
err = i.MemDBUpsertEntityInTxn(txn, entity)
if err != nil {
i.logger.Error("failed to update entity in MemDB", "entity_id", entity.ID, "error", err)
return
}
}
// Finally process the entityLocalAliasesToRemove map.
for entity, localAliases := range entityLocalAliasesToRemove {
// The deleteAliasesInEntityInTxn removes the provided aliases from
// the entity, but it also removes the aliases themselves from MemDB.
err := i.deleteAliasesInEntityInTxn(txn, entity, localAliases)
if err != nil {
i.logger.Error("failed to delete aliases in entity", "entity_id", entity.ID, "error", err)
return
}
err = i.MemDBUpsertEntityInTxn(txn, entity)
if err != nil {
i.logger.Error("failed to update entity in MemDB", "entity_id", entity.ID, "error", err)
return
}
}
txn.Commit()
}

View File

@@ -18,6 +18,7 @@ import (
"github.com/hashicorp/vault/helper/namespace"
"github.com/hashicorp/vault/helper/storagepacker"
"github.com/hashicorp/vault/sdk/logical"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/anypb"
)
@@ -912,3 +913,217 @@ func TestIdentityStore_DeleteCaseSensitivityKey(t *testing.T) {
t.Fatalf("bad: expected no entry for casesensitivity key")
}
}
// TestIdentityStoreInvalidate_Entities verifies the proper handling of
// entities in the Invalidate method.
func TestIdentityStoreInvalidate_Entities(t *testing.T) {
c, _, _ := TestCoreUnsealed(t)
// Create an entity in storage then call the Invalidate function
//
id, err := uuid.GenerateUUID()
require.NoError(t, err)
entity := &identity.Entity{
Name: "test",
NamespaceID: namespace.RootNamespaceID,
ID: id,
Aliases: []*identity.Alias{},
BucketKey: c.identityStore.entityPacker.BucketKey(id),
}
p := c.identityStore.entityPacker
// Persist the entity which we are merging to
entityAsAny, err := anypb.New(entity)
require.NoError(t, err)
item := &storagepacker.Item{
ID: id,
Message: entityAsAny,
}
err = p.PutItem(context.Background(), item)
require.NoError(t, err)
c.identityStore.Invalidate(context.Background(), p.BucketKey(id))
txn := c.identityStore.db.Txn(true)
memEntity, err := c.identityStore.MemDBEntityByIDInTxn(txn, id, true)
assert.NoError(t, err)
assert.NotNil(t, memEntity)
txn.Commit()
// Modify the entity in storage then call the Invalidate function
entity.Metadata = make(map[string]string)
entity.Metadata["foo"] = "bar"
entityAsAny, err = anypb.New(entity)
require.NoError(t, err)
item.Message = entityAsAny
p.PutItem(context.Background(), item)
c.identityStore.Invalidate(context.Background(), p.BucketKey(id))
txn = c.identityStore.db.Txn(true)
memEntity, err = c.identityStore.MemDBEntityByIDInTxn(txn, id, true)
assert.NoError(t, err)
assert.Contains(t, memEntity.Metadata, "foo")
txn.Commit()
// Delete the entity in storage then call the Invalidate function
err = p.DeleteItem(context.Background(), id)
require.NoError(t, err)
c.identityStore.Invalidate(context.Background(), p.BucketKey(id))
txn = c.identityStore.db.Txn(true)
memEntity, err = c.identityStore.MemDBEntityByIDInTxn(txn, id, true)
assert.NoError(t, err)
assert.Nil(t, memEntity)
txn.Commit()
}
// TestIdentityStoreInvalidate_LocalAliasesWithEntity verifies the correct
// handling of local aliases in the Invalidate method.
func TestIdentityStoreInvalidate_LocalAliasesWithEntity(t *testing.T) {
c, _, _ := TestCoreUnsealed(t)
// Create an entity in storage then call the Invalidate function
//
entityID, err := uuid.GenerateUUID()
require.NoError(t, err)
entity := &identity.Entity{
Name: "test",
NamespaceID: namespace.RootNamespaceID,
ID: entityID,
Aliases: []*identity.Alias{},
BucketKey: c.identityStore.entityPacker.BucketKey(entityID),
}
aliasID, err := uuid.GenerateUUID()
require.NoError(t, err)
localAliases := &identity.LocalAliases{
Aliases: []*identity.Alias{
{
ID: aliasID,
Name: "test",
NamespaceID: namespace.RootNamespaceID,
CanonicalID: entityID,
MountAccessor: "userpass-000000",
},
},
}
ep := c.identityStore.entityPacker
// Persist the entity which we are merging to
entityAsAny, err := anypb.New(entity)
require.NoError(t, err)
entityItem := &storagepacker.Item{
ID: entityID,
Message: entityAsAny,
}
err = ep.PutItem(context.Background(), entityItem)
require.NoError(t, err)
c.identityStore.Invalidate(context.Background(), ep.BucketKey(entityID))
lap := c.identityStore.localAliasPacker
localAliasesAsAny, err := anypb.New(localAliases)
require.NoError(t, err)
localAliasesItem := &storagepacker.Item{
ID: entityID,
Message: localAliasesAsAny,
}
err = lap.PutItem(context.Background(), localAliasesItem)
require.NoError(t, err)
c.identityStore.Invalidate(context.Background(), lap.BucketKey(entityID))
txn := c.identityStore.db.Txn(true)
memDBEntity, err := c.identityStore.MemDBEntityByIDInTxn(txn, entityID, true)
assert.NoError(t, err)
assert.NotNil(t, memDBEntity)
memDBLocalAlias, err := c.identityStore.MemDBAliasByIDInTxn(txn, aliasID, true, false)
assert.NoError(t, err)
assert.NotNil(t, memDBLocalAlias)
assert.Equal(t, 1, len(memDBEntity.Aliases))
assert.NotNil(t, memDBEntity.Aliases[0])
assert.Equal(t, memDBEntity.Aliases[0].ID, memDBLocalAlias.ID)
txn.Commit()
}
// TestIdentityStoreInvalidate_TemporaryEntity verifies the proper handling of
// temporary entities in the Invalidate method.
func TestIdentityStoreInvalidate_TemporaryEntity(t *testing.T) {
c, _, _ := TestCoreUnsealed(t)
// Create an entity in storage then call the Invalidate function
//
entityID, err := uuid.GenerateUUID()
require.NoError(t, err)
tempEntity := &identity.Entity{
Name: "test",
NamespaceID: namespace.RootNamespaceID,
ID: entityID,
Aliases: []*identity.Alias{},
BucketKey: c.identityStore.entityPacker.BucketKey(entityID),
}
lap := c.identityStore.localAliasPacker
ep := c.identityStore.entityPacker
// Persist the entity which we are merging to
tempEntityAsAny, err := anypb.New(tempEntity)
require.NoError(t, err)
tempEntityItem := &storagepacker.Item{
ID: entityID + tmpSuffix,
Message: tempEntityAsAny,
}
err = lap.PutItem(context.Background(), tempEntityItem)
require.NoError(t, err)
entityAsAny := tempEntityAsAny
entityItem := &storagepacker.Item{
ID: entityID,
Message: entityAsAny,
}
err = ep.PutItem(context.Background(), entityItem)
require.NoError(t, err)
c.identityStore.Invalidate(context.Background(), ep.BucketKey(entityID))
txn := c.identityStore.db.Txn(true)
memDBEntity, err := c.identityStore.MemDBEntityByIDInTxn(txn, entityID, true)
assert.NoError(t, err)
assert.NotNil(t, memDBEntity)
item, err := lap.GetItem(lap.BucketKey(entityID) + tmpSuffix)
assert.NoError(t, err)
assert.Nil(t, item)
}

View File

@@ -1269,6 +1269,36 @@ func (i *IdentityStore) MemDBDeleteEntityByID(entityID string) error {
return nil
}
// FetchEntityForLocalAliasInTxn fetches the entity associated with the provided
// local identity.Alias. MemDB will first be searched for the entity. If it is
// not found there, the localAliasPacker storagepacker.StoragePacker will be
// used. If an error occurs, an appropriate error message is logged and nil is
// returned.
func (i *IdentityStore) FetchEntityForLocalAliasInTxn(txn *memdb.Txn, alias *identity.Alias) *identity.Entity {
entity, err := i.MemDBEntityByIDInTxn(txn, alias.CanonicalID, false)
if err != nil {
i.logger.Error("failed to fetch entity from local alias", "entity_id", alias.CanonicalID, "error", err)
return nil
}
if entity == nil {
cachedEntityItem, err := i.localAliasPacker.GetItem(alias.CanonicalID + tmpSuffix)
if err != nil {
i.logger.Error("failed to fetch cached entity from local alias", "key", alias.CanonicalID+tmpSuffix, "error", err)
return nil
}
if cachedEntityItem != nil {
entity, err = i.parseCachedEntity(cachedEntityItem)
if err != nil {
i.logger.Error("failed to parse cached entity", "key", alias.CanonicalID+tmpSuffix, "error", err)
return nil
}
}
}
return entity
}
func (i *IdentityStore) MemDBDeleteEntityByIDInTxn(txn *memdb.Txn, entityID string) error {
if entityID == "" {
return nil