Commit e87ecfd

Merge branch 'main' into docs/secrets-sync-gcp-correction

aphorise authored Jan 24, 2025 · 2 parents be9973c + 1bfe364
Showing 2 changed files with 121 additions and 58 deletions.
3 changes: 3 additions & 0 deletions changelog/29326.txt

````diff
@@ -0,0 +1,3 @@
+```release-note:improvement
+core/identity: Improve performance of loading entities when unsealing by batching updates, caching local alias storage reads, and doing more work in parallel.
+```
````
176 changes: 118 additions & 58 deletions vault/identity_store_util.go
```diff
@@ -13,6 +13,7 @@ import (
 	"time"
 
 	metrics "github.com/armon/go-metrics"
+	"github.com/golang/protobuf/ptypes"
 	"github.com/hashicorp/errwrap"
 	memdb "github.com/hashicorp/go-memdb"
 	"github.com/hashicorp/go-secure-stdlib/strutil"
@@ -33,6 +34,7 @@
 var (
 	errCycleDetectedPrefix = "cyclic relationship detected for member group ID"
 	tmpSuffix              = ".tmp"
+	entityLoadingTxMaxSize = 1024
 )
 
 // loadIdentityStoreArtifacts is responsible for loading entities, groups, and aliases from storage into MemDB.
@@ -393,10 +395,10 @@ func (i *IdentityStore) loadEntities(ctx context.Context) error {
 	// create a slice of result channels, one for each bucket. We need each result
 	// and err chan to be 1 buffered so we can leave a result there even if the
 	// processing loop is blocking on an earlier bucket still.
-	results := make([]chan *storagepacker.Bucket, len(existing))
+	results := make([]chan []*identity.Entity, len(existing))
 	errs := make([]chan error, len(existing))
 	for j := range existing {
-		results[j] = make(chan *storagepacker.Bucket, 1)
+		results[j] = make(chan []*identity.Entity, 1)
 		errs[j] = make(chan error, 1)
 	}
```
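Why each result channel is buffered with capacity 1, per the comment in the diff: a worker that finishes out of order can park its slice and exit without waiting on the consumer, while the restore loop still drains channels strictly in bucket order. A minimal standalone sketch of that ordered fan-out/fan-in shape (the `keys` data and "loaded" work are illustrative, not Vault code, and the real loop also wires up quit and error channels):

```go
package main

import "fmt"

func main() {
	keys := []string{"bucket-0", "bucket-1", "bucket-2"}

	// One 1-buffered result channel per key: workers may finish in any
	// order and deposit a value without blocking on the consumer.
	results := make([]chan string, len(keys))
	for i := range keys {
		results[i] = make(chan string, 1)
	}

	for i, k := range keys {
		go func(i int, k string) {
			results[i] <- "loaded " + k // simulated per-bucket work
		}(i, k)
	}

	// Drain strictly in index order; out-of-order finishers wait in
	// their buffers, so consumption stays deterministic.
	for i := range keys {
		fmt.Println(<-results[i])
	}
}
```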

```diff
@@ -424,8 +426,18 @@ func (i *IdentityStore) loadEntities(ctx context.Context) error {
 				continue
 			}
 
+			items := make([]*identity.Entity, len(bucket.Items))
+			for j, item := range bucket.Items {
+				entity, err := i.parseEntityFromBucketItem(ctx, item)
+				if err != nil {
+					errs[idx] <- err
+					continue
+				}
+				items[j] = entity
+			}
+
 			// Write results out to the result channel
-			results[idx] <- bucket
+			results[idx] <- items
 
 			// quit early
 		case <-quit:
```
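This hunk is the "more work in parallel" half of the change: `parseEntityFromBucketItem` (a protobuf decode per item) now runs inside each worker goroutine, so buckets are decoded concurrently and the restore loop receives ready `[]*identity.Entity` slices. A failed item leaves a nil slot in `items`, which the consumer skips. A reduced sketch of the worker-side decode, with `Entity` and the callbacks as stand-ins rather than Vault types:

```go
// decodeAll turns raw bucket items into entities inside the worker,
// leaving decode failures as nil slots for the consumer to skip.
type Entity struct{ ID string }

func decodeAll(raw [][]byte, decode func([]byte) (*Entity, error), report func(error)) []*Entity {
	out := make([]*Entity, len(raw))
	for i, b := range raw {
		e, err := decode(b)
		if err != nil {
			report(err) // surface the error; out[i] stays nil
			continue
		}
		out[i] = e
	}
	return out
}
```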
```diff
@@ -453,6 +465,8 @@ func (i *IdentityStore) loadEntities(ctx context.Context) error {
 		close(broker)
 	}()
 
+	localAliasBuckets := make(map[string]*storagepacker.Bucket)
+
 	// Restore each key by pulling from the result chan
 LOOP:
 	for j := range existing {
@@ -462,77 +476,89 @@ LOOP:
 			close(quit)
 			break LOOP
 
-		case bucket := <-results[j]:
+		case entities := <-results[j]:
 			// If there is no entry, nothing to restore
-			if bucket == nil {
+			if entities == nil {
 				continue
 			}
-
-			for _, item := range bucket.Items {
-				entity, err := i.parseEntityFromBucketItem(ctx, item)
-				if err != nil {
-					return err
-				}
-				if entity == nil {
-					continue
-				}
-
-				ns, err := i.namespacer.NamespaceByID(ctx, entity.NamespaceID)
-				if err != nil {
-					return err
-				}
-				if ns == nil {
-					// Remove dangling entities
-					if !(i.localNode.ReplicationState().HasState(consts.ReplicationPerformanceSecondary) || i.localNode.HAState() == consts.PerfStandby) {
-						// Entity's namespace doesn't exist anymore but the
-						// entity from the namespace still exists.
-						i.logger.Warn("deleting entity and its any existing aliases", "name", entity.Name, "namespace_id", entity.NamespaceID)
-						err = i.entityPacker.DeleteItem(ctx, entity.ID)
-						if err != nil {
-							return err
-						}
-					}
-					continue
-				}
-				nsCtx := namespace.ContextWithNamespace(ctx, ns)
-
-				// Ensure that there are no entities with duplicate names
-				entityByName, err := i.MemDBEntityByName(nsCtx, entity.Name, false)
-				if err != nil {
-					return nil
-				}
-				if err := i.conflictResolver.ResolveEntities(ctx, entityByName, entity); err != nil && !i.disableLowerCasedNames {
-					return err
-				}
-
-				mountAccessors := getAccessorsOnDuplicateAliases(entity.Aliases)
-
-				if len(mountAccessors) > 0 {
-					i.logger.Warn("Entity has multiple aliases on the same mount(s)", "entity_id", entity.ID, "mount_accessors", mountAccessors)
-				}
-
-				for _, accessor := range mountAccessors {
-					if _, ok := duplicatedAccessors[accessor]; !ok {
-						duplicatedAccessors[accessor] = struct{}{}
-					}
-				}
-
-				localAliases, err := i.parseLocalAliases(entity.ID)
-				if err != nil {
-					return fmt.Errorf("failed to load local aliases from storage: %v", err)
-				}
-				if localAliases != nil {
-					for _, alias := range localAliases.Aliases {
-						entity.UpsertAlias(alias)
-					}
-				}
-
-				// Only update MemDB and don't hit the storage again
-				err = i.upsertEntity(nsCtx, entity, nil, false)
-				if err != nil {
-					return fmt.Errorf("failed to update entity in MemDB: %w", err)
-				}
+			load := func(entities []*identity.Entity) error {
+				tx := i.db.Txn(true)
+				defer tx.Abort()
+				upsertedItems := 0
+				for _, entity := range entities {
+					if entity == nil {
+						continue
+					}
+
+					ns, err := i.namespacer.NamespaceByID(ctx, entity.NamespaceID)
+					if err != nil {
+						return err
+					}
+					if ns == nil {
+						// Remove dangling entities
+						if !(i.localNode.ReplicationState().HasState(consts.ReplicationPerformanceSecondary) || i.localNode.HAState() == consts.PerfStandby) {
+							// Entity's namespace doesn't exist anymore but the
+							// entity from the namespace still exists.
+							i.logger.Warn("deleting entity and its any existing aliases", "name", entity.Name, "namespace_id", entity.NamespaceID)
+							err = i.entityPacker.DeleteItem(ctx, entity.ID)
+							if err != nil {
+								return err
+							}
+						}
+						continue
+					}
+					nsCtx := namespace.ContextWithNamespace(ctx, ns)
+
+					// Ensure that there are no entities with duplicate names
+					entityByName, err := i.MemDBEntityByName(nsCtx, entity.Name, false)
+					if err != nil {
+						return nil
+					}
+					if err := i.conflictResolver.ResolveEntities(ctx, entityByName, entity); err != nil && !i.disableLowerCasedNames {
+						return err
+					}
+
+					mountAccessors := getAccessorsOnDuplicateAliases(entity.Aliases)
+
+					if len(mountAccessors) > 0 {
+						i.logger.Warn("Entity has multiple aliases on the same mount(s)", "entity_id", entity.ID, "mount_accessors", mountAccessors)
+					}
+
+					for _, accessor := range mountAccessors {
+						if _, ok := duplicatedAccessors[accessor]; !ok {
+							duplicatedAccessors[accessor] = struct{}{}
+						}
+					}
+
+					err = i.loadLocalAliasesForEntity(ctx, entity, localAliasBuckets)
+					if err != nil {
+						return fmt.Errorf("failed to load local aliases from storage: %v", err)
+					}
+
+					toBeUpserted := 1 + len(entity.Aliases)
+					if upsertedItems+toBeUpserted > entityLoadingTxMaxSize {
+						tx.Commit()
+						upsertedItems = 0
+						tx = i.db.Txn(true)
+						defer tx.Abort()
+					}
+					// Only update MemDB and don't hit the storage again
+					err = i.upsertEntityInTxn(nsCtx, tx, entity, nil, false)
+					if err != nil {
+						return fmt.Errorf("failed to update entity in MemDB: %w", err)
+					}
+					upsertedItems += toBeUpserted
+				}
+				if upsertedItems > 0 {
+					tx.Commit()
+				}
+				return nil
+			}
+			err := load(entities)
+			if err != nil {
+				return err
 			}
 		}
 	}
```
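The batching change above is the "batching updates" item from the release note: instead of one MemDB write per entity, the loader counts upserted items (each entity plus its aliases) and commits and reopens the transaction whenever the next entity would push the count past `entityLoadingTxMaxSize`. A self-contained sketch of that commit-and-reopen loop against `hashicorp/go-memdb`; the `rows` table, its schema, and `batchMax` are invented stand-ins for illustration:

```go
package main

import (
	"fmt"

	memdb "github.com/hashicorp/go-memdb"
)

type row struct{ ID string }

func main() {
	schema := &memdb.DBSchema{
		Tables: map[string]*memdb.TableSchema{
			"rows": {
				Name: "rows",
				Indexes: map[string]*memdb.IndexSchema{
					"id": {Name: "id", Unique: true, Indexer: &memdb.StringFieldIndex{Field: "ID"}},
				},
			},
		},
	}
	db, err := memdb.NewMemDB(schema)
	if err != nil {
		panic(err)
	}

	const batchMax = 1024 // plays the role of entityLoadingTxMaxSize

	tx := db.Txn(true) // open a write transaction
	upserted := 0
	for i := 0; i < 5000; i++ {
		if upserted+1 > batchMax {
			tx.Commit() // seal the full batch...
			upserted = 0
			tx = db.Txn(true) // ...and start the next one
		}
		if err := tx.Insert("rows", &row{ID: fmt.Sprintf("row-%d", i)}); err != nil {
			panic(err)
		}
		upserted++
	}
	if upserted > 0 {
		tx.Commit() // commit the final partial batch
	}
	fmt.Println("loaded 5000 rows in batches of", batchMax)
}
```

Batching amortizes the per-commit cost while keeping any single transaction from growing without bound. In the diff, each reopened transaction also gets its own deferred `Abort`; go-memdb treats aborting an already-committed transaction as a no-op, so the stacked defers only roll back a batch that errors out before reaching `Commit`.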
```diff
@@ -557,6 +583,40 @@ LOOP:
 	return nil
 }
 
+// loadLocalAliasesForEntity upserts local aliases into the entity by retrieving
+// the local aliases from the cache (if present) or storage
+func (i *IdentityStore) loadLocalAliasesForEntity(ctx context.Context, entity *identity.Entity, localAliasCache map[string]*storagepacker.Bucket) error {
+	bucketKey := i.localAliasPacker.BucketKey(entity.ID)
+	if len(bucketKey) == 0 {
+		return fmt.Errorf("no bucket key for ID %s", entity.ID)
+	}
+	bucket, ok := localAliasCache[bucketKey]
+	if !ok {
+		var err error
+		bucket, err = i.localAliasPacker.GetBucket(ctx, bucketKey)
+		if err != nil {
+			return fmt.Errorf("failed to load local alias bucket from storage: %v", err)
+		}
+		localAliasCache[bucketKey] = bucket
+	}
+	if bucket == nil {
+		return nil
+	}
+	for _, item := range bucket.Items {
+		if item.ID == entity.ID {
+			var localAliases identity.LocalAliases
+			err := ptypes.UnmarshalAny(item.Message, &localAliases)
+			if err != nil {
+				return fmt.Errorf("failed to unmarshal local alias: %v", err)
+			}
+			for _, alias := range localAliases.Aliases {
+				entity.UpsertAlias(alias)
+			}
+		}
+	}
+	return nil
+}
+
 // getAccessorsOnDuplicateAliases returns a list of accessors by checking aliases in
 // the passed in list which belong to the same accessor(s)
 func getAccessorsOnDuplicateAliases(aliases []*identity.Alias) []string {
```
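The new helper is the "caching local alias storage reads" item: `localAliasBuckets` is threaded through the load as a read-through cache keyed by bucket key. Entities are packed many-to-a-bucket, so without the cache every entity would refetch its shared local-alias bucket from storage; note that a nil bucket is cached too, so a miss in storage is also only paid once. The pattern reduced to its essentials, with `Bucket` and `fetch` as illustrative stand-ins rather than the storagepacker API:

```go
// getBucketCached is a read-through cache over bucketed storage.
// A nil *Bucket is a valid cached value recording "not found".
type Bucket struct{ Items []string }

func getBucketCached(cache map[string]*Bucket, key string,
	fetch func(string) (*Bucket, error)) (*Bucket, error) {
	if b, ok := cache[key]; ok {
		return b, nil // hit, possibly a cached nil
	}
	b, err := fetch(key)
	if err != nil {
		return nil, err
	}
	cache[key] = b
	return b, nil
}
```

A plain unsynchronized map suffices here because the restore loop that calls the helper runs in a single goroutine.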
