mirror of
https://github.com/optim-enterprises-bv/vault.git
synced 2025-10-30 02:02:43 +00:00
VAULT-31907: Entity loading speedup (#29326)
* perf improvements for loading entities in unseal * lint * changelog * abort on error * update to defer
This commit is contained in:
3
changelog/29326.txt
Normal file
3
changelog/29326.txt
Normal file
@@ -0,0 +1,3 @@
|
||||
```release-note:improvement
|
||||
core/identity: Improve performance of loading entities when unsealing by batching updates, caching local alias storage reads, and doing more work in parallel.
|
||||
```
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"time"
|
||||
|
||||
metrics "github.com/armon/go-metrics"
|
||||
"github.com/golang/protobuf/ptypes"
|
||||
"github.com/hashicorp/errwrap"
|
||||
memdb "github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/go-secure-stdlib/strutil"
|
||||
@@ -33,6 +34,7 @@ import (
|
||||
var (
|
||||
errCycleDetectedPrefix = "cyclic relationship detected for member group ID"
|
||||
tmpSuffix = ".tmp"
|
||||
entityLoadingTxMaxSize = 1024
|
||||
)
|
||||
|
||||
// loadIdentityStoreArtifacts is responsible for loading entities, groups, and aliases from storage into MemDB.
|
||||
@@ -393,10 +395,10 @@ func (i *IdentityStore) loadEntities(ctx context.Context) error {
|
||||
// create a slice of result channels, one for each bucket. We need each result
|
||||
// and err chan to be 1 buffered so we can leave a result there even if the
|
||||
// processing loop is blocking on an earlier bucket still.
|
||||
results := make([]chan *storagepacker.Bucket, len(existing))
|
||||
results := make([]chan []*identity.Entity, len(existing))
|
||||
errs := make([]chan error, len(existing))
|
||||
for j := range existing {
|
||||
results[j] = make(chan *storagepacker.Bucket, 1)
|
||||
results[j] = make(chan []*identity.Entity, 1)
|
||||
errs[j] = make(chan error, 1)
|
||||
}
|
||||
|
||||
@@ -424,8 +426,18 @@ func (i *IdentityStore) loadEntities(ctx context.Context) error {
|
||||
continue
|
||||
}
|
||||
|
||||
items := make([]*identity.Entity, len(bucket.Items))
|
||||
for j, item := range bucket.Items {
|
||||
entity, err := i.parseEntityFromBucketItem(ctx, item)
|
||||
if err != nil {
|
||||
errs[idx] <- err
|
||||
continue
|
||||
}
|
||||
items[j] = entity
|
||||
}
|
||||
|
||||
// Write results out to the result channel
|
||||
results[idx] <- bucket
|
||||
results[idx] <- items
|
||||
|
||||
// quit early
|
||||
case <-quit:
|
||||
@@ -453,6 +465,8 @@ func (i *IdentityStore) loadEntities(ctx context.Context) error {
|
||||
close(broker)
|
||||
}()
|
||||
|
||||
localAliasBuckets := make(map[string]*storagepacker.Bucket)
|
||||
|
||||
// Restore each key by pulling from the result chan
|
||||
LOOP:
|
||||
for j := range existing {
|
||||
@@ -462,77 +476,89 @@ LOOP:
|
||||
close(quit)
|
||||
break LOOP
|
||||
|
||||
case bucket := <-results[j]:
|
||||
case entities := <-results[j]:
|
||||
// If there is no entry, nothing to restore
|
||||
if bucket == nil {
|
||||
if entities == nil {
|
||||
continue
|
||||
}
|
||||
load := func(entities []*identity.Entity) error {
|
||||
tx := i.db.Txn(true)
|
||||
defer tx.Abort()
|
||||
upsertedItems := 0
|
||||
for _, entity := range entities {
|
||||
if entity == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, item := range bucket.Items {
|
||||
entity, err := i.parseEntityFromBucketItem(ctx, item)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if entity == nil {
|
||||
continue
|
||||
}
|
||||
ns, err := i.namespacer.NamespaceByID(ctx, entity.NamespaceID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if ns == nil {
|
||||
// Remove dangling entities
|
||||
if !(i.localNode.ReplicationState().HasState(consts.ReplicationPerformanceSecondary) || i.localNode.HAState() == consts.PerfStandby) {
|
||||
// Entity's namespace doesn't exist anymore but the
|
||||
// entity from the namespace still exists.
|
||||
i.logger.Warn("deleting entity and its any existing aliases", "name", entity.Name, "namespace_id", entity.NamespaceID)
|
||||
err = i.entityPacker.DeleteItem(ctx, entity.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
nsCtx := namespace.ContextWithNamespace(ctx, ns)
|
||||
|
||||
ns, err := i.namespacer.NamespaceByID(ctx, entity.NamespaceID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if ns == nil {
|
||||
// Remove dangling entities
|
||||
if !(i.localNode.ReplicationState().HasState(consts.ReplicationPerformanceSecondary) || i.localNode.HAState() == consts.PerfStandby) {
|
||||
// Entity's namespace doesn't exist anymore but the
|
||||
// entity from the namespace still exists.
|
||||
i.logger.Warn("deleting entity and its any existing aliases", "name", entity.Name, "namespace_id", entity.NamespaceID)
|
||||
err = i.entityPacker.DeleteItem(ctx, entity.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
// Ensure that there are no entities with duplicate names
|
||||
entityByName, err := i.MemDBEntityByName(nsCtx, entity.Name, false)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
if err := i.conflictResolver.ResolveEntities(ctx, entityByName, entity); err != nil && !i.disableLowerCasedNames {
|
||||
return err
|
||||
}
|
||||
|
||||
mountAccessors := getAccessorsOnDuplicateAliases(entity.Aliases)
|
||||
|
||||
if len(mountAccessors) > 0 {
|
||||
i.logger.Warn("Entity has multiple aliases on the same mount(s)", "entity_id", entity.ID, "mount_accessors", mountAccessors)
|
||||
}
|
||||
|
||||
for _, accessor := range mountAccessors {
|
||||
if _, ok := duplicatedAccessors[accessor]; !ok {
|
||||
duplicatedAccessors[accessor] = struct{}{}
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
nsCtx := namespace.ContextWithNamespace(ctx, ns)
|
||||
|
||||
// Ensure that there are no entities with duplicate names
|
||||
entityByName, err := i.MemDBEntityByName(nsCtx, entity.Name, false)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
if err := i.conflictResolver.ResolveEntities(ctx, entityByName, entity); err != nil && !i.disableLowerCasedNames {
|
||||
return err
|
||||
}
|
||||
|
||||
mountAccessors := getAccessorsOnDuplicateAliases(entity.Aliases)
|
||||
|
||||
if len(mountAccessors) > 0 {
|
||||
i.logger.Warn("Entity has multiple aliases on the same mount(s)", "entity_id", entity.ID, "mount_accessors", mountAccessors)
|
||||
}
|
||||
|
||||
for _, accessor := range mountAccessors {
|
||||
if _, ok := duplicatedAccessors[accessor]; !ok {
|
||||
duplicatedAccessors[accessor] = struct{}{}
|
||||
err = i.loadLocalAliasesForEntity(ctx, entity, localAliasBuckets)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to load local aliases from storage: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
localAliases, err := i.parseLocalAliases(entity.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to load local aliases from storage: %v", err)
|
||||
}
|
||||
if localAliases != nil {
|
||||
for _, alias := range localAliases.Aliases {
|
||||
entity.UpsertAlias(alias)
|
||||
toBeUpserted := 1 + len(entity.Aliases)
|
||||
if upsertedItems+toBeUpserted > entityLoadingTxMaxSize {
|
||||
tx.Commit()
|
||||
upsertedItems = 0
|
||||
tx = i.db.Txn(true)
|
||||
defer tx.Abort()
|
||||
}
|
||||
// Only update MemDB and don't hit the storage again
|
||||
err = i.upsertEntityInTxn(nsCtx, tx, entity, nil, false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update entity in MemDB: %w", err)
|
||||
}
|
||||
upsertedItems += toBeUpserted
|
||||
}
|
||||
|
||||
// Only update MemDB and don't hit the storage again
|
||||
err = i.upsertEntity(nsCtx, entity, nil, false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update entity in MemDB: %w", err)
|
||||
if upsertedItems > 0 {
|
||||
tx.Commit()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
err := load(entities)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -557,6 +583,40 @@ LOOP:
|
||||
return nil
|
||||
}
|
||||
|
||||
// loadLocalAliasesForEntity upserts local aliases into the entity by retrieving
|
||||
// the local aliases from the cache (if present) or storage
|
||||
func (i *IdentityStore) loadLocalAliasesForEntity(ctx context.Context, entity *identity.Entity, localAliasCache map[string]*storagepacker.Bucket) error {
|
||||
bucketKey := i.localAliasPacker.BucketKey(entity.ID)
|
||||
if len(bucketKey) == 0 {
|
||||
return fmt.Errorf("no bucket key for ID %s", entity.ID)
|
||||
}
|
||||
bucket, ok := localAliasCache[bucketKey]
|
||||
if !ok {
|
||||
var err error
|
||||
bucket, err = i.localAliasPacker.GetBucket(ctx, bucketKey)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to load local alias bucket from storage: %v", err)
|
||||
}
|
||||
localAliasCache[bucketKey] = bucket
|
||||
}
|
||||
if bucket == nil {
|
||||
return nil
|
||||
}
|
||||
for _, item := range bucket.Items {
|
||||
if item.ID == entity.ID {
|
||||
var localAliases identity.LocalAliases
|
||||
err := ptypes.UnmarshalAny(item.Message, &localAliases)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to unmarshal local alias: %v", err)
|
||||
}
|
||||
for _, alias := range localAliases.Aliases {
|
||||
entity.UpsertAlias(alias)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// getAccessorsOnDuplicateAliases returns a list of accessors by checking aliases in
|
||||
// the passed in list which belong to the same accessor(s)
|
||||
func getAccessorsOnDuplicateAliases(aliases []*identity.Alias) []string {
|
||||
|
||||
Reference in New Issue
Block a user