From 537fc0f3eacdce6f291bb4def847880339ebc260 Mon Sep 17 00:00:00 2001 From: divyaac Date: Tue, 10 Dec 2024 11:54:07 -0800 Subject: [PATCH] Send Global Data From Secondary to Primary During Upgrade (#29137) * OSS Patch OSS Patch Fixing a build issue * Revert "OSS Patch" This reverts commit 2cce608b9e7ad7df64cb10f91208c142e6825c57. * OSS-Patch * Fix test issue --- vault/activity_log.go | 314 +++++++++------ vault/activity_log_test.go | 532 +++++++++++++------------ vault/activity_log_testing_util.go | 213 +++++++--- vault/activity_log_util.go | 12 + vault/activity_log_util_common.go | 69 +++- vault/activity_log_util_common_test.go | 16 + 6 files changed, 708 insertions(+), 448 deletions(-) diff --git a/vault/activity_log.go b/vault/activity_log.go index 1a9b23f403..90ebffdd38 100644 --- a/vault/activity_log.go +++ b/vault/activity_log.go @@ -36,18 +36,20 @@ import ( const ( // activitySubPath is the directory under the system view where // the log will be stored. - activitySubPath = "counters/activity/" - activityEntityBasePath = "log/entity/" - activityTokenBasePath = "log/directtokens/" - activityTokenLocalBasePath = "local/" + activityTokenBasePath - activityQueryBasePath = "queries/" - activityConfigKey = "config" - activityIntentLogKey = "endofmonth" - activityGlobalPathPrefix = "global/" - activityLocalPathPrefix = "local/" + activitySubPath = "counters/activity/" + activityEntityBasePath = "log/entity/" + activityTokenBasePath = "log/directtokens/" + activityTokenLocalBasePath = "local/" + activityTokenBasePath + activityQueryBasePath = "queries/" + activityConfigKey = "config" + activityIntentLogKey = "endofmonth" + activityGlobalPathPrefix = "global/" + activityLocalPathPrefix = "local/" + activitySecondaryTempDataPathPrefix = "secondary/" activityACMERegenerationKey = "acme-regeneration" activityDeduplicationUpgradeKey = "deduplication-upgrade" + activitySecondaryDataRecCount = "secondary-data-received" // sketch for each month that stores hash of client ids distinctClientsBasePath = "log/distinctclients/" @@ -202,8 +204,6 @@ type ActivityLog struct { // Channel to signal global clients have received by the primary from the secondary, during upgrade to 1.19 dedupUpgradeGlobalClientsReceivedCh chan struct{} - // track whether the current cluster is in the middle of an upgrade to 1.19 - dedupClientsUpgradeComplete *atomic.Bool // track metadata and contents of the most recent log segment currentSegment segmentInfo @@ -237,9 +237,6 @@ type ActivityLog struct { // This channel is relevant for upgrades to 1.17. It indicates whether precomputed queries have been // generated for ACME clients. computationWorkerDone chan struct{} - // This channel is relevant for upgrades to 1.19+ (version with deduplication of clients) - // This indicates that paths that were used before 1.19 to store clients have been cleaned - oldStoragePathsCleaned chan struct{} // channel to indicate that a global clients have been // sent to the primary from a secondary @@ -256,6 +253,9 @@ type ActivityLog struct { globalPartialMonthClientTracker map[string]*activity.EntityRecord inprocessExport *atomic.Bool + // RetryUntilFalse is a test only attribute that allows us to run the sendPreviousMonthGlobalClientsWorker + // for as long as the test wants + RetryUntilFalse *atomic.Bool // clock is used to support manipulating time in unit and integration tests clock timeutil.Clock @@ -427,8 +427,8 @@ func NewActivityLog(core *Core, logger log.Logger, view *BarrierView, metrics me standbyGlobalFragmentsReceived: make([]*activity.LogFragment, 0), secondaryGlobalClientFragments: make([]*activity.LogFragment, 0), inprocessExport: atomic.NewBool(false), + RetryUntilFalse: atomic.NewBool(false), precomputedQueryWritten: make(chan struct{}), - dedupClientsUpgradeComplete: atomic.NewBool(false), } config, err := a.loadConfigOrDefault(core.activeContext) @@ -497,18 +497,14 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for {"type", "client"}, }) - // Since we are the primary, store global clients - // Create fragments from global clients and store the segment - if ret := a.createCurrentSegmentFromFragments(ctx, globalFragments, &a.currentGlobalSegment, force, activityGlobalPathPrefix); ret != nil { - return ret + if a.hasDedupClientsUpgrade(ctx) { + // Since we are the primary, store global clients + // Create fragments from global clients and store the segment + if ret := a.createCurrentSegmentFromFragments(ctx, globalFragments, &a.currentGlobalSegment, force, activityGlobalPathPrefix); ret != nil { + return ret + } } - } else if !a.dedupClientsUpgradeComplete.Load() { - // We are the secondary, and an upgrade is in progress. In this case we will temporarily store the data at this old path - // This data will be garbage collected after the upgrade has completed - if ret := a.createCurrentSegmentFromFragments(ctx, globalFragments, &a.currentSegment, force, ""); ret != nil { - return ret - } } // If segment start time is zero, do not update or write @@ -540,8 +536,17 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for }) } + allLocalFragments := append(standbyLocalFragments, localFragment) + + if !a.hasDedupClientsUpgrade(ctx) { + // In case an upgrade is in progress we will temporarily store the data at this old path + // This data will be garbage collected after the upgrade has completed + a.logger.Debug("upgrade to 1.19 or above is in progress. storing data at old storage path until upgrade is complete") + return a.createCurrentSegmentFromFragments(ctx, append(globalFragments, allLocalFragments...), &a.currentSegment, force, "") + } + // store local fragments - if ret := a.createCurrentSegmentFromFragments(ctx, append(standbyLocalFragments, localFragment), &a.currentLocalSegment, force, activityLocalPathPrefix); ret != nil { + if ret := a.createCurrentSegmentFromFragments(ctx, allLocalFragments, &a.currentLocalSegment, force, activityLocalPathPrefix); ret != nil { return ret } @@ -635,7 +640,7 @@ func (a *ActivityLog) createCurrentSegmentFromFragments(ctx context.Context, fra return nil } -func (a *ActivityLog) savePreviousTokenSegments(ctx context.Context, startTime int64, pathPrefix string, fragments []*activity.LogFragment) error { +func (a *ActivityLog) savePreviousTokenSegments(ctx context.Context, startTime int64, fragments []*activity.LogFragment) error { tokenByNamespace := make(map[string]uint64) for _, fragment := range fragments { // As of 1.9, a fragment should no longer have any NonEntityTokens. However @@ -660,7 +665,7 @@ func (a *ActivityLog) savePreviousTokenSegments(ctx context.Context, startTime i tokenCount: &activity.TokenCount{CountByNamespaceID: tokenByNamespace}, } - if _, err := a.saveSegmentEntitiesInternal(ctx, segmentToStore, false, pathPrefix); err != nil { + if _, err := a.saveSegmentTokensInternal(ctx, segmentToStore, false); err != nil { return err } return nil @@ -846,9 +851,9 @@ func (a *ActivityLog) availableTimesAtPath(ctx context.Context, onlyIncludeTimes return nil, err } out := make([]time.Time, 0) - for _, path := range paths { + for _, pathTime := range paths { // generate a set of unique start times - segmentTime, err := timeutil.ParseTimeFromPath(path) + segmentTime, err := timeutil.ParseTimeFromPath(pathTime) if err != nil { return nil, err } @@ -1035,56 +1040,21 @@ func (a *ActivityLog) loadCurrentClientSegment(ctx context.Context, startTime ti a.currentSegment.startTimestamp = startTime.Unix() // load current global segment - path := activityGlobalPathPrefix + activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(globalSegmentSequenceNumber, 10) - - out, err := a.readEntitySegmentAtPath(ctx, path) - if err != nil && !errors.Is(err, ErrEmptyResponse) { + clients, err := a.loadClientDataIntoSegment(ctx, activityGlobalPathPrefix, startTime, globalSegmentSequenceNumber, &a.currentGlobalSegment) + if err != nil { return err } - if out != nil { - if !a.core.perfStandby { - a.currentGlobalSegment = segmentInfo{ - startTimestamp: startTime.Unix(), - currentClients: &activity.EntityActivityLog{ - Clients: out.Clients, - }, - tokenCount: &activity.TokenCount{ - CountByNamespaceID: make(map[string]uint64), - }, - clientSequenceNumber: globalSegmentSequenceNumber, - } - } else { - // populate this for edge case checking (if end of month passes while background loading on standby) - a.currentGlobalSegment.startTimestamp = startTime.Unix() - } - for _, client := range out.Clients { - a.globalPartialMonthClientTracker[client.ClientID] = client - } + for _, entity := range clients { + a.globalPartialMonthClientTracker[entity.ClientID] = entity } // load current local segment - path = activityLocalPathPrefix + activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(localSegmentSequenceNumber, 10) - out, err = a.readEntitySegmentAtPath(ctx, path) - if err != nil && !errors.Is(err, ErrEmptyResponse) { + clients, err = a.loadClientDataIntoSegment(ctx, activityLocalPathPrefix, startTime, localSegmentSequenceNumber, &a.currentLocalSegment) + if err != nil { return err } - if out != nil { - if !a.core.perfStandby { - a.currentLocalSegment = segmentInfo{ - startTimestamp: startTime.Unix(), - currentClients: &activity.EntityActivityLog{ - Clients: out.Clients, - }, - tokenCount: a.currentLocalSegment.tokenCount, - clientSequenceNumber: localSegmentSequenceNumber, - } - } else { - // populate this for edge case checking (if end of month passes while background loading on standby) - a.currentLocalSegment.startTimestamp = startTime.Unix() - } - for _, client := range out.Clients { - a.partialMonthLocalClientTracker[client.ClientID] = client - } + for _, entity := range clients { + a.partialMonthLocalClientTracker[entity.ClientID] = entity } return nil @@ -1141,7 +1111,7 @@ func (a *ActivityLog) tokenCountExists(ctx context.Context, startTime time.Time) // loadTokenCount populates the in-memory representation of activity token count // this function should be called with the lock held -func (a *ActivityLog) loadTokenCount(ctx context.Context, startTime time.Time) error { +func (a *ActivityLog) loadTokenCount(ctx context.Context, startTime time.Time, segment *segmentInfo) error { tokenCountExists, err := a.tokenCountExists(ctx, startTime) if err != nil { return err @@ -1173,7 +1143,7 @@ func (a *ActivityLog) loadTokenCount(ctx context.Context, startTime time.Time) e // We must load the tokenCount of the current segment into the activity log // so that TWEs counted before the introduction of a client ID for TWEs are // still reported in the partial client counts. - a.currentLocalSegment.tokenCount = out + segment.tokenCount = out return nil } @@ -1202,8 +1172,8 @@ func (a *ActivityLog) entityBackgroundLoader(ctx context.Context, wg *sync.WaitG // Call with fragmentLock, globalFragmentLock, localFragmentLock and l held. func (a *ActivityLog) startNewCurrentLogLocked(now time.Time) { a.logger.Trace("initializing new log") - a.resetCurrentLog() - a.setCurrentSegmentTimeLocked(now) + // We will normalize times to start of the month to avoid errors + a.newMonthCurrentLogLocked(now) } // Should be called with fragmentLock, globalFragmentLock, localFragmentLock and l held. @@ -1239,6 +1209,10 @@ func (a *ActivityLog) setCurrentSegmentTimeLocked(t time.Time) { func (a *ActivityLog) resetCurrentLog() { // setting a.currentSegment timestamp to support upgrades a.currentSegment.startTimestamp = 0 + a.currentSegment.currentClients = &activity.EntityActivityLog{ + Clients: make([]*activity.EntityRecord, 0), + } + a.currentSegment.clientSequenceNumber = 0 // global segment a.currentGlobalSegment.startTimestamp = 0 @@ -1289,18 +1263,19 @@ func (a *ActivityLog) deleteLogWorker(ctx context.Context, startTimestamp int64, } func (a *ActivityLog) deleteOldStoragePathWorker(ctx context.Context, pathPrefix string) { - pathTimes, err := a.view.List(ctx, pathPrefix) + times, err := a.availableTimesAtPath(ctx, time.Now(), pathPrefix) if err != nil { a.logger.Error("could not list segment paths", "error", err) return } - for _, pathTime := range pathTimes { - segments, err := a.view.List(ctx, pathPrefix+pathTime) + for _, pathTime := range times { + pathWithTime := fmt.Sprintf("%s%d/", pathPrefix, pathTime.Unix()) + segments, err := a.view.List(ctx, pathWithTime) if err != nil { a.logger.Error("could not list segment path", "error", err) } for _, seqNum := range segments { - err = a.view.Delete(ctx, pathPrefix+pathTime+seqNum) + err = a.view.Delete(ctx, pathWithTime+seqNum) if err != nil { a.logger.Error("could not delete log", "error", err) } @@ -1335,6 +1310,19 @@ func (a *ActivityLog) refreshFromStoredLog(ctx context.Context, wg *sync.WaitGro a.localFragmentLock.Lock() defer a.localFragmentLock.Unlock() + // Garbage collect data at old storage paths + if a.hasDedupClientsUpgrade(ctx) { + a.deleteOldStoragePathWorker(ctx, activityEntityBasePath) + a.deleteOldStoragePathWorker(ctx, activityTokenBasePath) + secondaryIds, err := a.view.List(ctx, activitySecondaryTempDataPathPrefix) + if err != nil { + return err + } + for _, secondaryId := range secondaryIds { + a.deleteOldStoragePathWorker(ctx, activitySecondaryTempDataPathPrefix+secondaryId+activityEntityBasePath) + } + } + decreasingLogTimes, err := a.getMostRecentActivityLogSegment(ctx, now) if err != nil { return err @@ -1349,7 +1337,35 @@ func (a *ActivityLog) refreshFromStoredLog(ctx context.Context, wg *sync.WaitGro a.startNewCurrentLogLocked(now) } } + } + // If we have not finished upgrading, we will refresh currentSegment so data + // can be stored at the old paths until the upgrade is complete. + if !a.hasDedupClientsUpgrade(ctx) && !a.core.perfStandby { + times, err := a.availableTimesAtPath(ctx, now, activityEntityBasePath) + if err != nil { + return err + } + if len(times) > 0 { + mostRecentTimeOldEntityPath := times[len(times)-1] + // The most recent time is either the current month or the next month (if we missed the rotation perhaps) + if timeutil.IsCurrentMonth(mostRecentTimeOldEntityPath, now) { + // setting a.currentSegment timestamp to support upgrades + a.currentSegment.startTimestamp = mostRecentTimeOldEntityPath.Unix() + // This follows the logic in loadCurrentClientSegment + // We do not want need to set a clientSeq number of perf nodes because no client data is written on perf nodes, it is forwarded to the active node + if !a.core.perfStandby { + segmentNum, exists, err := a.getLastSegmentNumberByEntityPath(ctx, activityEntityBasePath+fmt.Sprint(mostRecentTimeOldEntityPath.Unix())+"/") + if err == nil && exists { + a.loadClientDataIntoSegment(ctx, "", mostRecentTimeOldEntityPath, segmentNum, &a.currentSegment) + } + } + } + } + } + + // We can exit before doing any further refreshing if we are in the middle of an upgrade or there are no logs + if len(decreasingLogTimes) == 0 || !a.hasDedupClientsUpgrade(ctx) { return nil } @@ -1395,7 +1411,7 @@ func (a *ActivityLog) refreshFromStoredLog(ctx context.Context, wg *sync.WaitGro // is still required since without it, we would lose replicated TWE counts for the // current segment. if !a.core.perfStandby { - err = a.loadTokenCount(ctx, mostRecent) + err = a.loadTokenCount(ctx, mostRecent, &a.currentLocalSegment) if err != nil { return err } @@ -1665,17 +1681,21 @@ func (c *Core) secondaryDuplicateClientMigrationWorker(ctx context.Context) { manager := c.activityLog manager.logger.Trace("started secondary activity log migration worker") storageMigrationComplete := atomic.NewBool(false) + globalClientDataSent := atomic.NewBool(false) wg := &sync.WaitGroup{} wg.Add(1) go func() { - if !c.IsPerfSecondary() { - // TODO: Create function for the secondary to continuously attempt to send data to the primary + defer wg.Done() + _, err := manager.sendPreviousMonthGlobalClientsWorker(ctx) + if err != nil { + manager.logger.Debug("failed to send previous months client data to primary", "error", err) + return } - - wg.Done() + globalClientDataSent.Store(true) }() wg.Add(1) go func() { + defer wg.Done() localClients, _, err := manager.extractLocalGlobalClientsDeprecatedStoragePath(ctx) if err != nil { return @@ -1690,31 +1710,46 @@ func (c *Core) secondaryDuplicateClientMigrationWorker(ctx context.Context) { return } } + + // Get tokens from previous months at old storage paths + clusterTokens, err := manager.extractTokensDeprecatedStoragePath(ctx) + + // Store tokens at new path + for month, tokenCount := range clusterTokens { + // Combine all token counts from all clusters + logFragments := make([]*activity.LogFragment, len(tokenCount)) + for i, tokens := range tokenCount { + logFragments[i] = &activity.LogFragment{NonEntityTokens: tokens} + } + if err = manager.savePreviousTokenSegments(ctx, month, logFragments); err != nil { + manager.logger.Error("failed to write token segment", "error", err, "month", month) + return + } + } + storageMigrationComplete.Store(true) // TODO: generate/store PCQs for these local clients - wg.Done() }() wg.Wait() if !storageMigrationComplete.Load() { manager.logger.Error("could not complete migration of duplicate clients on cluster") return } + if !globalClientDataSent.Load() { + manager.logger.Error("could not send global clients to the primary") + return + } // We have completed the vital portions of the storage migration if err := manager.writeDedupClientsUpgrade(ctx); err != nil { manager.logger.Error("could not complete migration of duplicate clients on cluster") return } - // Now that all the clients have been migrated and PCQs have been created, remove all clients at old storage paths - manager.oldStoragePathsCleaned = make(chan struct{}) - go func() { - defer close(manager.oldStoragePathsCleaned) - manager.deleteOldStoragePathWorker(ctx, activityEntityBasePath) - manager.deleteOldStoragePathWorker(ctx, activityTokenBasePath) - // TODO: Delete old PCQs - }() + // TODO: Delete old PCQs + + // Refresh activity log and load current month entities into memory + manager.refreshFromStoredLog(ctx, wg, time.Now().UTC()) - manager.dedupClientsUpgradeComplete.Store(true) manager.logger.Trace("completed secondary activity log migration worker") } @@ -1752,6 +1787,31 @@ func (a *ActivityLog) writeDedupClientsUpgrade(ctx context.Context) error { return a.view.Put(ctx, regeneratedEntry) } +func (a *ActivityLog) incrementSecondaryClientRecCount(ctx context.Context) error { + val, _ := a.getSecondaryClientRecCount(ctx) + val += 1 + regeneratedEntry, err := logical.StorageEntryJSON(activitySecondaryDataRecCount, val) + if err != nil { + return err + } + return a.view.Put(ctx, regeneratedEntry) +} + +func (a *ActivityLog) getSecondaryClientRecCount(ctx context.Context) (int, error) { + out, err := a.view.Get(ctx, activitySecondaryDataRecCount) + if err != nil { + return 0, err + } + if out == nil { + return 0, nil + } + var data int + if err = out.DecodeJSON(&data); err != nil { + return 0, err + } + return data, err +} + func (a *ActivityLog) regeneratePrecomputedQueries(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -1920,7 +1980,7 @@ func (a *ActivityLog) secondaryFragmentWorker(ctx context.Context) { } // Only send data if no upgrade is in progress. Else, the active worker will // store the data in a temporary location until it is garbage collected - if a.dedupClientsUpgradeComplete.Load() { + if a.hasDedupClientsUpgrade(ctx) { sendFunc() } @@ -1935,7 +1995,7 @@ func (a *ActivityLog) secondaryFragmentWorker(ctx context.Context) { } // If an upgrade is in progress, don't do anything // The active fragmentWorker will take care of flushing the clients to a temporary location - if a.dedupClientsUpgradeComplete.Load() { + if a.hasDedupClientsUpgrade(ctx) { sendFunc() // clear active entity set a.globalFragmentLock.Lock() @@ -4037,7 +4097,6 @@ func (c *Core) activityLogMigrationTask(ctx context.Context) { } else { // Store that upgrade processes have already been completed manager.writeDedupClientsUpgrade(ctx) - manager.dedupClientsUpgradeComplete.Store(true) } } else { // We kick off the secondary migration worker in any chance that the primary has not yet upgraded. @@ -4045,11 +4104,6 @@ func (c *Core) activityLogMigrationTask(ctx context.Context) { // already upgraded primary if !manager.hasDedupClientsUpgrade(ctx) { go c.secondaryDuplicateClientMigrationWorker(ctx) - } else { - // Store that upgrade processes have already been completed - manager.writeDedupClientsUpgrade(ctx) - manager.dedupClientsUpgradeComplete.Store(true) - } } } @@ -4062,10 +4116,11 @@ func (c *Core) activityLogMigrationTask(ctx context.Context) { func (c *Core) primaryDuplicateClientMigrationWorker(ctx context.Context) error { a := c.activityLog a.logger.Trace("started primary activity log migration worker") + ctx, cancel := context.WithCancel(ctx) + defer cancel() // Collect global clients from secondary - err := a.waitForSecondaryGlobalClients(ctx) - if err != nil { + if err := a.waitForSecondaryGlobalClients(ctx); err != nil { return err } @@ -4077,8 +4132,36 @@ func (c *Core) primaryDuplicateClientMigrationWorker(ctx context.Context) error } // Get tokens from previous months at old storage paths clusterTokens, err := a.extractTokensDeprecatedStoragePath(ctx) + if err != nil { + return nil + } - // TODO: Collect clients from secondaries into slice of fragments + // Collect global clients from secondaries and put them in the clusterGlobalClients map + secondaryIds, err := a.view.List(ctx, activitySecondaryTempDataPathPrefix) + if err != nil { + return err + } + for _, secondaryId := range secondaryIds { + times, err := a.availableTimesAtPath(ctx, time.Now(), activitySecondaryTempDataPathPrefix+secondaryId+activityEntityBasePath) + if err != nil { + a.logger.Error("could not list secondary cluster clients until for cluster", "cluster", secondaryId) + return err + } + for _, time := range times { + segments, err := a.getAllEntitySegmentsForMonth(ctx, activitySecondaryTempDataPathPrefix+secondaryId+activityEntityBasePath, time.Unix()) + if err != nil { + return err + } + for _, segment := range segments { + for _, entity := range segment.GetClients() { + if _, ok := clusterGlobalClients[time.Unix()]; !ok { + clusterGlobalClients[time.Unix()] = make([]*activity.EntityRecord, 0) + } + clusterGlobalClients[time.Unix()] = append(clusterGlobalClients[time.Unix()], entity) + } + } + } + } // Store global clients at new path for month, entitiesForMonth := range clusterGlobalClients { @@ -4107,7 +4190,7 @@ func (c *Core) primaryDuplicateClientMigrationWorker(ctx context.Context) error for i, tokens := range tokenCount { logFragments[i] = &activity.LogFragment{NonEntityTokens: tokens} } - if err = a.savePreviousTokenSegments(ctx, month, activityLocalPathPrefix+activityTokenBasePath, logFragments); err != nil { + if err = a.savePreviousTokenSegments(ctx, month, logFragments); err != nil { a.logger.Error("failed to write token segment", "error", err, "month", month) return err } @@ -4119,15 +4202,12 @@ func (c *Core) primaryDuplicateClientMigrationWorker(ctx context.Context) error a.logger.Error("could not complete migration of duplicate clients on cluster") return err } - // Garbage collect data at old paths - a.oldStoragePathsCleaned = make(chan struct{}) - go func() { - defer close(a.oldStoragePathsCleaned) - a.deleteOldStoragePathWorker(ctx, activityEntityBasePath) - a.deleteOldStoragePathWorker(ctx, activityTokenBasePath) - // We will also need to delete old PCQs - }() - a.dedupClientsUpgradeComplete.Store(true) + + // TODO: We will also need to delete old PCQs + + // Refresh activity log and load current month entities into memory + a.refreshFromStoredLog(ctx, &sync.WaitGroup{}, time.Now().UTC()) + a.logger.Trace("completed primary activity log migration worker") return nil } diff --git a/vault/activity_log_test.go b/vault/activity_log_test.go index 8599592d80..314cb22c2c 100644 --- a/vault/activity_log_test.go +++ b/vault/activity_log_test.go @@ -12,9 +12,7 @@ import ( "io" "net/http" "reflect" - "sort" "strconv" - "strings" "sync" "testing" "time" @@ -1372,69 +1370,6 @@ func TestActivityLog_tokenCountExists(t *testing.T) { } } -// entityRecordsEqual compares the parts we care about from two activity entity record slices -// note: this makes a copy of the []*activity.EntityRecord so that misordered slices won't fail the comparison, -// but the function won't modify the order of the slices to compare -func entityRecordsEqual(t *testing.T, record1, record2 []*activity.EntityRecord) bool { - t.Helper() - - if record1 == nil { - return record2 == nil - } - if record2 == nil { - return record1 == nil - } - - if len(record1) != len(record2) { - return false - } - - // sort first on namespace, then on ID, then on timestamp - entityLessFn := func(e []*activity.EntityRecord, i, j int) bool { - ei := e[i] - ej := e[j] - - nsComp := strings.Compare(ei.NamespaceID, ej.NamespaceID) - if nsComp == -1 { - return true - } - if nsComp == 1 { - return false - } - - idComp := strings.Compare(ei.ClientID, ej.ClientID) - if idComp == -1 { - return true - } - if idComp == 1 { - return false - } - - return ei.Timestamp < ej.Timestamp - } - - entitiesCopy1 := make([]*activity.EntityRecord, len(record1)) - entitiesCopy2 := make([]*activity.EntityRecord, len(record2)) - copy(entitiesCopy1, record1) - copy(entitiesCopy2, record2) - - sort.Slice(entitiesCopy1, func(i, j int) bool { - return entityLessFn(entitiesCopy1, i, j) - }) - sort.Slice(entitiesCopy2, func(i, j int) bool { - return entityLessFn(entitiesCopy2, i, j) - }) - - for i, a := range entitiesCopy1 { - b := entitiesCopy2[i] - if a.ClientID != b.ClientID || a.NamespaceID != b.NamespaceID || a.Timestamp != b.Timestamp { - return false - } - } - - return true -} - func (a *ActivityLog) resetEntitiesInMemory(t *testing.T) { t.Helper() @@ -1586,7 +1521,7 @@ func TestActivityLog_loadCurrentClientSegment(t *testing.T) { } currentGlobalEntities := a.GetCurrentGlobalEntities() - if !entityRecordsEqual(t, currentGlobalEntities.Clients, tc.entities.Clients) { + if !EntityRecordsEqual(t, currentGlobalEntities.Clients, tc.entities.Clients) { t.Errorf("bad data loaded. expected: %v, got: %v for path %q", tc.entities.Clients, currentGlobalEntities, tc.path) } @@ -1742,7 +1677,7 @@ func TestActivityLog_loadTokenCount(t *testing.T) { } for _, tc := range testCases { - err := a.loadTokenCount(ctx, time.Unix(tc.time, 0)) + err := a.loadTokenCount(ctx, time.Unix(tc.time, 0), &a.currentLocalSegment) if err != nil { t.Fatalf("got error loading data for %q: %v", tc.path, err) } @@ -1810,13 +1745,99 @@ func TestActivityLog_StopAndRestart(t *testing.T) { } } +func addActivityRecordsOldStoragePath(t *testing.T, core *Core, base time.Time, includeEntities, includeTokens bool) (*ActivityLog, []*activity.EntityRecord, map[string]uint64) { + t.Helper() + + monthsAgo := base.AddDate(0, -3, 0) + a := core.activityLog + var entityRecords []*activity.EntityRecord + if includeEntities { + entityRecords = []*activity.EntityRecord{ + { + ClientID: "11111111-1111-1111-1111-111111111111", + NamespaceID: namespace.RootNamespaceID, + Timestamp: time.Now().Unix(), + }, + { + ClientID: "22222222-2222-2222-2222-222222222222", + NamespaceID: namespace.RootNamespaceID, + Timestamp: time.Now().Unix(), + }, + { + ClientID: "33333333-2222-2222-2222-222222222222", + NamespaceID: namespace.RootNamespaceID, + Timestamp: time.Now().Unix(), + }, + } + if constants.IsEnterprise { + entityRecords = append(entityRecords, []*activity.EntityRecord{ + { + ClientID: "44444444-1111-1111-1111-111111111111", + NamespaceID: "ns1", + Timestamp: time.Now().Unix(), + }, + }...) + } + + // append some local entity data + entityRecords = append(entityRecords, &activity.EntityRecord{ + ClientID: "44444444-4444-4444-4444-444444444444", + NamespaceID: namespace.RootNamespaceID, + Timestamp: time.Now().Unix(), + }) + + for i, entityRecord := range entityRecords { + entityData, err := proto.Marshal(&activity.EntityActivityLog{ + Clients: []*activity.EntityRecord{entityRecord}, + }) + if err != nil { + t.Fatalf(err.Error()) + } + switch i { + case 0: + WriteToStorage(t, core, ActivityPrefix+activityEntityBasePath+fmt.Sprint(monthsAgo.Unix())+"/0", entityData) + + case len(entityRecords) - 1: + // local data + WriteToStorage(t, core, ActivityPrefix+activityEntityBasePath+fmt.Sprint(base.Unix())+"/"+strconv.Itoa(i-1), entityData) + default: + WriteToStorage(t, core, ActivityPrefix+activityEntityBasePath+fmt.Sprint(base.Unix())+"/"+strconv.Itoa(i-1), entityData) + } + } + } + + var tokenRecords map[string]uint64 + if includeTokens { + tokenRecords = make(map[string]uint64) + tokenRecords[namespace.RootNamespaceID] = uint64(1) + if constants.IsEnterprise { + for i := 1; i < 4; i++ { + nsID := "ns" + strconv.Itoa(i) + tokenRecords[nsID] = uint64(i) + } + } + tokenCount := &activity.TokenCount{ + CountByNamespaceID: tokenRecords, + } + + tokenData, err := proto.Marshal(tokenCount) + if err != nil { + t.Fatalf(err.Error()) + } + + WriteToStorage(t, core, ActivityPrefix+activityTokenBasePath+fmt.Sprint(base.Unix())+"/0", tokenData) + } + + return a, entityRecords, tokenRecords +} + // :base: is the timestamp to start from for the setup logic (use to simulate newest log from past or future) // entity records returned include [0] data from a previous month and [1:] data from the current month // token counts returned are from the current month -func setupActivityRecordsInStorage(t *testing.T, base time.Time, includeEntities, includeTokens bool) (*ActivityLog, []*activity.EntityRecord, map[string]uint64) { +func setupActivityRecordsInStorage(t *testing.T, base time.Time, includeEntities, includeTokens, addOldStoragePathData bool) (*ActivityLog, []*activity.EntityRecord, map[string]uint64) { t.Helper() - core, _, _ := TestCoreUnsealed(t) + core, _, _ := TestCoreUnsealedWithConfig(t, &CoreConfig{ActivityLogConfig: ActivityLogCoreConfig{ForceEnable: true}}) a := core.activityLog monthsAgo := base.AddDate(0, -3, 0) @@ -1898,13 +1919,17 @@ func setupActivityRecordsInStorage(t *testing.T, base time.Time, includeEntities WriteToStorage(t, core, ActivityLogLocalPrefix+"directtokens/"+fmt.Sprint(base.Unix())+"/0", tokenData) } + if addOldStoragePathData { + return addActivityRecordsOldStoragePath(t, core, base, includeEntities, includeTokens) + } return a, entityRecords, tokenRecords } -// TestActivityLog_refreshFromStoredLog writes records for 3 months ago and this month, then calls refreshFromStoredLog. +// TestActivityLog_refreshFromStoredLog_DedupUpgradeComplete writes records for 3 months ago and this month, then calls refreshFromStoredLog. +// The system believes the upgrade to 1.19+ is already complete. It should not refresh data from old storage paths, only data at the new storage paths. // The test verifies that current entities and current tokens are correct. -func TestActivityLog_refreshFromStoredLog(t *testing.T) { - a, expectedClientRecords, expectedTokenCounts := setupActivityRecordsInStorage(t, time.Now().UTC(), true, true) +func TestActivityLog_refreshFromStoredLog_DedupUpgradeComplete(t *testing.T) { + a, expectedClientRecords, expectedTokenCounts := setupActivityRecordsInStorage(t, time.Now().UTC(), true, true, true) a.SetEnable(true) var wg sync.WaitGroup @@ -1933,13 +1958,101 @@ func TestActivityLog_refreshFromStoredLog(t *testing.T) { } currentEntities := a.GetCurrentGlobalEntities() - if !entityRecordsEqual(t, currentEntities.Clients, expectedCurrent.Clients) { + if !EntityRecordsEqual(t, currentEntities.Clients, expectedCurrent.Clients) { // we only expect the newest entity segment to be loaded (for the current month) t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrent, currentEntities) } currentLocalEntities := a.GetCurrentLocalEntities() - if !entityRecordsEqual(t, currentLocalEntities.Clients, expectedCurrentLocal.Clients) { + if !EntityRecordsEqual(t, currentLocalEntities.Clients, expectedCurrentLocal.Clients) { + // we only expect the newest local entity segment to be loaded (for the current month) + t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrentLocal, currentLocalEntities) + } + + nsCount := a.GetStoredTokenCountByNamespaceID() + require.Equal(t, nsCount, expectedTokenCounts) + + activeClients := a.core.GetActiveClientsList() + if err := ActiveEntitiesEqual(activeClients, expectedActive.Clients); err != nil { + // we expect activeClients to be loaded for the entire month + t.Errorf("bad data loaded into active entities. expected only set of EntityID from %v in %v: %v", expectedActive.Clients, activeClients, err) + } + + // verify active global clients list + activeGlobalClients := a.core.GetActiveGlobalClientsList() + if err := ActiveEntitiesEqual(activeGlobalClients, expectedActiveGlobal.Clients); err != nil { + // we expect activeClients to be loaded for the entire month + t.Errorf("bad data loaded into active global entities. expected only set of EntityID from %v in %v: %v", expectedActiveGlobal.Clients, activeGlobalClients, err) + } + // verify active local clients list + activeLocalClients := a.core.GetActiveLocalClientsList() + if err := ActiveEntitiesEqual(activeLocalClients, expectedCurrentLocal.Clients); err != nil { + // we expect activeClients to be loaded for the entire month + t.Errorf("bad data loaded into active local entities. expected only set of EntityID from %v in %v: %v", expectedCurrentLocal.Clients, activeLocalClients, err) + } + + // No data from the old storage paths should have been loaded because the system believes that the upgrade was already complete + a.ExpectOldSegmentRefreshed(t, time.Now().UTC().Unix(), false, []*activity.EntityRecord{}, map[string]uint64{}) +} + +// TestActivityLog_refreshFromStoredLog_DedupUpgradeIncomplete writes records for 3 months ago and this month, then calls refreshFromStoredLog. +// The system thinks the upgrade to 1.19+ is incomplete. It should not refresh data from new storage paths, only data at the old storage paths. +// The test verifies that current entities and current tokens are correct. +func TestActivityLog_refreshFromStoredLog_DedupUpgradeIncomplete(t *testing.T) { + a, expectedClientRecords, expectedTokenCounts := setupActivityRecordsInStorage(t, time.Now().UTC(), true, true, true) + a.SetEnable(true) + + // Reset the system to state where the upgrade is incomplete + a.ResetDedupUpgrade(context.Background()) + + var wg sync.WaitGroup + now := time.Now().UTC() + err := a.refreshFromStoredLog(context.Background(), &wg, now) + if err != nil { + t.Fatalf("got error loading stored activity logs: %v", err) + } + wg.Wait() + + // active clients for the entire month + expectedActive := &activity.EntityActivityLog{ + Clients: expectedClientRecords[1:], + } + + // global clients added to the newest local entity segment + expectedCurrent := &activity.EntityActivityLog{ + Clients: expectedClientRecords[len(expectedClientRecords)-2 : len(expectedClientRecords)-1], + } + + expectedActiveGlobal := &activity.EntityActivityLog{ + Clients: expectedClientRecords[1 : len(expectedClientRecords)-1], + } + + // local client is only added to the newest segment for the current month. This should also appear in the active clients for the entire month. + expectedCurrentLocal := &activity.EntityActivityLog{ + Clients: expectedClientRecords[len(expectedClientRecords)-1:], + } + + // Data should be loaded into the old segment + a.ExpectOldSegmentRefreshed(t, now.Unix(), false, expectedCurrentLocal.GetClients(), map[string]uint64{}) + a.ExpectCurrentSegmentsRefreshed(t, timeutil.StartOfMonth(now).Unix(), false) + + // Simulate the completion of an upgrade + a.writeDedupClientsUpgrade(context.Background()) + + err = a.refreshFromStoredLog(context.Background(), &wg, now) + if err != nil { + t.Fatalf("got error loading stored activity logs: %v", err) + } + wg.Wait() + + currentEntities := a.GetCurrentGlobalEntities() + if !EntityRecordsEqual(t, currentEntities.Clients, expectedCurrent.Clients) { + // we only expect the newest entity segment to be loaded (for the current month) + t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrent, currentEntities) + } + + currentLocalEntities := a.GetCurrentLocalEntities() + if !EntityRecordsEqual(t, currentLocalEntities.Clients, expectedCurrentLocal.Clients) { // we only expect the newest local entity segment to be loaded (for the current month) t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrentLocal, currentLocalEntities) } @@ -1974,7 +2087,7 @@ func TestActivityLog_refreshFromStoredLog(t *testing.T) { // test closes a.doneCh and calls refreshFromStoredLog, which will not do any processing because the doneCh is closed. // The test verifies that the current data is not loaded. func TestActivityLog_refreshFromStoredLogWithBackgroundLoadingCancelled(t *testing.T) { - a, expectedClientRecords, expectedTokenCounts := setupActivityRecordsInStorage(t, time.Now().UTC(), true, true) + a, expectedClientRecords, expectedTokenCounts := setupActivityRecordsInStorage(t, time.Now().UTC(), true, true, false) a.SetEnable(true) var wg sync.WaitGroup @@ -2007,13 +2120,13 @@ func TestActivityLog_refreshFromStoredLogWithBackgroundLoadingCancelled(t *testi } currentEntities := a.GetCurrentGlobalEntities() - if !entityRecordsEqual(t, currentEntities.Clients, expectedCurrent.Clients) { + if !EntityRecordsEqual(t, currentEntities.Clients, expectedCurrent.Clients) { // we only expect the newest entity segment to be loaded (for the current month) t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrent, currentEntities) } currentLocalEntities := a.GetCurrentLocalEntities() - if !entityRecordsEqual(t, currentLocalEntities.Clients, expectedCurrentLocal.Clients) { + if !EntityRecordsEqual(t, currentLocalEntities.Clients, expectedCurrentLocal.Clients) { // we only expect the newest local entity segment to be loaded (for the current month) t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrentLocal, currentLocalEntities) } @@ -2046,7 +2159,7 @@ func TestActivityLog_refreshFromStoredLogWithBackgroundLoadingCancelled(t *testi // TestActivityLog_refreshFromStoredLogContextCancelled writes data from 3 months ago to this month and calls // refreshFromStoredLog with a canceled context, verifying that the function errors because of the canceled context. func TestActivityLog_refreshFromStoredLogContextCancelled(t *testing.T) { - a, _, _ := setupActivityRecordsInStorage(t, time.Now().UTC(), true, true) + a, _, _ := setupActivityRecordsInStorage(t, time.Now().UTC(), true, true, false) var wg sync.WaitGroup ctx, cancelFn := context.WithCancel(context.Background()) @@ -2061,7 +2174,7 @@ func TestActivityLog_refreshFromStoredLogContextCancelled(t *testing.T) { // TestActivityLog_refreshFromStoredLogNoTokens writes only entities from 3 months ago to today, then calls // refreshFromStoredLog. It verifies that there are no tokens loaded. func TestActivityLog_refreshFromStoredLogNoTokens(t *testing.T) { - a, expectedClientRecords, _ := setupActivityRecordsInStorage(t, time.Now().UTC(), true, false) + a, expectedClientRecords, _ := setupActivityRecordsInStorage(t, time.Now().UTC(), true, false, false) a.SetEnable(true) var wg sync.WaitGroup @@ -2082,13 +2195,13 @@ func TestActivityLog_refreshFromStoredLogNoTokens(t *testing.T) { } currentGlobalEntities := a.GetCurrentGlobalEntities() - if !entityRecordsEqual(t, currentGlobalEntities.Clients, expectedCurrentGlobal.Clients) { + if !EntityRecordsEqual(t, currentGlobalEntities.Clients, expectedCurrentGlobal.Clients) { // we only expect the newest entity segment to be loaded (for the current month) t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrentGlobal, currentGlobalEntities) } currentLocalEntities := a.GetCurrentLocalEntities() - if !entityRecordsEqual(t, currentLocalEntities.Clients, expectedCurrentLocal.Clients) { + if !EntityRecordsEqual(t, currentLocalEntities.Clients, expectedCurrentLocal.Clients) { // we only expect the newest local entity segment to be loaded (for the current month) t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrentLocal, currentLocalEntities) } @@ -2108,7 +2221,7 @@ func TestActivityLog_refreshFromStoredLogNoTokens(t *testing.T) { // TestActivityLog_refreshFromStoredLogNoEntities writes only direct tokens from 3 months ago to today, and runs // refreshFromStoredLog. It verifies that there are no entities or clients loaded. func TestActivityLog_refreshFromStoredLogNoEntities(t *testing.T) { - a, _, expectedTokenCounts := setupActivityRecordsInStorage(t, time.Now().UTC(), false, true) + a, _, expectedTokenCounts := setupActivityRecordsInStorage(t, time.Now().UTC(), false, true, false) a.SetEnable(true) var wg sync.WaitGroup @@ -2138,17 +2251,29 @@ func TestActivityLog_refreshFromStoredLogNoEntities(t *testing.T) { // current segment counts are zero. func TestActivityLog_refreshFromStoredLogNoData(t *testing.T) { now := time.Now().UTC() - a, _, _ := setupActivityRecordsInStorage(t, now, false, false) + a, _, _ := setupActivityRecordsInStorage(t, now, false, false, true) a.SetEnable(true) + // Simulate an upgrade that is incomplete + a.ResetDedupUpgrade(context.Background()) var wg sync.WaitGroup err := a.refreshFromStoredLog(context.Background(), &wg, now) if err != nil { t.Fatalf("got error loading stored activity logs: %v", err) } wg.Wait() + a.ExpectOldSegmentRefreshed(t, timeutil.StartOfMonth(now).Unix(), false, []*activity.EntityRecord{}, map[string]uint64{}) + a.ExpectCurrentSegmentsRefreshed(t, timeutil.StartOfMonth(now).Unix(), false) - a.ExpectCurrentSegmentRefreshed(t, now.Unix(), false) + // Simulate an upgrade that is complete + require.NoError(t, a.writeDedupClientsUpgrade(context.Background())) + err = a.refreshFromStoredLog(context.Background(), &wg, now) + if err != nil { + t.Fatalf("got error loading stored activity logs: %v", err) + } + wg.Wait() + a.ExpectOldSegmentRefreshed(t, timeutil.StartOfMonth(now).Unix(), false, []*activity.EntityRecord{}, map[string]uint64{}) + a.ExpectCurrentSegmentsRefreshed(t, timeutil.StartOfMonth(now).Unix(), false) } // TestActivityLog_refreshFromStoredLogTwoMonthsPrevious creates segment data from 5 months ago to 2 months ago and @@ -2157,17 +2282,29 @@ func TestActivityLog_refreshFromStoredLogTwoMonthsPrevious(t *testing.T) { // test what happens when the most recent data is from month M-2 (or earlier - same effect) now := time.Now().UTC() twoMonthsAgoStart := timeutil.StartOfPreviousMonth(timeutil.StartOfPreviousMonth(now)) - a, _, _ := setupActivityRecordsInStorage(t, twoMonthsAgoStart, true, true) + a, _, _ := setupActivityRecordsInStorage(t, twoMonthsAgoStart, true, true, true) a.SetEnable(true) + // Simulate an upgrade that is incomplete + a.ResetDedupUpgrade(context.Background()) var wg sync.WaitGroup err := a.refreshFromStoredLog(context.Background(), &wg, now) if err != nil { t.Fatalf("got error loading stored activity logs: %v", err) } wg.Wait() + a.ExpectCurrentSegmentsRefreshed(t, timeutil.StartOfMonth(now).Unix(), false) + a.ExpectOldSegmentRefreshed(t, timeutil.StartOfMonth(now).Unix(), false, []*activity.EntityRecord{}, map[string]uint64{}) - a.ExpectCurrentSegmentRefreshed(t, now.Unix(), false) + // Simulate an upgrade that is complete + a.writeDedupClientsUpgrade(context.Background()) + err = a.refreshFromStoredLog(context.Background(), &wg, now) + if err != nil { + t.Fatalf("got error loading stored activity logs: %v", err) + } + wg.Wait() + a.ExpectCurrentSegmentsRefreshed(t, timeutil.StartOfMonth(now).Unix(), false) + a.ExpectOldSegmentRefreshed(t, timeutil.StartOfMonth(now).Unix(), false, []*activity.EntityRecord{}, map[string]uint64{}) } // TestActivityLog_refreshFromStoredLogPreviousMonth creates segment data from 4 months ago to 1 month ago, then calls @@ -2178,9 +2315,12 @@ func TestActivityLog_refreshFromStoredLogPreviousMonth(t *testing.T) { // can handle end of month rotations monthStart := timeutil.StartOfMonth(time.Now().UTC()) oneMonthAgoStart := timeutil.StartOfPreviousMonth(monthStart) - a, expectedClientRecords, expectedTokenCounts := setupActivityRecordsInStorage(t, oneMonthAgoStart, true, true) + a, expectedClientRecords, expectedTokenCounts := setupActivityRecordsInStorage(t, oneMonthAgoStart, true, true, true) a.SetEnable(true) + // Reset upgrade attributes to simulate startup + a.ResetDedupUpgrade(context.Background()) + var wg sync.WaitGroup err := a.refreshFromStoredLog(context.Background(), &wg, time.Now().UTC()) if err != nil { @@ -2188,6 +2328,18 @@ func TestActivityLog_refreshFromStoredLogPreviousMonth(t *testing.T) { } wg.Wait() + // Previous month data should not be loaded into the currentSegment + a.ExpectOldSegmentRefreshed(t, monthStart.Unix(), false, []*activity.EntityRecord{}, map[string]uint64{}) + a.ExpectCurrentSegmentsRefreshed(t, monthStart.Unix(), false) + + // Simulate completion of upgrade + require.NoError(t, a.writeDedupClientsUpgrade(context.Background())) + + // With a refresh after upgrade is complete, the currentGlobalSegment and currentLocalSegment should contain data + err = a.refreshFromStoredLog(context.Background(), &wg, time.Now().UTC()) + require.NoError(t, err) + wg.Wait() + expectedActive := &activity.EntityActivityLog{ Clients: expectedClientRecords[1:], } @@ -2196,16 +2348,13 @@ func TestActivityLog_refreshFromStoredLogPreviousMonth(t *testing.T) { } currentEntities := a.GetCurrentGlobalEntities() - if !entityRecordsEqual(t, currentEntities.Clients, expectedCurrent.Clients) { + if !EntityRecordsEqual(t, currentEntities.Clients, expectedCurrent.Clients) { // we only expect the newest entity segment to be loaded (for the current month) t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrent, currentEntities) } nsCount := a.GetStoredTokenCountByNamespaceID() - if !reflect.DeepEqual(nsCount, expectedTokenCounts) { - // we expect all token counts to be loaded - t.Errorf("bad activity token counts loaded. expected: %v got: %v", expectedTokenCounts, nsCount) - } + require.Equal(t, expectedTokenCounts, nsCount) activeClients := a.core.GetActiveClientsList() if err := ActiveEntitiesEqual(activeClients, expectedActive.Clients); err != nil { @@ -2433,7 +2582,7 @@ func TestActivityLog_EnableDisable(t *testing.T) { } expectMissingSegment(t, core, path) - a.ExpectCurrentSegmentRefreshed(t, 0, false) + a.ExpectCurrentSegmentsRefreshed(t, 0, false) // enable (if not already) which force-writes an empty segment enableRequest() @@ -4152,7 +4301,7 @@ func TestActivityLog_partialMonthClientCount(t *testing.T) { ctx := namespace.RootContext(nil) now := time.Now().UTC() - a, clients, _ := setupActivityRecordsInStorage(t, timeutil.StartOfMonth(now), true, true) + a, clients, _ := setupActivityRecordsInStorage(t, timeutil.StartOfMonth(now), true, true, false) // clients[0] belongs to previous month clients = clients[1:] @@ -4223,7 +4372,7 @@ func TestActivityLog_partialMonthClientCountUsingHandleQuery(t *testing.T) { ctx := namespace.RootContext(nil) now := time.Now().UTC() - a, clients, _ := setupActivityRecordsInStorage(t, timeutil.StartOfMonth(now), true, true) + a, clients, _ := setupActivityRecordsInStorage(t, timeutil.StartOfMonth(now), true, true, false) // clients[0] belongs to previous month clients = clients[1:] @@ -5831,7 +5980,7 @@ func TestActivityLog_PrimaryDuplicateClientMigrationWorker(t *testing.T) { a.SetEnable(true) ctx := context.Background() - timeStamp := time.Now() + timeStamp := time.Now().UTC() startOfMonth := timeutil.StartOfMonth(timeStamp) oneMonthAgo := timeutil.StartOfPreviousMonth(timeStamp) twoMonthsAgo := timeutil.StartOfPreviousMonth(oneMonthAgo) @@ -5865,13 +6014,28 @@ func TestActivityLog_PrimaryDuplicateClientMigrationWorker(t *testing.T) { a.savePreviousEntitySegments(ctx, oneMonthAgo.Unix(), "", []*activity.LogFragment{{Clients: append(clientRecordsLocal[1:], clientRecordsGlobal[1:]...)}}) a.savePreviousEntitySegments(ctx, startOfMonth.Unix(), "", []*activity.LogFragment{{Clients: append(clientRecordsLocal[2:], clientRecordsGlobal[2:]...)}}) + // Write tokens to old path. We write twice to simulate multiple segments for each month + for i := 0; i < 2; i++ { + writeTokenSegmentOldPath(t, core, twoMonthsAgo, i, &activity.TokenCount{CountByNamespaceID: tokenCounts}) + writeTokenSegmentOldPath(t, core, oneMonthAgo, i, &activity.TokenCount{CountByNamespaceID: tokenCounts}) + writeTokenSegmentOldPath(t, core, startOfMonth, i, &activity.TokenCount{CountByNamespaceID: tokenCounts}) + } + + // Write secondary cluster data. This is to make sure that the data at these paths are garbage collected at the end of the migration routine + numSecondarySegments := 4 + secondaryIds := make([]string, 0) + for i := 0; i < numSecondarySegments; i++ { + writeSecondaryClusterSegment(t, core, twoMonthsAgo, i, fmt.Sprintf("cluster_%d", i), &activity.EntityActivityLog{Clients: clientRecordsGlobal[:ActivitySegmentClientCapacity]}) + writeSecondaryClusterSegment(t, core, oneMonthAgo, i, fmt.Sprintf("cluster_%d", i), &activity.EntityActivityLog{Clients: clientRecordsGlobal[1:ActivitySegmentClientCapacity]}) + writeSecondaryClusterSegment(t, core, startOfMonth, i, fmt.Sprintf("cluster_%d", i), &activity.EntityActivityLog{Clients: clientRecordsGlobal[2:ActivitySegmentClientCapacity]}) + secondaryIds = append(secondaryIds, fmt.Sprintf("cluster_%d", i)) + } + // Assert that the migration workers have not been run require.True(t, a.hasDedupClientsUpgrade(ctx)) - require.True(t, a.dedupClientsUpgradeComplete.Load()) // Resetting this to false so that we can // verify that after the migrations is completed, the correct values have been stored - a.dedupClientsUpgradeComplete.Store(false) require.NoError(t, a.view.Delete(ctx, activityDeduplicationUpgradeKey)) // Forcefully run the primary migration worker @@ -5891,6 +6055,7 @@ func TestActivityLog_PrimaryDuplicateClientMigrationWorker(t *testing.T) { require.NoError(t, err) globalClients = append(globalClients, segment.GetClients()...) } + // We've added duplicate clients from secondaries, so this should not affect the count of the global clients require.Equal(t, len(clientRecordsGlobal)-index, len(globalClients)) } @@ -5914,31 +6079,23 @@ func TestActivityLog_PrimaryDuplicateClientMigrationWorker(t *testing.T) { for _, time := range times { reader, err := a.NewSegmentFileReader(ctx, time) require.NoError(t, err) + numTokenSegments := 0 for { segment, err := reader.ReadToken(ctx) if errors.Is(err, io.EOF) { break } + numTokenSegments += 1 require.NoError(t, err) // Verify that the data is correct deep.Equal(segment.GetCountByNamespaceID(), tokenCounts) } + // All tokens should have been combined into one segment + require.Equal(t, 1, numTokenSegments) } // Check that the storage key has been updated require.True(t, a.hasDedupClientsUpgrade(ctx)) - // Check that the bool has been updated - require.True(t, a.dedupClientsUpgradeComplete.Load()) - - // Wait for the deletion of old logs to complete - timeout := time.After(25 * time.Second) - // Wait for channel indicating deletion to be written - select { - case <-timeout: - t.Fatal("timed out waiting for deletion to complete") - case <-a.oldStoragePathsCleaned: - break - } // Verify there is no data at the old paths times, err := a.availableTimesAtPath(ctx, time.Now(), activityEntityBasePath) @@ -5949,152 +6106,11 @@ func TestActivityLog_PrimaryDuplicateClientMigrationWorker(t *testing.T) { times, err = a.availableTimesAtPath(ctx, time.Now(), activityTokenBasePath) require.NoError(t, err) require.Equal(t, 0, len(times)) -} - -// TestActivityLog_SecondaryDuplicateClientMigrationWorker verifies that the secondary -// migration worker correctly moves local data from old location to the new location -func TestActivityLog_SecondaryDuplicateClientMigrationWorker(t *testing.T) { - cluster := NewTestCluster(t, nil, nil) - core := cluster.Cores[0].Core - a := core.activityLog - a.SetEnable(true) - - ctx := context.Background() - timeStamp := time.Now() - startOfMonth := timeutil.StartOfMonth(timeStamp) - oneMonthAgo := timeutil.StartOfPreviousMonth(timeStamp) - twoMonthsAgo := timeutil.StartOfPreviousMonth(oneMonthAgo) - - clientRecordsGlobal := make([]*activity.EntityRecord, ActivitySegmentClientCapacity*2+1) - for i := range clientRecordsGlobal { - clientRecordsGlobal[i] = &activity.EntityRecord{ - ClientID: fmt.Sprintf("111122222-3333-4444-5555-%012v", i), - Timestamp: timeStamp.Unix(), - NonEntity: false, - } - } - clientRecordsLocal := make([]*activity.EntityRecord, ActivitySegmentClientCapacity*2+1) - for i := range clientRecordsGlobal { - clientRecordsLocal[i] = &activity.EntityRecord{ - ClientID: fmt.Sprintf("011122222-3333-4444-5555-%012v", i), - Timestamp: timeStamp.Unix(), - // This is to trick the system into believing this a local client when parsing data - ClientType: nonEntityTokenActivityType, - } - } - - tokenCounts := map[string]uint64{ - "ns1": 10, - "ns2": 11, - "ns3": 12, - } - - // Write global and local clients to old path - a.savePreviousEntitySegments(ctx, twoMonthsAgo.Unix(), "", []*activity.LogFragment{{Clients: append(clientRecordsLocal, clientRecordsGlobal...)}}) - a.savePreviousEntitySegments(ctx, oneMonthAgo.Unix(), "", []*activity.LogFragment{{Clients: append(clientRecordsLocal[1:], clientRecordsGlobal[1:]...)}}) - a.savePreviousEntitySegments(ctx, startOfMonth.Unix(), "", []*activity.LogFragment{{Clients: append(clientRecordsLocal[2:], clientRecordsGlobal[2:]...)}}) - - // Write tokens to old path - a.savePreviousTokenSegments(ctx, twoMonthsAgo.Unix(), "", []*activity.LogFragment{{NonEntityTokens: tokenCounts}}) - a.savePreviousTokenSegments(ctx, oneMonthAgo.Unix(), "", []*activity.LogFragment{{NonEntityTokens: tokenCounts}}) - a.savePreviousTokenSegments(ctx, startOfMonth.Unix(), "", []*activity.LogFragment{{NonEntityTokens: tokenCounts}}) - - // Assert that the migration workers have not been run - require.True(t, a.hasDedupClientsUpgrade(ctx)) - require.True(t, a.dedupClientsUpgradeComplete.Load()) - - // Resetting this to false so that we can - // verify that after the migrations is completed, the correct values have been stored - a.dedupClientsUpgradeComplete.Store(false) - require.NoError(t, a.view.Delete(ctx, activityDeduplicationUpgradeKey)) - - // Forcefully run the secondary migration worker - core.secondaryDuplicateClientMigrationWorker(ctx) - - // Wait for the storage migration to complete - ticker := time.NewTicker(100 * time.Millisecond) - timeout := time.After(25 * time.Second) - for { - select { - case <-timeout: - t.Fatal("timed out waiting for migration to complete") - case <-ticker.C: - } - if a.dedupClientsUpgradeComplete.Load() { - break - } - } - - // Verify that no global clients have been migrated - times := []time.Time{twoMonthsAgo, oneMonthAgo, startOfMonth} - for _, time := range times { - reader, err := a.NewSegmentFileReader(ctx, time) - require.NoError(t, err) - globalClients := make([]*activity.EntityRecord, 0) - for { - segment, err := reader.ReadGlobalEntity(ctx) - if errors.Is(err, io.EOF) { - break - } - require.NoError(t, err) - globalClients = append(globalClients, segment.GetClients()...) - } - require.Equal(t, 0, len(globalClients)) - } - - // Verify local clients have been correctly migrated - for index, time := range times { - reader, err := a.NewSegmentFileReader(ctx, time) - require.NoError(t, err) - localClients := make([]*activity.EntityRecord, 0) - for { - segment, err := reader.ReadLocalEntity(ctx) - if errors.Is(err, io.EOF) { - break - } - require.NoError(t, err) - localClients = append(localClients, segment.GetClients()...) - } - require.Equal(t, len(clientRecordsLocal)-index, len(localClients)) - } - - // Verify non-entity tokens have been correctly migrated - for _, time := range times { - reader, err := a.NewSegmentFileReader(ctx, time) - require.NoError(t, err) - for { - segment, err := reader.ReadToken(ctx) - if errors.Is(err, io.EOF) { - break - } - require.NoError(t, err) - // Verify that the data is correct - deep.Equal(segment.GetCountByNamespaceID(), tokenCounts) - } - } - - // Check that the storage key has been updated - require.True(t, a.hasDedupClientsUpgrade(ctx)) - // Check that the bool has been updated - require.True(t, a.dedupClientsUpgradeComplete.Load()) - - // Wait for the deletion of old logs to complete - timeout = time.After(25 * time.Second) - // Wait for channel indicating deletion to be written - select { - case <-timeout: - t.Fatal("timed out waiting for deletion to complete") - case <-a.oldStoragePathsCleaned: - break - } - - // Verify there is no data at the old entity paths - times, err := a.availableTimesAtPath(ctx, time.Now(), activityEntityBasePath) - require.NoError(t, err) - require.Equal(t, 0, len(times)) - - // Verify there is no data at the old token paths - times, err = a.availableTimesAtPath(ctx, time.Now(), activityTokenBasePath) - require.NoError(t, err) - require.Equal(t, 0, len(times)) + + // Verify there is no data at the secondary cluster paths + for _, secondaryId := range secondaryIds { + times, err = a.availableTimesAtPath(ctx, time.Now(), activitySecondaryTempDataPathPrefix+secondaryId+activityEntityBasePath) + require.NoError(t, err) + require.Equal(t, 0, len(times)) + } } diff --git a/vault/activity_log_testing_util.go b/vault/activity_log_testing_util.go index d0fd4b7b35..42566bb920 100644 --- a/vault/activity_log_testing_util.go +++ b/vault/activity_log_testing_util.go @@ -7,13 +7,18 @@ import ( "context" "fmt" "math/rand" + "sort" + "strings" + "sync" "testing" + "time" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/hashicorp/vault/helper/constants" "github.com/hashicorp/vault/sdk/logical" "github.com/hashicorp/vault/vault/activity" + "github.com/stretchr/testify/require" "google.golang.org/protobuf/testing/protocmp" ) @@ -187,76 +192,136 @@ func RandStringBytes(n int) string { return string(b) } -// ExpectCurrentSegmentRefreshed verifies that the current segment has been refreshed -// non-nil empty components and updated with the `expectedStart` timestamp +// ExpectOldSegmentRefreshed verifies that the old current segment structure has been refreshed +// non-nil empty components and updated with the `expectedStart` timestamp. This is expected when +// an upgrade has not yet completed. // Note: if `verifyTimeNotZero` is true, ignore `expectedStart` and just make sure the timestamp isn't 0 -func (a *ActivityLog) ExpectCurrentSegmentRefreshed(t *testing.T, expectedStart int64, verifyTimeNotZero bool) { +func (a *ActivityLog) ExpectOldSegmentRefreshed(t *testing.T, expectedStart int64, verifyTimeNotZero bool, expectedEntities []*activity.EntityRecord, directTokens map[string]uint64) { t.Helper() a.l.RLock() defer a.l.RUnlock() a.fragmentLock.RLock() defer a.fragmentLock.RUnlock() - if a.currentGlobalSegment.currentClients == nil { - t.Fatalf("expected non-nil currentSegment.currentClients") + require.NotNil(t, a.currentSegment.currentClients) + require.NotNil(t, a.currentSegment.currentClients.Clients) + require.NotNil(t, a.currentSegment.tokenCount) + require.NotNil(t, a.currentSegment.tokenCount.CountByNamespaceID) + if !EntityRecordsEqual(t, a.currentSegment.currentClients.Clients, expectedEntities) { + // we only expect the newest entity segment to be loaded (for the current month) + t.Errorf("bad activity entity logs loaded. expected: %v got: %v", a.currentSegment.currentClients.Clients, expectedEntities) } - if a.currentGlobalSegment.currentClients.Clients == nil { - t.Errorf("expected non-nil currentSegment.currentClients.Entities") - } - if a.currentGlobalSegment.tokenCount == nil { - t.Fatalf("expected non-nil currentSegment.tokenCount") - } - if a.currentGlobalSegment.tokenCount.CountByNamespaceID == nil { - t.Errorf("expected non-nil currentSegment.tokenCount.CountByNamespaceID") - } - if a.currentLocalSegment.currentClients == nil { - t.Fatalf("expected non-nil currentSegment.currentClients") - } - if a.currentLocalSegment.currentClients.Clients == nil { - t.Errorf("expected non-nil currentSegment.currentClients.Entities") - } - if a.currentLocalSegment.tokenCount == nil { - t.Fatalf("expected non-nil currentSegment.tokenCount") - } - if a.currentLocalSegment.tokenCount.CountByNamespaceID == nil { - t.Errorf("expected non-nil currentSegment.tokenCount.CountByNamespaceID") - } - if a.partialMonthLocalClientTracker == nil { - t.Errorf("expected non-nil partialMonthLocalClientTracker") - } - if a.globalPartialMonthClientTracker == nil { - t.Errorf("expected non-nil globalPartialMonthClientTracker") - } - if len(a.currentGlobalSegment.currentClients.Clients) > 0 { - t.Errorf("expected no current entity segment to be loaded. got: %v", a.currentGlobalSegment.currentClients) - } - if len(a.currentLocalSegment.currentClients.Clients) > 0 { - t.Errorf("expected no current entity segment to be loaded. got: %v", a.currentLocalSegment.currentClients) - } - if len(a.currentLocalSegment.tokenCount.CountByNamespaceID) > 0 { - t.Errorf("expected no token counts to be loaded. got: %v", a.currentLocalSegment.tokenCount.CountByNamespaceID) - } - if len(a.partialMonthLocalClientTracker) > 0 { - t.Errorf("expected no active entity segment to be loaded. got: %v", a.partialMonthLocalClientTracker) - } - if len(a.globalPartialMonthClientTracker) > 0 { - t.Errorf("expected no active entity segment to be loaded. got: %v", a.globalPartialMonthClientTracker) + require.Equal(t, directTokens, a.currentSegment.tokenCount.CountByNamespaceID) + if verifyTimeNotZero { + require.NotEqual(t, a.currentSegment.startTimestamp, 0) + } else { + require.Equal(t, a.currentSegment.startTimestamp, expectedStart) } +} + +// ExpectCurrentSegmentsRefreshed verifies that the current segment has been refreshed +// non-nil empty components and updated with the `expectedStart` timestamp +// Note: if `verifyTimeNotZero` is true, ignore `expectedStart` and just make sure the timestamp isn't 0 +func (a *ActivityLog) ExpectCurrentSegmentsRefreshed(t *testing.T, expectedStart int64, verifyTimeNotZero bool) { + t.Helper() + + a.l.RLock() + defer a.l.RUnlock() + a.fragmentLock.RLock() + defer a.fragmentLock.RUnlock() + require.NotNil(t, a.currentGlobalSegment.currentClients) + require.NotNil(t, a.currentGlobalSegment.currentClients.Clients) + require.NotNil(t, a.currentGlobalSegment.tokenCount) + require.NotNil(t, a.currentGlobalSegment.tokenCount.CountByNamespaceID) + + require.NotNil(t, a.currentLocalSegment.currentClients) + require.NotNil(t, a.currentLocalSegment.currentClients.Clients) + require.NotNil(t, a.currentLocalSegment.tokenCount) + require.NotNil(t, a.currentLocalSegment.tokenCount.CountByNamespaceID) + + require.NotNil(t, a.partialMonthLocalClientTracker) + require.NotNil(t, a.globalPartialMonthClientTracker) + + require.Equal(t, 0, len(a.currentGlobalSegment.currentClients.Clients)) + require.Equal(t, 0, len(a.currentLocalSegment.currentClients.Clients)) + require.Equal(t, 0, len(a.currentLocalSegment.tokenCount.CountByNamespaceID)) + + require.Equal(t, 0, len(a.partialMonthLocalClientTracker)) + require.Equal(t, 0, len(a.globalPartialMonthClientTracker)) if verifyTimeNotZero { - if a.currentGlobalSegment.startTimestamp == 0 { - t.Error("bad start timestamp. expected no reset but timestamp was reset") - } - if a.currentLocalSegment.startTimestamp == 0 { - t.Error("bad start timestamp. expected no reset but timestamp was reset") - } - } else if a.currentGlobalSegment.startTimestamp != expectedStart { - t.Errorf("bad start timestamp. expected: %v got: %v", expectedStart, a.currentGlobalSegment.startTimestamp) - } else if a.currentLocalSegment.startTimestamp != expectedStart { - t.Errorf("bad start timestamp. expected: %v got: %v", expectedStart, a.currentLocalSegment.startTimestamp) + require.NotEqual(t, 0, a.currentGlobalSegment.startTimestamp) + require.NotEqual(t, 0, a.currentLocalSegment.startTimestamp) + require.NotEqual(t, 0, a.currentSegment.startTimestamp) + } else { + require.Equal(t, expectedStart, a.currentGlobalSegment.startTimestamp) + require.Equal(t, expectedStart, a.currentLocalSegment.startTimestamp) } } +// EntityRecordsEqual compares the parts we care about from two activity entity record slices +// note: this makes a copy of the []*activity.EntityRecord so that misordered slices won't fail the comparison, +// but the function won't modify the order of the slices to compare +func EntityRecordsEqual(t *testing.T, record1, record2 []*activity.EntityRecord) bool { + t.Helper() + + if record1 == nil { + return record2 == nil + } + if record2 == nil { + return record1 == nil + } + + if len(record1) != len(record2) { + return false + } + + // sort first on namespace, then on ID, then on timestamp + entityLessFn := func(e []*activity.EntityRecord, i, j int) bool { + ei := e[i] + ej := e[j] + + nsComp := strings.Compare(ei.NamespaceID, ej.NamespaceID) + if nsComp == -1 { + return true + } + if nsComp == 1 { + return false + } + + idComp := strings.Compare(ei.ClientID, ej.ClientID) + if idComp == -1 { + return true + } + if idComp == 1 { + return false + } + + return ei.Timestamp < ej.Timestamp + } + + entitiesCopy1 := make([]*activity.EntityRecord, len(record1)) + entitiesCopy2 := make([]*activity.EntityRecord, len(record2)) + copy(entitiesCopy1, record1) + copy(entitiesCopy2, record2) + + sort.Slice(entitiesCopy1, func(i, j int) bool { + return entityLessFn(entitiesCopy1, i, j) + }) + sort.Slice(entitiesCopy2, func(i, j int) bool { + return entityLessFn(entitiesCopy2, i, j) + }) + + for i, a := range entitiesCopy1 { + b := entitiesCopy2[i] + if a.ClientID != b.ClientID || a.NamespaceID != b.NamespaceID || a.Timestamp != b.Timestamp { + return false + } + } + + return true +} + // ActiveEntitiesEqual checks that only the set of `test` exists in `active` func ActiveEntitiesEqual(active []*activity.EntityRecord, test []*activity.EntityRecord) error { opts := []cmp.Option{protocmp.Transform(), cmpopts.SortSlices(func(x, y *activity.EntityRecord) bool { @@ -284,6 +349,7 @@ func (a *ActivityLog) SetStartTimestamp(timestamp int64) { defer a.l.Unlock() a.currentGlobalSegment.startTimestamp = timestamp a.currentLocalSegment.startTimestamp = timestamp + a.currentSegment.startTimestamp = timestamp } // GetStoredTokenCountByNamespaceID returns the count of tokens by namespace ID @@ -370,3 +436,38 @@ func (c *Core) DeleteLogsAtPath(ctx context.Context, t *testing.T, storagePath s } } } + +// SaveEntitySegment is a test helper function to keep the savePreviousEntitySegments function internal +func (a *ActivityLog) SaveEntitySegment(ctx context.Context, startTime int64, pathPrefix string, fragments []*activity.LogFragment) error { + return a.savePreviousEntitySegments(ctx, startTime, pathPrefix, fragments) +} + +// LaunchMigrationWorker is a test only helper function that launches the migration workers. +// This allows us to keep the migration worker methods internal +func (a *ActivityLog) LaunchMigrationWorker(ctx context.Context, isSecondary bool) { + if isSecondary { + go a.core.secondaryDuplicateClientMigrationWorker(ctx) + } else { + go a.core.primaryDuplicateClientMigrationWorker(ctx) + } +} + +// DedupUpgradeComplete is a test helper function that indicates whether the +// all correct states have been set after completing upgrade processes to 1.19+ +func (a *ActivityLog) DedupUpgradeComplete(ctx context.Context) bool { + return a.hasDedupClientsUpgrade(ctx) +} + +// ResetDedupUpgrade is a test helper function that resets the state to reflect +// how the system should look before running/completing any upgrade process to 1.19+ +func (a *ActivityLog) ResetDedupUpgrade(ctx context.Context) { + a.view.Delete(ctx, activityDeduplicationUpgradeKey) + a.view.Delete(ctx, activitySecondaryDataRecCount) +} + +// RefreshActivityLog is a test helper functions that refreshes the activity logs +// segments and current month data. This allows us to keep the refreshFromStoredLog +// function internal +func (a *ActivityLog) RefreshActivityLog(ctx context.Context) { + a.refreshFromStoredLog(ctx, &sync.WaitGroup{}, time.Now().UTC()) +} diff --git a/vault/activity_log_util.go b/vault/activity_log_util.go index 890af5533f..8c2585717c 100644 --- a/vault/activity_log_util.go +++ b/vault/activity_log_util.go @@ -7,9 +7,21 @@ package vault import ( "context" + + "github.com/hashicorp/vault/vault/activity" ) // sendCurrentFragment is a no-op on OSS func (a *ActivityLog) sendCurrentFragment(ctx context.Context) error { return nil } + +// receiveSecondaryPreviousMonthGlobalData is a no-op on OSS +func (a *ActivityLog) receiveSecondaryPreviousMonthGlobalData(ctx context.Context, month int64, clients *activity.LogFragment) error { + return nil +} + +// sendPreviousMonthGlobalClientsWorker is a no-op on OSS +func (a *ActivityLog) sendPreviousMonthGlobalClientsWorker(ctx context.Context) (map[int64][]*activity.EntityRecord, error) { + return map[int64][]*activity.EntityRecord{}, nil +} diff --git a/vault/activity_log_util_common.go b/vault/activity_log_util_common.go index 86c824adeb..8b1bea9bcb 100644 --- a/vault/activity_log_util_common.go +++ b/vault/activity_log_util_common.go @@ -10,6 +10,7 @@ import ( "io" "slices" "sort" + "strconv" "strings" "time" @@ -565,32 +566,25 @@ func (a *ActivityLog) extractLocalGlobalClientsDeprecatedStoragePath(ctx context return clusterLocalClients, clusterGlobalClients, fmt.Errorf("could not list available logs on the cluster") } for _, time := range times { - entityPath := activityEntityBasePath + fmt.Sprint(time.Unix()) + "/" - segmentPaths, err := a.view.List(ctx, entityPath) + segments, err := a.getAllEntitySegmentsForMonth(ctx, activityEntityBasePath, time.Unix()) if err != nil { return nil, nil, err } - for _, seqNumber := range segmentPaths { - segment, err := a.readEntitySegmentAtPath(ctx, entityPath+seqNumber) - if segment == nil { - continue - } - if err != nil { - a.logger.Warn("failed to read segment", "error", err) - return clusterLocalClients, clusterGlobalClients, err - } + for _, segment := range segments { for _, entity := range segment.GetClients() { // If the client is not local, then add it to a map + // Normalize month value to the beginning of the month to avoid multiple storage entries for the same month + startOfMonth := timeutil.StartOfMonth(time.UTC()) if local, _ := a.isClientLocal(entity); !local { - if _, ok := clusterGlobalClients[time.Unix()]; !ok { - clusterGlobalClients[time.Unix()] = make([]*activity.EntityRecord, 0) + if _, ok := clusterGlobalClients[startOfMonth.Unix()]; !ok { + clusterGlobalClients[startOfMonth.Unix()] = make([]*activity.EntityRecord, 0) } - clusterGlobalClients[time.Unix()] = append(clusterGlobalClients[time.Unix()], entity) + clusterGlobalClients[startOfMonth.Unix()] = append(clusterGlobalClients[startOfMonth.Unix()], entity) } else { - if _, ok := clusterLocalClients[time.Unix()]; !ok { - clusterLocalClients[time.Unix()] = make([]*activity.EntityRecord, 0) + if _, ok := clusterLocalClients[startOfMonth.Unix()]; !ok { + clusterLocalClients[startOfMonth.Unix()] = make([]*activity.EntityRecord, 0) } - clusterLocalClients[time.Unix()] = append(clusterLocalClients[time.Unix()], entity) + clusterLocalClients[startOfMonth.Unix()] = append(clusterLocalClients[startOfMonth.Unix()], entity) } } } @@ -627,6 +621,25 @@ func (a *ActivityLog) extractTokensDeprecatedStoragePath(ctx context.Context) (m return tokensByMonth, nil } +func (a *ActivityLog) getAllEntitySegmentsForMonth(ctx context.Context, path string, time int64) ([]*activity.EntityActivityLog, error) { + entityPathWithTime := fmt.Sprintf("%s%d/", path, time) + segments := make([]*activity.EntityActivityLog, 0) + segmentPaths, err := a.view.List(ctx, entityPathWithTime) + if err != nil { + return segments, err + } + for _, seqNum := range segmentPaths { + segment, err := a.readEntitySegmentAtPath(ctx, entityPathWithTime+seqNum) + if err != nil { + return segments, err + } + if segment != nil { + segments = append(segments, segment) + } + } + return segments, nil +} + // OldestVersionHasDeduplicatedClients returns whether this cluster is 1.19+, and // hence supports deduplicated clients func (a *ActivityLog) OldestVersionHasDeduplicatedClients(ctx context.Context) bool { @@ -648,3 +661,25 @@ func (a *ActivityLog) OldestVersionHasDeduplicatedClients(ctx context.Context) b } return oldestVersionIsDedupClients } + +func (a *ActivityLog) loadClientDataIntoSegment(ctx context.Context, pathPrefix string, startTime time.Time, seqNum uint64, currentSegment *segmentInfo) ([]*activity.EntityRecord, error) { + path := pathPrefix + activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(seqNum, 10) + out, err := a.readEntitySegmentAtPath(ctx, path) + if err != nil && !errors.Is(err, ErrEmptyResponse) { + return nil, err + } + if out != nil { + if !a.core.perfStandby { + a.logger.Debug(fmt.Sprintf("loading client data from %s into segment", path)) + currentSegment.startTimestamp = startTime.Unix() + currentSegment.currentClients = &activity.EntityActivityLog{Clients: out.Clients} + currentSegment.clientSequenceNumber = seqNum + + } else { + // populate this for edge case checking (if end of month passes while background loading on standby) + currentSegment.startTimestamp = startTime.Unix() + } + return out.GetClients(), nil + } + return []*activity.EntityRecord{}, nil +} diff --git a/vault/activity_log_util_common_test.go b/vault/activity_log_util_common_test.go index 2d0a0c4cee..4825899724 100644 --- a/vault/activity_log_util_common_test.go +++ b/vault/activity_log_util_common_test.go @@ -991,6 +991,22 @@ func Test_ActivityLog_ComputeCurrentMonth_NamespaceMounts(t *testing.T) { } } +// writeOldEntityPathSegment writes a single segment to the old storage path with the given time and index for an entity +func writeOldEntityPathSegment(t *testing.T, core *Core, ts time.Time, index int, item *activity.EntityActivityLog) { + t.Helper() + protoItem, err := proto.Marshal(item) + require.NoError(t, err) + WriteToStorage(t, core, makeSegmentPath(t, activityEntityBasePath, ts, index), protoItem) +} + +// writeSecondaryClusterSegment writes a single secondary global segment file with the given time and index for an entity +func writeSecondaryClusterSegment(t *testing.T, core *Core, ts time.Time, index int, clusterId string, item *activity.EntityActivityLog) { + t.Helper() + protoItem, err := proto.Marshal(item) + require.NoError(t, err) + WriteToStorage(t, core, makeSegmentPath(t, fmt.Sprintf("%s%s/%s", activitySecondaryTempDataPathPrefix, clusterId, activityEntityBasePath), ts, index), protoItem) +} + // writeGlobalEntitySegment writes a single global segment file with the given time and index for an entity func writeGlobalEntitySegment(t *testing.T, core *Core, ts time.Time, index int, item *activity.EntityActivityLog) { t.Helper()