Send Global Data From Secondary to Primary During Upgrade (#29137)

* OSS Patch

OSS Patch

Fixing a build issue

* Revert "OSS Patch"

This reverts commit 2cce608b9e7ad7df64cb10f91208c142e6825c57.

* OSS-Patch

* Fix test issue
This commit is contained in:
divyaac
2024-12-10 11:54:07 -08:00
committed by GitHub
parent 42ca69628e
commit 537fc0f3ea
6 changed files with 708 additions and 448 deletions

View File

@@ -45,9 +45,11 @@ const (
activityIntentLogKey = "endofmonth"
activityGlobalPathPrefix = "global/"
activityLocalPathPrefix = "local/"
activitySecondaryTempDataPathPrefix = "secondary/"
activityACMERegenerationKey = "acme-regeneration"
activityDeduplicationUpgradeKey = "deduplication-upgrade"
activitySecondaryDataRecCount = "secondary-data-received"
// sketch for each month that stores hash of client ids
distinctClientsBasePath = "log/distinctclients/"
@@ -202,8 +204,6 @@ type ActivityLog struct {
// Channel to signal global clients have received by the primary from the secondary, during upgrade to 1.19
dedupUpgradeGlobalClientsReceivedCh chan struct{}
// track whether the current cluster is in the middle of an upgrade to 1.19
dedupClientsUpgradeComplete *atomic.Bool
// track metadata and contents of the most recent log segment
currentSegment segmentInfo
@@ -237,9 +237,6 @@ type ActivityLog struct {
// This channel is relevant for upgrades to 1.17. It indicates whether precomputed queries have been
// generated for ACME clients.
computationWorkerDone chan struct{}
// This channel is relevant for upgrades to 1.19+ (version with deduplication of clients)
// This indicates that paths that were used before 1.19 to store clients have been cleaned
oldStoragePathsCleaned chan struct{}
// channel to indicate that a global clients have been
// sent to the primary from a secondary
@@ -256,6 +253,9 @@ type ActivityLog struct {
globalPartialMonthClientTracker map[string]*activity.EntityRecord
inprocessExport *atomic.Bool
// RetryUntilFalse is a test only attribute that allows us to run the sendPreviousMonthGlobalClientsWorker
// for as long as the test wants
RetryUntilFalse *atomic.Bool
// clock is used to support manipulating time in unit and integration tests
clock timeutil.Clock
@@ -427,8 +427,8 @@ func NewActivityLog(core *Core, logger log.Logger, view *BarrierView, metrics me
standbyGlobalFragmentsReceived: make([]*activity.LogFragment, 0),
secondaryGlobalClientFragments: make([]*activity.LogFragment, 0),
inprocessExport: atomic.NewBool(false),
RetryUntilFalse: atomic.NewBool(false),
precomputedQueryWritten: make(chan struct{}),
dedupClientsUpgradeComplete: atomic.NewBool(false),
}
config, err := a.loadConfigOrDefault(core.activeContext)
@@ -497,18 +497,14 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for
{"type", "client"},
})
if a.hasDedupClientsUpgrade(ctx) {
// Since we are the primary, store global clients
// Create fragments from global clients and store the segment
if ret := a.createCurrentSegmentFromFragments(ctx, globalFragments, &a.currentGlobalSegment, force, activityGlobalPathPrefix); ret != nil {
return ret
}
} else if !a.dedupClientsUpgradeComplete.Load() {
// We are the secondary, and an upgrade is in progress. In this case we will temporarily store the data at this old path
// This data will be garbage collected after the upgrade has completed
if ret := a.createCurrentSegmentFromFragments(ctx, globalFragments, &a.currentSegment, force, ""); ret != nil {
return ret
}
}
// If segment start time is zero, do not update or write
@@ -540,8 +536,17 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for
})
}
allLocalFragments := append(standbyLocalFragments, localFragment)
if !a.hasDedupClientsUpgrade(ctx) {
// In case an upgrade is in progress we will temporarily store the data at this old path
// This data will be garbage collected after the upgrade has completed
a.logger.Debug("upgrade to 1.19 or above is in progress. storing data at old storage path until upgrade is complete")
return a.createCurrentSegmentFromFragments(ctx, append(globalFragments, allLocalFragments...), &a.currentSegment, force, "")
}
// store local fragments
if ret := a.createCurrentSegmentFromFragments(ctx, append(standbyLocalFragments, localFragment), &a.currentLocalSegment, force, activityLocalPathPrefix); ret != nil {
if ret := a.createCurrentSegmentFromFragments(ctx, allLocalFragments, &a.currentLocalSegment, force, activityLocalPathPrefix); ret != nil {
return ret
}
@@ -635,7 +640,7 @@ func (a *ActivityLog) createCurrentSegmentFromFragments(ctx context.Context, fra
return nil
}
func (a *ActivityLog) savePreviousTokenSegments(ctx context.Context, startTime int64, pathPrefix string, fragments []*activity.LogFragment) error {
func (a *ActivityLog) savePreviousTokenSegments(ctx context.Context, startTime int64, fragments []*activity.LogFragment) error {
tokenByNamespace := make(map[string]uint64)
for _, fragment := range fragments {
// As of 1.9, a fragment should no longer have any NonEntityTokens. However
@@ -660,7 +665,7 @@ func (a *ActivityLog) savePreviousTokenSegments(ctx context.Context, startTime i
tokenCount: &activity.TokenCount{CountByNamespaceID: tokenByNamespace},
}
if _, err := a.saveSegmentEntitiesInternal(ctx, segmentToStore, false, pathPrefix); err != nil {
if _, err := a.saveSegmentTokensInternal(ctx, segmentToStore, false); err != nil {
return err
}
return nil
@@ -846,9 +851,9 @@ func (a *ActivityLog) availableTimesAtPath(ctx context.Context, onlyIncludeTimes
return nil, err
}
out := make([]time.Time, 0)
for _, path := range paths {
for _, pathTime := range paths {
// generate a set of unique start times
segmentTime, err := timeutil.ParseTimeFromPath(path)
segmentTime, err := timeutil.ParseTimeFromPath(pathTime)
if err != nil {
return nil, err
}
@@ -1035,56 +1040,21 @@ func (a *ActivityLog) loadCurrentClientSegment(ctx context.Context, startTime ti
a.currentSegment.startTimestamp = startTime.Unix()
// load current global segment
path := activityGlobalPathPrefix + activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(globalSegmentSequenceNumber, 10)
out, err := a.readEntitySegmentAtPath(ctx, path)
if err != nil && !errors.Is(err, ErrEmptyResponse) {
clients, err := a.loadClientDataIntoSegment(ctx, activityGlobalPathPrefix, startTime, globalSegmentSequenceNumber, &a.currentGlobalSegment)
if err != nil {
return err
}
if out != nil {
if !a.core.perfStandby {
a.currentGlobalSegment = segmentInfo{
startTimestamp: startTime.Unix(),
currentClients: &activity.EntityActivityLog{
Clients: out.Clients,
},
tokenCount: &activity.TokenCount{
CountByNamespaceID: make(map[string]uint64),
},
clientSequenceNumber: globalSegmentSequenceNumber,
}
} else {
// populate this for edge case checking (if end of month passes while background loading on standby)
a.currentGlobalSegment.startTimestamp = startTime.Unix()
}
for _, client := range out.Clients {
a.globalPartialMonthClientTracker[client.ClientID] = client
}
for _, entity := range clients {
a.globalPartialMonthClientTracker[entity.ClientID] = entity
}
// load current local segment
path = activityLocalPathPrefix + activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(localSegmentSequenceNumber, 10)
out, err = a.readEntitySegmentAtPath(ctx, path)
if err != nil && !errors.Is(err, ErrEmptyResponse) {
clients, err = a.loadClientDataIntoSegment(ctx, activityLocalPathPrefix, startTime, localSegmentSequenceNumber, &a.currentLocalSegment)
if err != nil {
return err
}
if out != nil {
if !a.core.perfStandby {
a.currentLocalSegment = segmentInfo{
startTimestamp: startTime.Unix(),
currentClients: &activity.EntityActivityLog{
Clients: out.Clients,
},
tokenCount: a.currentLocalSegment.tokenCount,
clientSequenceNumber: localSegmentSequenceNumber,
}
} else {
// populate this for edge case checking (if end of month passes while background loading on standby)
a.currentLocalSegment.startTimestamp = startTime.Unix()
}
for _, client := range out.Clients {
a.partialMonthLocalClientTracker[client.ClientID] = client
}
for _, entity := range clients {
a.partialMonthLocalClientTracker[entity.ClientID] = entity
}
return nil
@@ -1141,7 +1111,7 @@ func (a *ActivityLog) tokenCountExists(ctx context.Context, startTime time.Time)
// loadTokenCount populates the in-memory representation of activity token count
// this function should be called with the lock held
func (a *ActivityLog) loadTokenCount(ctx context.Context, startTime time.Time) error {
func (a *ActivityLog) loadTokenCount(ctx context.Context, startTime time.Time, segment *segmentInfo) error {
tokenCountExists, err := a.tokenCountExists(ctx, startTime)
if err != nil {
return err
@@ -1173,7 +1143,7 @@ func (a *ActivityLog) loadTokenCount(ctx context.Context, startTime time.Time) e
// We must load the tokenCount of the current segment into the activity log
// so that TWEs counted before the introduction of a client ID for TWEs are
// still reported in the partial client counts.
a.currentLocalSegment.tokenCount = out
segment.tokenCount = out
return nil
}
@@ -1202,8 +1172,8 @@ func (a *ActivityLog) entityBackgroundLoader(ctx context.Context, wg *sync.WaitG
// Call with fragmentLock, globalFragmentLock, localFragmentLock and l held.
func (a *ActivityLog) startNewCurrentLogLocked(now time.Time) {
a.logger.Trace("initializing new log")
a.resetCurrentLog()
a.setCurrentSegmentTimeLocked(now)
// We will normalize times to start of the month to avoid errors
a.newMonthCurrentLogLocked(now)
}
// Should be called with fragmentLock, globalFragmentLock, localFragmentLock and l held.
@@ -1239,6 +1209,10 @@ func (a *ActivityLog) setCurrentSegmentTimeLocked(t time.Time) {
func (a *ActivityLog) resetCurrentLog() {
// setting a.currentSegment timestamp to support upgrades
a.currentSegment.startTimestamp = 0
a.currentSegment.currentClients = &activity.EntityActivityLog{
Clients: make([]*activity.EntityRecord, 0),
}
a.currentSegment.clientSequenceNumber = 0
// global segment
a.currentGlobalSegment.startTimestamp = 0
@@ -1289,18 +1263,19 @@ func (a *ActivityLog) deleteLogWorker(ctx context.Context, startTimestamp int64,
}
func (a *ActivityLog) deleteOldStoragePathWorker(ctx context.Context, pathPrefix string) {
pathTimes, err := a.view.List(ctx, pathPrefix)
times, err := a.availableTimesAtPath(ctx, time.Now(), pathPrefix)
if err != nil {
a.logger.Error("could not list segment paths", "error", err)
return
}
for _, pathTime := range pathTimes {
segments, err := a.view.List(ctx, pathPrefix+pathTime)
for _, pathTime := range times {
pathWithTime := fmt.Sprintf("%s%d/", pathPrefix, pathTime.Unix())
segments, err := a.view.List(ctx, pathWithTime)
if err != nil {
a.logger.Error("could not list segment path", "error", err)
}
for _, seqNum := range segments {
err = a.view.Delete(ctx, pathPrefix+pathTime+seqNum)
err = a.view.Delete(ctx, pathWithTime+seqNum)
if err != nil {
a.logger.Error("could not delete log", "error", err)
}
@@ -1335,6 +1310,19 @@ func (a *ActivityLog) refreshFromStoredLog(ctx context.Context, wg *sync.WaitGro
a.localFragmentLock.Lock()
defer a.localFragmentLock.Unlock()
// Garbage collect data at old storage paths
if a.hasDedupClientsUpgrade(ctx) {
a.deleteOldStoragePathWorker(ctx, activityEntityBasePath)
a.deleteOldStoragePathWorker(ctx, activityTokenBasePath)
secondaryIds, err := a.view.List(ctx, activitySecondaryTempDataPathPrefix)
if err != nil {
return err
}
for _, secondaryId := range secondaryIds {
a.deleteOldStoragePathWorker(ctx, activitySecondaryTempDataPathPrefix+secondaryId+activityEntityBasePath)
}
}
decreasingLogTimes, err := a.getMostRecentActivityLogSegment(ctx, now)
if err != nil {
return err
@@ -1349,7 +1337,35 @@ func (a *ActivityLog) refreshFromStoredLog(ctx context.Context, wg *sync.WaitGro
a.startNewCurrentLogLocked(now)
}
}
}
// If we have not finished upgrading, we will refresh currentSegment so data
// can be stored at the old paths until the upgrade is complete.
if !a.hasDedupClientsUpgrade(ctx) && !a.core.perfStandby {
times, err := a.availableTimesAtPath(ctx, now, activityEntityBasePath)
if err != nil {
return err
}
if len(times) > 0 {
mostRecentTimeOldEntityPath := times[len(times)-1]
// The most recent time is either the current month or the next month (if we missed the rotation perhaps)
if timeutil.IsCurrentMonth(mostRecentTimeOldEntityPath, now) {
// setting a.currentSegment timestamp to support upgrades
a.currentSegment.startTimestamp = mostRecentTimeOldEntityPath.Unix()
// This follows the logic in loadCurrentClientSegment
// We do not want need to set a clientSeq number of perf nodes because no client data is written on perf nodes, it is forwarded to the active node
if !a.core.perfStandby {
segmentNum, exists, err := a.getLastSegmentNumberByEntityPath(ctx, activityEntityBasePath+fmt.Sprint(mostRecentTimeOldEntityPath.Unix())+"/")
if err == nil && exists {
a.loadClientDataIntoSegment(ctx, "", mostRecentTimeOldEntityPath, segmentNum, &a.currentSegment)
}
}
}
}
}
// We can exit before doing any further refreshing if we are in the middle of an upgrade or there are no logs
if len(decreasingLogTimes) == 0 || !a.hasDedupClientsUpgrade(ctx) {
return nil
}
@@ -1395,7 +1411,7 @@ func (a *ActivityLog) refreshFromStoredLog(ctx context.Context, wg *sync.WaitGro
// is still required since without it, we would lose replicated TWE counts for the
// current segment.
if !a.core.perfStandby {
err = a.loadTokenCount(ctx, mostRecent)
err = a.loadTokenCount(ctx, mostRecent, &a.currentLocalSegment)
if err != nil {
return err
}
@@ -1665,17 +1681,21 @@ func (c *Core) secondaryDuplicateClientMigrationWorker(ctx context.Context) {
manager := c.activityLog
manager.logger.Trace("started secondary activity log migration worker")
storageMigrationComplete := atomic.NewBool(false)
globalClientDataSent := atomic.NewBool(false)
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
if !c.IsPerfSecondary() {
// TODO: Create function for the secondary to continuously attempt to send data to the primary
defer wg.Done()
_, err := manager.sendPreviousMonthGlobalClientsWorker(ctx)
if err != nil {
manager.logger.Debug("failed to send previous months client data to primary", "error", err)
return
}
wg.Done()
globalClientDataSent.Store(true)
}()
wg.Add(1)
go func() {
defer wg.Done()
localClients, _, err := manager.extractLocalGlobalClientsDeprecatedStoragePath(ctx)
if err != nil {
return
@@ -1690,31 +1710,46 @@ func (c *Core) secondaryDuplicateClientMigrationWorker(ctx context.Context) {
return
}
}
// Get tokens from previous months at old storage paths
clusterTokens, err := manager.extractTokensDeprecatedStoragePath(ctx)
// Store tokens at new path
for month, tokenCount := range clusterTokens {
// Combine all token counts from all clusters
logFragments := make([]*activity.LogFragment, len(tokenCount))
for i, tokens := range tokenCount {
logFragments[i] = &activity.LogFragment{NonEntityTokens: tokens}
}
if err = manager.savePreviousTokenSegments(ctx, month, logFragments); err != nil {
manager.logger.Error("failed to write token segment", "error", err, "month", month)
return
}
}
storageMigrationComplete.Store(true)
// TODO: generate/store PCQs for these local clients
wg.Done()
}()
wg.Wait()
if !storageMigrationComplete.Load() {
manager.logger.Error("could not complete migration of duplicate clients on cluster")
return
}
if !globalClientDataSent.Load() {
manager.logger.Error("could not send global clients to the primary")
return
}
// We have completed the vital portions of the storage migration
if err := manager.writeDedupClientsUpgrade(ctx); err != nil {
manager.logger.Error("could not complete migration of duplicate clients on cluster")
return
}
// Now that all the clients have been migrated and PCQs have been created, remove all clients at old storage paths
manager.oldStoragePathsCleaned = make(chan struct{})
go func() {
defer close(manager.oldStoragePathsCleaned)
manager.deleteOldStoragePathWorker(ctx, activityEntityBasePath)
manager.deleteOldStoragePathWorker(ctx, activityTokenBasePath)
// TODO: Delete old PCQs
}()
manager.dedupClientsUpgradeComplete.Store(true)
// Refresh activity log and load current month entities into memory
manager.refreshFromStoredLog(ctx, wg, time.Now().UTC())
manager.logger.Trace("completed secondary activity log migration worker")
}
@@ -1752,6 +1787,31 @@ func (a *ActivityLog) writeDedupClientsUpgrade(ctx context.Context) error {
return a.view.Put(ctx, regeneratedEntry)
}
func (a *ActivityLog) incrementSecondaryClientRecCount(ctx context.Context) error {
val, _ := a.getSecondaryClientRecCount(ctx)
val += 1
regeneratedEntry, err := logical.StorageEntryJSON(activitySecondaryDataRecCount, val)
if err != nil {
return err
}
return a.view.Put(ctx, regeneratedEntry)
}
func (a *ActivityLog) getSecondaryClientRecCount(ctx context.Context) (int, error) {
out, err := a.view.Get(ctx, activitySecondaryDataRecCount)
if err != nil {
return 0, err
}
if out == nil {
return 0, nil
}
var data int
if err = out.DecodeJSON(&data); err != nil {
return 0, err
}
return data, err
}
func (a *ActivityLog) regeneratePrecomputedQueries(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -1920,7 +1980,7 @@ func (a *ActivityLog) secondaryFragmentWorker(ctx context.Context) {
}
// Only send data if no upgrade is in progress. Else, the active worker will
// store the data in a temporary location until it is garbage collected
if a.dedupClientsUpgradeComplete.Load() {
if a.hasDedupClientsUpgrade(ctx) {
sendFunc()
}
@@ -1935,7 +1995,7 @@ func (a *ActivityLog) secondaryFragmentWorker(ctx context.Context) {
}
// If an upgrade is in progress, don't do anything
// The active fragmentWorker will take care of flushing the clients to a temporary location
if a.dedupClientsUpgradeComplete.Load() {
if a.hasDedupClientsUpgrade(ctx) {
sendFunc()
// clear active entity set
a.globalFragmentLock.Lock()
@@ -4037,7 +4097,6 @@ func (c *Core) activityLogMigrationTask(ctx context.Context) {
} else {
// Store that upgrade processes have already been completed
manager.writeDedupClientsUpgrade(ctx)
manager.dedupClientsUpgradeComplete.Store(true)
}
} else {
// We kick off the secondary migration worker in any chance that the primary has not yet upgraded.
@@ -4045,11 +4104,6 @@ func (c *Core) activityLogMigrationTask(ctx context.Context) {
// already upgraded primary
if !manager.hasDedupClientsUpgrade(ctx) {
go c.secondaryDuplicateClientMigrationWorker(ctx)
} else {
// Store that upgrade processes have already been completed
manager.writeDedupClientsUpgrade(ctx)
manager.dedupClientsUpgradeComplete.Store(true)
}
}
}
@@ -4062,10 +4116,11 @@ func (c *Core) activityLogMigrationTask(ctx context.Context) {
func (c *Core) primaryDuplicateClientMigrationWorker(ctx context.Context) error {
a := c.activityLog
a.logger.Trace("started primary activity log migration worker")
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Collect global clients from secondary
err := a.waitForSecondaryGlobalClients(ctx)
if err != nil {
if err := a.waitForSecondaryGlobalClients(ctx); err != nil {
return err
}
@@ -4077,8 +4132,36 @@ func (c *Core) primaryDuplicateClientMigrationWorker(ctx context.Context) error
}
// Get tokens from previous months at old storage paths
clusterTokens, err := a.extractTokensDeprecatedStoragePath(ctx)
if err != nil {
return nil
}
// TODO: Collect clients from secondaries into slice of fragments
// Collect global clients from secondaries and put them in the clusterGlobalClients map
secondaryIds, err := a.view.List(ctx, activitySecondaryTempDataPathPrefix)
if err != nil {
return err
}
for _, secondaryId := range secondaryIds {
times, err := a.availableTimesAtPath(ctx, time.Now(), activitySecondaryTempDataPathPrefix+secondaryId+activityEntityBasePath)
if err != nil {
a.logger.Error("could not list secondary cluster clients until for cluster", "cluster", secondaryId)
return err
}
for _, time := range times {
segments, err := a.getAllEntitySegmentsForMonth(ctx, activitySecondaryTempDataPathPrefix+secondaryId+activityEntityBasePath, time.Unix())
if err != nil {
return err
}
for _, segment := range segments {
for _, entity := range segment.GetClients() {
if _, ok := clusterGlobalClients[time.Unix()]; !ok {
clusterGlobalClients[time.Unix()] = make([]*activity.EntityRecord, 0)
}
clusterGlobalClients[time.Unix()] = append(clusterGlobalClients[time.Unix()], entity)
}
}
}
}
// Store global clients at new path
for month, entitiesForMonth := range clusterGlobalClients {
@@ -4107,7 +4190,7 @@ func (c *Core) primaryDuplicateClientMigrationWorker(ctx context.Context) error
for i, tokens := range tokenCount {
logFragments[i] = &activity.LogFragment{NonEntityTokens: tokens}
}
if err = a.savePreviousTokenSegments(ctx, month, activityLocalPathPrefix+activityTokenBasePath, logFragments); err != nil {
if err = a.savePreviousTokenSegments(ctx, month, logFragments); err != nil {
a.logger.Error("failed to write token segment", "error", err, "month", month)
return err
}
@@ -4119,15 +4202,12 @@ func (c *Core) primaryDuplicateClientMigrationWorker(ctx context.Context) error
a.logger.Error("could not complete migration of duplicate clients on cluster")
return err
}
// Garbage collect data at old paths
a.oldStoragePathsCleaned = make(chan struct{})
go func() {
defer close(a.oldStoragePathsCleaned)
a.deleteOldStoragePathWorker(ctx, activityEntityBasePath)
a.deleteOldStoragePathWorker(ctx, activityTokenBasePath)
// We will also need to delete old PCQs
}()
a.dedupClientsUpgradeComplete.Store(true)
// TODO: We will also need to delete old PCQs
// Refresh activity log and load current month entities into memory
a.refreshFromStoredLog(ctx, &sync.WaitGroup{}, time.Now().UTC())
a.logger.Trace("completed primary activity log migration worker")
return nil
}

View File

@@ -12,9 +12,7 @@ import (
"io"
"net/http"
"reflect"
"sort"
"strconv"
"strings"
"sync"
"testing"
"time"
@@ -1372,69 +1370,6 @@ func TestActivityLog_tokenCountExists(t *testing.T) {
}
}
// entityRecordsEqual compares the parts we care about from two activity entity record slices
// note: this makes a copy of the []*activity.EntityRecord so that misordered slices won't fail the comparison,
// but the function won't modify the order of the slices to compare
func entityRecordsEqual(t *testing.T, record1, record2 []*activity.EntityRecord) bool {
t.Helper()
if record1 == nil {
return record2 == nil
}
if record2 == nil {
return record1 == nil
}
if len(record1) != len(record2) {
return false
}
// sort first on namespace, then on ID, then on timestamp
entityLessFn := func(e []*activity.EntityRecord, i, j int) bool {
ei := e[i]
ej := e[j]
nsComp := strings.Compare(ei.NamespaceID, ej.NamespaceID)
if nsComp == -1 {
return true
}
if nsComp == 1 {
return false
}
idComp := strings.Compare(ei.ClientID, ej.ClientID)
if idComp == -1 {
return true
}
if idComp == 1 {
return false
}
return ei.Timestamp < ej.Timestamp
}
entitiesCopy1 := make([]*activity.EntityRecord, len(record1))
entitiesCopy2 := make([]*activity.EntityRecord, len(record2))
copy(entitiesCopy1, record1)
copy(entitiesCopy2, record2)
sort.Slice(entitiesCopy1, func(i, j int) bool {
return entityLessFn(entitiesCopy1, i, j)
})
sort.Slice(entitiesCopy2, func(i, j int) bool {
return entityLessFn(entitiesCopy2, i, j)
})
for i, a := range entitiesCopy1 {
b := entitiesCopy2[i]
if a.ClientID != b.ClientID || a.NamespaceID != b.NamespaceID || a.Timestamp != b.Timestamp {
return false
}
}
return true
}
func (a *ActivityLog) resetEntitiesInMemory(t *testing.T) {
t.Helper()
@@ -1586,7 +1521,7 @@ func TestActivityLog_loadCurrentClientSegment(t *testing.T) {
}
currentGlobalEntities := a.GetCurrentGlobalEntities()
if !entityRecordsEqual(t, currentGlobalEntities.Clients, tc.entities.Clients) {
if !EntityRecordsEqual(t, currentGlobalEntities.Clients, tc.entities.Clients) {
t.Errorf("bad data loaded. expected: %v, got: %v for path %q", tc.entities.Clients, currentGlobalEntities, tc.path)
}
@@ -1742,7 +1677,7 @@ func TestActivityLog_loadTokenCount(t *testing.T) {
}
for _, tc := range testCases {
err := a.loadTokenCount(ctx, time.Unix(tc.time, 0))
err := a.loadTokenCount(ctx, time.Unix(tc.time, 0), &a.currentLocalSegment)
if err != nil {
t.Fatalf("got error loading data for %q: %v", tc.path, err)
}
@@ -1810,13 +1745,99 @@ func TestActivityLog_StopAndRestart(t *testing.T) {
}
}
func addActivityRecordsOldStoragePath(t *testing.T, core *Core, base time.Time, includeEntities, includeTokens bool) (*ActivityLog, []*activity.EntityRecord, map[string]uint64) {
t.Helper()
monthsAgo := base.AddDate(0, -3, 0)
a := core.activityLog
var entityRecords []*activity.EntityRecord
if includeEntities {
entityRecords = []*activity.EntityRecord{
{
ClientID: "11111111-1111-1111-1111-111111111111",
NamespaceID: namespace.RootNamespaceID,
Timestamp: time.Now().Unix(),
},
{
ClientID: "22222222-2222-2222-2222-222222222222",
NamespaceID: namespace.RootNamespaceID,
Timestamp: time.Now().Unix(),
},
{
ClientID: "33333333-2222-2222-2222-222222222222",
NamespaceID: namespace.RootNamespaceID,
Timestamp: time.Now().Unix(),
},
}
if constants.IsEnterprise {
entityRecords = append(entityRecords, []*activity.EntityRecord{
{
ClientID: "44444444-1111-1111-1111-111111111111",
NamespaceID: "ns1",
Timestamp: time.Now().Unix(),
},
}...)
}
// append some local entity data
entityRecords = append(entityRecords, &activity.EntityRecord{
ClientID: "44444444-4444-4444-4444-444444444444",
NamespaceID: namespace.RootNamespaceID,
Timestamp: time.Now().Unix(),
})
for i, entityRecord := range entityRecords {
entityData, err := proto.Marshal(&activity.EntityActivityLog{
Clients: []*activity.EntityRecord{entityRecord},
})
if err != nil {
t.Fatalf(err.Error())
}
switch i {
case 0:
WriteToStorage(t, core, ActivityPrefix+activityEntityBasePath+fmt.Sprint(monthsAgo.Unix())+"/0", entityData)
case len(entityRecords) - 1:
// local data
WriteToStorage(t, core, ActivityPrefix+activityEntityBasePath+fmt.Sprint(base.Unix())+"/"+strconv.Itoa(i-1), entityData)
default:
WriteToStorage(t, core, ActivityPrefix+activityEntityBasePath+fmt.Sprint(base.Unix())+"/"+strconv.Itoa(i-1), entityData)
}
}
}
var tokenRecords map[string]uint64
if includeTokens {
tokenRecords = make(map[string]uint64)
tokenRecords[namespace.RootNamespaceID] = uint64(1)
if constants.IsEnterprise {
for i := 1; i < 4; i++ {
nsID := "ns" + strconv.Itoa(i)
tokenRecords[nsID] = uint64(i)
}
}
tokenCount := &activity.TokenCount{
CountByNamespaceID: tokenRecords,
}
tokenData, err := proto.Marshal(tokenCount)
if err != nil {
t.Fatalf(err.Error())
}
WriteToStorage(t, core, ActivityPrefix+activityTokenBasePath+fmt.Sprint(base.Unix())+"/0", tokenData)
}
return a, entityRecords, tokenRecords
}
// :base: is the timestamp to start from for the setup logic (use to simulate newest log from past or future)
// entity records returned include [0] data from a previous month and [1:] data from the current month
// token counts returned are from the current month
func setupActivityRecordsInStorage(t *testing.T, base time.Time, includeEntities, includeTokens bool) (*ActivityLog, []*activity.EntityRecord, map[string]uint64) {
func setupActivityRecordsInStorage(t *testing.T, base time.Time, includeEntities, includeTokens, addOldStoragePathData bool) (*ActivityLog, []*activity.EntityRecord, map[string]uint64) {
t.Helper()
core, _, _ := TestCoreUnsealed(t)
core, _, _ := TestCoreUnsealedWithConfig(t, &CoreConfig{ActivityLogConfig: ActivityLogCoreConfig{ForceEnable: true}})
a := core.activityLog
monthsAgo := base.AddDate(0, -3, 0)
@@ -1898,13 +1919,17 @@ func setupActivityRecordsInStorage(t *testing.T, base time.Time, includeEntities
WriteToStorage(t, core, ActivityLogLocalPrefix+"directtokens/"+fmt.Sprint(base.Unix())+"/0", tokenData)
}
if addOldStoragePathData {
return addActivityRecordsOldStoragePath(t, core, base, includeEntities, includeTokens)
}
return a, entityRecords, tokenRecords
}
// TestActivityLog_refreshFromStoredLog writes records for 3 months ago and this month, then calls refreshFromStoredLog.
// TestActivityLog_refreshFromStoredLog_DedupUpgradeComplete writes records for 3 months ago and this month, then calls refreshFromStoredLog.
// The system believes the upgrade to 1.19+ is already complete. It should not refresh data from old storage paths, only data at the new storage paths.
// The test verifies that current entities and current tokens are correct.
func TestActivityLog_refreshFromStoredLog(t *testing.T) {
a, expectedClientRecords, expectedTokenCounts := setupActivityRecordsInStorage(t, time.Now().UTC(), true, true)
func TestActivityLog_refreshFromStoredLog_DedupUpgradeComplete(t *testing.T) {
a, expectedClientRecords, expectedTokenCounts := setupActivityRecordsInStorage(t, time.Now().UTC(), true, true, true)
a.SetEnable(true)
var wg sync.WaitGroup
@@ -1933,13 +1958,101 @@ func TestActivityLog_refreshFromStoredLog(t *testing.T) {
}
currentEntities := a.GetCurrentGlobalEntities()
if !entityRecordsEqual(t, currentEntities.Clients, expectedCurrent.Clients) {
if !EntityRecordsEqual(t, currentEntities.Clients, expectedCurrent.Clients) {
// we only expect the newest entity segment to be loaded (for the current month)
t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrent, currentEntities)
}
currentLocalEntities := a.GetCurrentLocalEntities()
if !entityRecordsEqual(t, currentLocalEntities.Clients, expectedCurrentLocal.Clients) {
if !EntityRecordsEqual(t, currentLocalEntities.Clients, expectedCurrentLocal.Clients) {
// we only expect the newest local entity segment to be loaded (for the current month)
t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrentLocal, currentLocalEntities)
}
nsCount := a.GetStoredTokenCountByNamespaceID()
require.Equal(t, nsCount, expectedTokenCounts)
activeClients := a.core.GetActiveClientsList()
if err := ActiveEntitiesEqual(activeClients, expectedActive.Clients); err != nil {
// we expect activeClients to be loaded for the entire month
t.Errorf("bad data loaded into active entities. expected only set of EntityID from %v in %v: %v", expectedActive.Clients, activeClients, err)
}
// verify active global clients list
activeGlobalClients := a.core.GetActiveGlobalClientsList()
if err := ActiveEntitiesEqual(activeGlobalClients, expectedActiveGlobal.Clients); err != nil {
// we expect activeClients to be loaded for the entire month
t.Errorf("bad data loaded into active global entities. expected only set of EntityID from %v in %v: %v", expectedActiveGlobal.Clients, activeGlobalClients, err)
}
// verify active local clients list
activeLocalClients := a.core.GetActiveLocalClientsList()
if err := ActiveEntitiesEqual(activeLocalClients, expectedCurrentLocal.Clients); err != nil {
// we expect activeClients to be loaded for the entire month
t.Errorf("bad data loaded into active local entities. expected only set of EntityID from %v in %v: %v", expectedCurrentLocal.Clients, activeLocalClients, err)
}
// No data from the old storage paths should have been loaded because the system believes that the upgrade was already complete
a.ExpectOldSegmentRefreshed(t, time.Now().UTC().Unix(), false, []*activity.EntityRecord{}, map[string]uint64{})
}
// TestActivityLog_refreshFromStoredLog_DedupUpgradeIncomplete writes records for 3 months ago and this month, then calls refreshFromStoredLog.
// The system thinks the upgrade to 1.19+ is incomplete. It should not refresh data from new storage paths, only data at the old storage paths.
// The test verifies that current entities and current tokens are correct.
func TestActivityLog_refreshFromStoredLog_DedupUpgradeIncomplete(t *testing.T) {
a, expectedClientRecords, expectedTokenCounts := setupActivityRecordsInStorage(t, time.Now().UTC(), true, true, true)
a.SetEnable(true)
// Reset the system to state where the upgrade is incomplete
a.ResetDedupUpgrade(context.Background())
var wg sync.WaitGroup
now := time.Now().UTC()
err := a.refreshFromStoredLog(context.Background(), &wg, now)
if err != nil {
t.Fatalf("got error loading stored activity logs: %v", err)
}
wg.Wait()
// active clients for the entire month
expectedActive := &activity.EntityActivityLog{
Clients: expectedClientRecords[1:],
}
// global clients added to the newest local entity segment
expectedCurrent := &activity.EntityActivityLog{
Clients: expectedClientRecords[len(expectedClientRecords)-2 : len(expectedClientRecords)-1],
}
expectedActiveGlobal := &activity.EntityActivityLog{
Clients: expectedClientRecords[1 : len(expectedClientRecords)-1],
}
// local client is only added to the newest segment for the current month. This should also appear in the active clients for the entire month.
expectedCurrentLocal := &activity.EntityActivityLog{
Clients: expectedClientRecords[len(expectedClientRecords)-1:],
}
// Data should be loaded into the old segment
a.ExpectOldSegmentRefreshed(t, now.Unix(), false, expectedCurrentLocal.GetClients(), map[string]uint64{})
a.ExpectCurrentSegmentsRefreshed(t, timeutil.StartOfMonth(now).Unix(), false)
// Simulate the completion of an upgrade
a.writeDedupClientsUpgrade(context.Background())
err = a.refreshFromStoredLog(context.Background(), &wg, now)
if err != nil {
t.Fatalf("got error loading stored activity logs: %v", err)
}
wg.Wait()
currentEntities := a.GetCurrentGlobalEntities()
if !EntityRecordsEqual(t, currentEntities.Clients, expectedCurrent.Clients) {
// we only expect the newest entity segment to be loaded (for the current month)
t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrent, currentEntities)
}
currentLocalEntities := a.GetCurrentLocalEntities()
if !EntityRecordsEqual(t, currentLocalEntities.Clients, expectedCurrentLocal.Clients) {
// we only expect the newest local entity segment to be loaded (for the current month)
t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrentLocal, currentLocalEntities)
}
@@ -1974,7 +2087,7 @@ func TestActivityLog_refreshFromStoredLog(t *testing.T) {
// test closes a.doneCh and calls refreshFromStoredLog, which will not do any processing because the doneCh is closed.
// The test verifies that the current data is not loaded.
func TestActivityLog_refreshFromStoredLogWithBackgroundLoadingCancelled(t *testing.T) {
a, expectedClientRecords, expectedTokenCounts := setupActivityRecordsInStorage(t, time.Now().UTC(), true, true)
a, expectedClientRecords, expectedTokenCounts := setupActivityRecordsInStorage(t, time.Now().UTC(), true, true, false)
a.SetEnable(true)
var wg sync.WaitGroup
@@ -2007,13 +2120,13 @@ func TestActivityLog_refreshFromStoredLogWithBackgroundLoadingCancelled(t *testi
}
currentEntities := a.GetCurrentGlobalEntities()
if !entityRecordsEqual(t, currentEntities.Clients, expectedCurrent.Clients) {
if !EntityRecordsEqual(t, currentEntities.Clients, expectedCurrent.Clients) {
// we only expect the newest entity segment to be loaded (for the current month)
t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrent, currentEntities)
}
currentLocalEntities := a.GetCurrentLocalEntities()
if !entityRecordsEqual(t, currentLocalEntities.Clients, expectedCurrentLocal.Clients) {
if !EntityRecordsEqual(t, currentLocalEntities.Clients, expectedCurrentLocal.Clients) {
// we only expect the newest local entity segment to be loaded (for the current month)
t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrentLocal, currentLocalEntities)
}
@@ -2046,7 +2159,7 @@ func TestActivityLog_refreshFromStoredLogWithBackgroundLoadingCancelled(t *testi
// TestActivityLog_refreshFromStoredLogContextCancelled writes data from 3 months ago to this month and calls
// refreshFromStoredLog with a canceled context, verifying that the function errors because of the canceled context.
func TestActivityLog_refreshFromStoredLogContextCancelled(t *testing.T) {
a, _, _ := setupActivityRecordsInStorage(t, time.Now().UTC(), true, true)
a, _, _ := setupActivityRecordsInStorage(t, time.Now().UTC(), true, true, false)
var wg sync.WaitGroup
ctx, cancelFn := context.WithCancel(context.Background())
@@ -2061,7 +2174,7 @@ func TestActivityLog_refreshFromStoredLogContextCancelled(t *testing.T) {
// TestActivityLog_refreshFromStoredLogNoTokens writes only entities from 3 months ago to today, then calls
// refreshFromStoredLog. It verifies that there are no tokens loaded.
func TestActivityLog_refreshFromStoredLogNoTokens(t *testing.T) {
a, expectedClientRecords, _ := setupActivityRecordsInStorage(t, time.Now().UTC(), true, false)
a, expectedClientRecords, _ := setupActivityRecordsInStorage(t, time.Now().UTC(), true, false, false)
a.SetEnable(true)
var wg sync.WaitGroup
@@ -2082,13 +2195,13 @@ func TestActivityLog_refreshFromStoredLogNoTokens(t *testing.T) {
}
currentGlobalEntities := a.GetCurrentGlobalEntities()
if !entityRecordsEqual(t, currentGlobalEntities.Clients, expectedCurrentGlobal.Clients) {
if !EntityRecordsEqual(t, currentGlobalEntities.Clients, expectedCurrentGlobal.Clients) {
// we only expect the newest entity segment to be loaded (for the current month)
t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrentGlobal, currentGlobalEntities)
}
currentLocalEntities := a.GetCurrentLocalEntities()
if !entityRecordsEqual(t, currentLocalEntities.Clients, expectedCurrentLocal.Clients) {
if !EntityRecordsEqual(t, currentLocalEntities.Clients, expectedCurrentLocal.Clients) {
// we only expect the newest local entity segment to be loaded (for the current month)
t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrentLocal, currentLocalEntities)
}
@@ -2108,7 +2221,7 @@ func TestActivityLog_refreshFromStoredLogNoTokens(t *testing.T) {
// TestActivityLog_refreshFromStoredLogNoEntities writes only direct tokens from 3 months ago to today, and runs
// refreshFromStoredLog. It verifies that there are no entities or clients loaded.
func TestActivityLog_refreshFromStoredLogNoEntities(t *testing.T) {
a, _, expectedTokenCounts := setupActivityRecordsInStorage(t, time.Now().UTC(), false, true)
a, _, expectedTokenCounts := setupActivityRecordsInStorage(t, time.Now().UTC(), false, true, false)
a.SetEnable(true)
var wg sync.WaitGroup
@@ -2138,17 +2251,29 @@ func TestActivityLog_refreshFromStoredLogNoEntities(t *testing.T) {
// current segment counts are zero.
func TestActivityLog_refreshFromStoredLogNoData(t *testing.T) {
now := time.Now().UTC()
a, _, _ := setupActivityRecordsInStorage(t, now, false, false)
a, _, _ := setupActivityRecordsInStorage(t, now, false, false, true)
a.SetEnable(true)
// Simulate an upgrade that is incomplete
a.ResetDedupUpgrade(context.Background())
var wg sync.WaitGroup
err := a.refreshFromStoredLog(context.Background(), &wg, now)
if err != nil {
t.Fatalf("got error loading stored activity logs: %v", err)
}
wg.Wait()
a.ExpectOldSegmentRefreshed(t, timeutil.StartOfMonth(now).Unix(), false, []*activity.EntityRecord{}, map[string]uint64{})
a.ExpectCurrentSegmentsRefreshed(t, timeutil.StartOfMonth(now).Unix(), false)
a.ExpectCurrentSegmentRefreshed(t, now.Unix(), false)
// Simulate an upgrade that is complete
require.NoError(t, a.writeDedupClientsUpgrade(context.Background()))
err = a.refreshFromStoredLog(context.Background(), &wg, now)
if err != nil {
t.Fatalf("got error loading stored activity logs: %v", err)
}
wg.Wait()
a.ExpectOldSegmentRefreshed(t, timeutil.StartOfMonth(now).Unix(), false, []*activity.EntityRecord{}, map[string]uint64{})
a.ExpectCurrentSegmentsRefreshed(t, timeutil.StartOfMonth(now).Unix(), false)
}
// TestActivityLog_refreshFromStoredLogTwoMonthsPrevious creates segment data from 5 months ago to 2 months ago and
@@ -2157,17 +2282,29 @@ func TestActivityLog_refreshFromStoredLogTwoMonthsPrevious(t *testing.T) {
// test what happens when the most recent data is from month M-2 (or earlier - same effect)
now := time.Now().UTC()
twoMonthsAgoStart := timeutil.StartOfPreviousMonth(timeutil.StartOfPreviousMonth(now))
a, _, _ := setupActivityRecordsInStorage(t, twoMonthsAgoStart, true, true)
a, _, _ := setupActivityRecordsInStorage(t, twoMonthsAgoStart, true, true, true)
a.SetEnable(true)
// Simulate an upgrade that is incomplete
a.ResetDedupUpgrade(context.Background())
var wg sync.WaitGroup
err := a.refreshFromStoredLog(context.Background(), &wg, now)
if err != nil {
t.Fatalf("got error loading stored activity logs: %v", err)
}
wg.Wait()
a.ExpectCurrentSegmentsRefreshed(t, timeutil.StartOfMonth(now).Unix(), false)
a.ExpectOldSegmentRefreshed(t, timeutil.StartOfMonth(now).Unix(), false, []*activity.EntityRecord{}, map[string]uint64{})
a.ExpectCurrentSegmentRefreshed(t, now.Unix(), false)
// Simulate an upgrade that is complete
a.writeDedupClientsUpgrade(context.Background())
err = a.refreshFromStoredLog(context.Background(), &wg, now)
if err != nil {
t.Fatalf("got error loading stored activity logs: %v", err)
}
wg.Wait()
a.ExpectCurrentSegmentsRefreshed(t, timeutil.StartOfMonth(now).Unix(), false)
a.ExpectOldSegmentRefreshed(t, timeutil.StartOfMonth(now).Unix(), false, []*activity.EntityRecord{}, map[string]uint64{})
}
// TestActivityLog_refreshFromStoredLogPreviousMonth creates segment data from 4 months ago to 1 month ago, then calls
@@ -2178,9 +2315,12 @@ func TestActivityLog_refreshFromStoredLogPreviousMonth(t *testing.T) {
// can handle end of month rotations
monthStart := timeutil.StartOfMonth(time.Now().UTC())
oneMonthAgoStart := timeutil.StartOfPreviousMonth(monthStart)
a, expectedClientRecords, expectedTokenCounts := setupActivityRecordsInStorage(t, oneMonthAgoStart, true, true)
a, expectedClientRecords, expectedTokenCounts := setupActivityRecordsInStorage(t, oneMonthAgoStart, true, true, true)
a.SetEnable(true)
// Reset upgrade attributes to simulate startup
a.ResetDedupUpgrade(context.Background())
var wg sync.WaitGroup
err := a.refreshFromStoredLog(context.Background(), &wg, time.Now().UTC())
if err != nil {
@@ -2188,6 +2328,18 @@ func TestActivityLog_refreshFromStoredLogPreviousMonth(t *testing.T) {
}
wg.Wait()
// Previous month data should not be loaded into the currentSegment
a.ExpectOldSegmentRefreshed(t, monthStart.Unix(), false, []*activity.EntityRecord{}, map[string]uint64{})
a.ExpectCurrentSegmentsRefreshed(t, monthStart.Unix(), false)
// Simulate completion of upgrade
require.NoError(t, a.writeDedupClientsUpgrade(context.Background()))
// With a refresh after upgrade is complete, the currentGlobalSegment and currentLocalSegment should contain data
err = a.refreshFromStoredLog(context.Background(), &wg, time.Now().UTC())
require.NoError(t, err)
wg.Wait()
expectedActive := &activity.EntityActivityLog{
Clients: expectedClientRecords[1:],
}
@@ -2196,16 +2348,13 @@ func TestActivityLog_refreshFromStoredLogPreviousMonth(t *testing.T) {
}
currentEntities := a.GetCurrentGlobalEntities()
if !entityRecordsEqual(t, currentEntities.Clients, expectedCurrent.Clients) {
if !EntityRecordsEqual(t, currentEntities.Clients, expectedCurrent.Clients) {
// we only expect the newest entity segment to be loaded (for the current month)
t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrent, currentEntities)
}
nsCount := a.GetStoredTokenCountByNamespaceID()
if !reflect.DeepEqual(nsCount, expectedTokenCounts) {
// we expect all token counts to be loaded
t.Errorf("bad activity token counts loaded. expected: %v got: %v", expectedTokenCounts, nsCount)
}
require.Equal(t, expectedTokenCounts, nsCount)
activeClients := a.core.GetActiveClientsList()
if err := ActiveEntitiesEqual(activeClients, expectedActive.Clients); err != nil {
@@ -2433,7 +2582,7 @@ func TestActivityLog_EnableDisable(t *testing.T) {
}
expectMissingSegment(t, core, path)
a.ExpectCurrentSegmentRefreshed(t, 0, false)
a.ExpectCurrentSegmentsRefreshed(t, 0, false)
// enable (if not already) which force-writes an empty segment
enableRequest()
@@ -4152,7 +4301,7 @@ func TestActivityLog_partialMonthClientCount(t *testing.T) {
ctx := namespace.RootContext(nil)
now := time.Now().UTC()
a, clients, _ := setupActivityRecordsInStorage(t, timeutil.StartOfMonth(now), true, true)
a, clients, _ := setupActivityRecordsInStorage(t, timeutil.StartOfMonth(now), true, true, false)
// clients[0] belongs to previous month
clients = clients[1:]
@@ -4223,7 +4372,7 @@ func TestActivityLog_partialMonthClientCountUsingHandleQuery(t *testing.T) {
ctx := namespace.RootContext(nil)
now := time.Now().UTC()
a, clients, _ := setupActivityRecordsInStorage(t, timeutil.StartOfMonth(now), true, true)
a, clients, _ := setupActivityRecordsInStorage(t, timeutil.StartOfMonth(now), true, true, false)
// clients[0] belongs to previous month
clients = clients[1:]
@@ -5831,7 +5980,7 @@ func TestActivityLog_PrimaryDuplicateClientMigrationWorker(t *testing.T) {
a.SetEnable(true)
ctx := context.Background()
timeStamp := time.Now()
timeStamp := time.Now().UTC()
startOfMonth := timeutil.StartOfMonth(timeStamp)
oneMonthAgo := timeutil.StartOfPreviousMonth(timeStamp)
twoMonthsAgo := timeutil.StartOfPreviousMonth(oneMonthAgo)
@@ -5865,13 +6014,28 @@ func TestActivityLog_PrimaryDuplicateClientMigrationWorker(t *testing.T) {
a.savePreviousEntitySegments(ctx, oneMonthAgo.Unix(), "", []*activity.LogFragment{{Clients: append(clientRecordsLocal[1:], clientRecordsGlobal[1:]...)}})
a.savePreviousEntitySegments(ctx, startOfMonth.Unix(), "", []*activity.LogFragment{{Clients: append(clientRecordsLocal[2:], clientRecordsGlobal[2:]...)}})
// Write tokens to old path. We write twice to simulate multiple segments for each month
for i := 0; i < 2; i++ {
writeTokenSegmentOldPath(t, core, twoMonthsAgo, i, &activity.TokenCount{CountByNamespaceID: tokenCounts})
writeTokenSegmentOldPath(t, core, oneMonthAgo, i, &activity.TokenCount{CountByNamespaceID: tokenCounts})
writeTokenSegmentOldPath(t, core, startOfMonth, i, &activity.TokenCount{CountByNamespaceID: tokenCounts})
}
// Write secondary cluster data. This is to make sure that the data at these paths are garbage collected at the end of the migration routine
numSecondarySegments := 4
secondaryIds := make([]string, 0)
for i := 0; i < numSecondarySegments; i++ {
writeSecondaryClusterSegment(t, core, twoMonthsAgo, i, fmt.Sprintf("cluster_%d", i), &activity.EntityActivityLog{Clients: clientRecordsGlobal[:ActivitySegmentClientCapacity]})
writeSecondaryClusterSegment(t, core, oneMonthAgo, i, fmt.Sprintf("cluster_%d", i), &activity.EntityActivityLog{Clients: clientRecordsGlobal[1:ActivitySegmentClientCapacity]})
writeSecondaryClusterSegment(t, core, startOfMonth, i, fmt.Sprintf("cluster_%d", i), &activity.EntityActivityLog{Clients: clientRecordsGlobal[2:ActivitySegmentClientCapacity]})
secondaryIds = append(secondaryIds, fmt.Sprintf("cluster_%d", i))
}
// Assert that the migration workers have not been run
require.True(t, a.hasDedupClientsUpgrade(ctx))
require.True(t, a.dedupClientsUpgradeComplete.Load())
// Resetting this to false so that we can
// verify that after the migrations is completed, the correct values have been stored
a.dedupClientsUpgradeComplete.Store(false)
require.NoError(t, a.view.Delete(ctx, activityDeduplicationUpgradeKey))
// Forcefully run the primary migration worker
@@ -5891,6 +6055,7 @@ func TestActivityLog_PrimaryDuplicateClientMigrationWorker(t *testing.T) {
require.NoError(t, err)
globalClients = append(globalClients, segment.GetClients()...)
}
// We've added duplicate clients from secondaries, so this should not affect the count of the global clients
require.Equal(t, len(clientRecordsGlobal)-index, len(globalClients))
}
@@ -5914,31 +6079,23 @@ func TestActivityLog_PrimaryDuplicateClientMigrationWorker(t *testing.T) {
for _, time := range times {
reader, err := a.NewSegmentFileReader(ctx, time)
require.NoError(t, err)
numTokenSegments := 0
for {
segment, err := reader.ReadToken(ctx)
if errors.Is(err, io.EOF) {
break
}
numTokenSegments += 1
require.NoError(t, err)
// Verify that the data is correct
deep.Equal(segment.GetCountByNamespaceID(), tokenCounts)
}
// All tokens should have been combined into one segment
require.Equal(t, 1, numTokenSegments)
}
// Check that the storage key has been updated
require.True(t, a.hasDedupClientsUpgrade(ctx))
// Check that the bool has been updated
require.True(t, a.dedupClientsUpgradeComplete.Load())
// Wait for the deletion of old logs to complete
timeout := time.After(25 * time.Second)
// Wait for channel indicating deletion to be written
select {
case <-timeout:
t.Fatal("timed out waiting for deletion to complete")
case <-a.oldStoragePathsCleaned:
break
}
// Verify there is no data at the old paths
times, err := a.availableTimesAtPath(ctx, time.Now(), activityEntityBasePath)
@@ -5949,152 +6106,11 @@ func TestActivityLog_PrimaryDuplicateClientMigrationWorker(t *testing.T) {
times, err = a.availableTimesAtPath(ctx, time.Now(), activityTokenBasePath)
require.NoError(t, err)
require.Equal(t, 0, len(times))
}
// TestActivityLog_SecondaryDuplicateClientMigrationWorker verifies that the secondary
// migration worker correctly moves local data from old location to the new location
func TestActivityLog_SecondaryDuplicateClientMigrationWorker(t *testing.T) {
cluster := NewTestCluster(t, nil, nil)
core := cluster.Cores[0].Core
a := core.activityLog
a.SetEnable(true)
ctx := context.Background()
timeStamp := time.Now()
startOfMonth := timeutil.StartOfMonth(timeStamp)
oneMonthAgo := timeutil.StartOfPreviousMonth(timeStamp)
twoMonthsAgo := timeutil.StartOfPreviousMonth(oneMonthAgo)
clientRecordsGlobal := make([]*activity.EntityRecord, ActivitySegmentClientCapacity*2+1)
for i := range clientRecordsGlobal {
clientRecordsGlobal[i] = &activity.EntityRecord{
ClientID: fmt.Sprintf("111122222-3333-4444-5555-%012v", i),
Timestamp: timeStamp.Unix(),
NonEntity: false,
}
}
clientRecordsLocal := make([]*activity.EntityRecord, ActivitySegmentClientCapacity*2+1)
for i := range clientRecordsGlobal {
clientRecordsLocal[i] = &activity.EntityRecord{
ClientID: fmt.Sprintf("011122222-3333-4444-5555-%012v", i),
Timestamp: timeStamp.Unix(),
// This is to trick the system into believing this a local client when parsing data
ClientType: nonEntityTokenActivityType,
}
}
tokenCounts := map[string]uint64{
"ns1": 10,
"ns2": 11,
"ns3": 12,
}
// Write global and local clients to old path
a.savePreviousEntitySegments(ctx, twoMonthsAgo.Unix(), "", []*activity.LogFragment{{Clients: append(clientRecordsLocal, clientRecordsGlobal...)}})
a.savePreviousEntitySegments(ctx, oneMonthAgo.Unix(), "", []*activity.LogFragment{{Clients: append(clientRecordsLocal[1:], clientRecordsGlobal[1:]...)}})
a.savePreviousEntitySegments(ctx, startOfMonth.Unix(), "", []*activity.LogFragment{{Clients: append(clientRecordsLocal[2:], clientRecordsGlobal[2:]...)}})
// Write tokens to old path
a.savePreviousTokenSegments(ctx, twoMonthsAgo.Unix(), "", []*activity.LogFragment{{NonEntityTokens: tokenCounts}})
a.savePreviousTokenSegments(ctx, oneMonthAgo.Unix(), "", []*activity.LogFragment{{NonEntityTokens: tokenCounts}})
a.savePreviousTokenSegments(ctx, startOfMonth.Unix(), "", []*activity.LogFragment{{NonEntityTokens: tokenCounts}})
// Assert that the migration workers have not been run
require.True(t, a.hasDedupClientsUpgrade(ctx))
require.True(t, a.dedupClientsUpgradeComplete.Load())
// Resetting this to false so that we can
// verify that after the migrations is completed, the correct values have been stored
a.dedupClientsUpgradeComplete.Store(false)
require.NoError(t, a.view.Delete(ctx, activityDeduplicationUpgradeKey))
// Forcefully run the secondary migration worker
core.secondaryDuplicateClientMigrationWorker(ctx)
// Wait for the storage migration to complete
ticker := time.NewTicker(100 * time.Millisecond)
timeout := time.After(25 * time.Second)
for {
select {
case <-timeout:
t.Fatal("timed out waiting for migration to complete")
case <-ticker.C:
}
if a.dedupClientsUpgradeComplete.Load() {
break
}
}
// Verify that no global clients have been migrated
times := []time.Time{twoMonthsAgo, oneMonthAgo, startOfMonth}
for _, time := range times {
reader, err := a.NewSegmentFileReader(ctx, time)
require.NoError(t, err)
globalClients := make([]*activity.EntityRecord, 0)
for {
segment, err := reader.ReadGlobalEntity(ctx)
if errors.Is(err, io.EOF) {
break
}
require.NoError(t, err)
globalClients = append(globalClients, segment.GetClients()...)
}
require.Equal(t, 0, len(globalClients))
}
// Verify local clients have been correctly migrated
for index, time := range times {
reader, err := a.NewSegmentFileReader(ctx, time)
require.NoError(t, err)
localClients := make([]*activity.EntityRecord, 0)
for {
segment, err := reader.ReadLocalEntity(ctx)
if errors.Is(err, io.EOF) {
break
}
require.NoError(t, err)
localClients = append(localClients, segment.GetClients()...)
}
require.Equal(t, len(clientRecordsLocal)-index, len(localClients))
}
// Verify non-entity tokens have been correctly migrated
for _, time := range times {
reader, err := a.NewSegmentFileReader(ctx, time)
require.NoError(t, err)
for {
segment, err := reader.ReadToken(ctx)
if errors.Is(err, io.EOF) {
break
}
require.NoError(t, err)
// Verify that the data is correct
deep.Equal(segment.GetCountByNamespaceID(), tokenCounts)
}
}
// Check that the storage key has been updated
require.True(t, a.hasDedupClientsUpgrade(ctx))
// Check that the bool has been updated
require.True(t, a.dedupClientsUpgradeComplete.Load())
// Wait for the deletion of old logs to complete
timeout = time.After(25 * time.Second)
// Wait for channel indicating deletion to be written
select {
case <-timeout:
t.Fatal("timed out waiting for deletion to complete")
case <-a.oldStoragePathsCleaned:
break
}
// Verify there is no data at the old entity paths
times, err := a.availableTimesAtPath(ctx, time.Now(), activityEntityBasePath)
require.NoError(t, err)
require.Equal(t, 0, len(times))
// Verify there is no data at the old token paths
times, err = a.availableTimesAtPath(ctx, time.Now(), activityTokenBasePath)
// Verify there is no data at the secondary cluster paths
for _, secondaryId := range secondaryIds {
times, err = a.availableTimesAtPath(ctx, time.Now(), activitySecondaryTempDataPathPrefix+secondaryId+activityEntityBasePath)
require.NoError(t, err)
require.Equal(t, 0, len(times))
}
}

View File

@@ -7,13 +7,18 @@ import (
"context"
"fmt"
"math/rand"
"sort"
"strings"
"sync"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/hashicorp/vault/helper/constants"
"github.com/hashicorp/vault/sdk/logical"
"github.com/hashicorp/vault/vault/activity"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/testing/protocmp"
)
@@ -187,74 +192,134 @@ func RandStringBytes(n int) string {
return string(b)
}
// ExpectCurrentSegmentRefreshed verifies that the current segment has been refreshed
// non-nil empty components and updated with the `expectedStart` timestamp
// ExpectOldSegmentRefreshed verifies that the old current segment structure has been refreshed
// non-nil empty components and updated with the `expectedStart` timestamp. This is expected when
// an upgrade has not yet completed.
// Note: if `verifyTimeNotZero` is true, ignore `expectedStart` and just make sure the timestamp isn't 0
func (a *ActivityLog) ExpectCurrentSegmentRefreshed(t *testing.T, expectedStart int64, verifyTimeNotZero bool) {
func (a *ActivityLog) ExpectOldSegmentRefreshed(t *testing.T, expectedStart int64, verifyTimeNotZero bool, expectedEntities []*activity.EntityRecord, directTokens map[string]uint64) {
t.Helper()
a.l.RLock()
defer a.l.RUnlock()
a.fragmentLock.RLock()
defer a.fragmentLock.RUnlock()
if a.currentGlobalSegment.currentClients == nil {
t.Fatalf("expected non-nil currentSegment.currentClients")
require.NotNil(t, a.currentSegment.currentClients)
require.NotNil(t, a.currentSegment.currentClients.Clients)
require.NotNil(t, a.currentSegment.tokenCount)
require.NotNil(t, a.currentSegment.tokenCount.CountByNamespaceID)
if !EntityRecordsEqual(t, a.currentSegment.currentClients.Clients, expectedEntities) {
// we only expect the newest entity segment to be loaded (for the current month)
t.Errorf("bad activity entity logs loaded. expected: %v got: %v", a.currentSegment.currentClients.Clients, expectedEntities)
}
if a.currentGlobalSegment.currentClients.Clients == nil {
t.Errorf("expected non-nil currentSegment.currentClients.Entities")
require.Equal(t, directTokens, a.currentSegment.tokenCount.CountByNamespaceID)
if verifyTimeNotZero {
require.NotEqual(t, a.currentSegment.startTimestamp, 0)
} else {
require.Equal(t, a.currentSegment.startTimestamp, expectedStart)
}
if a.currentGlobalSegment.tokenCount == nil {
t.Fatalf("expected non-nil currentSegment.tokenCount")
}
if a.currentGlobalSegment.tokenCount.CountByNamespaceID == nil {
t.Errorf("expected non-nil currentSegment.tokenCount.CountByNamespaceID")
}
if a.currentLocalSegment.currentClients == nil {
t.Fatalf("expected non-nil currentSegment.currentClients")
}
if a.currentLocalSegment.currentClients.Clients == nil {
t.Errorf("expected non-nil currentSegment.currentClients.Entities")
}
if a.currentLocalSegment.tokenCount == nil {
t.Fatalf("expected non-nil currentSegment.tokenCount")
}
if a.currentLocalSegment.tokenCount.CountByNamespaceID == nil {
t.Errorf("expected non-nil currentSegment.tokenCount.CountByNamespaceID")
}
if a.partialMonthLocalClientTracker == nil {
t.Errorf("expected non-nil partialMonthLocalClientTracker")
}
if a.globalPartialMonthClientTracker == nil {
t.Errorf("expected non-nil globalPartialMonthClientTracker")
}
if len(a.currentGlobalSegment.currentClients.Clients) > 0 {
t.Errorf("expected no current entity segment to be loaded. got: %v", a.currentGlobalSegment.currentClients)
}
if len(a.currentLocalSegment.currentClients.Clients) > 0 {
t.Errorf("expected no current entity segment to be loaded. got: %v", a.currentLocalSegment.currentClients)
}
if len(a.currentLocalSegment.tokenCount.CountByNamespaceID) > 0 {
t.Errorf("expected no token counts to be loaded. got: %v", a.currentLocalSegment.tokenCount.CountByNamespaceID)
}
if len(a.partialMonthLocalClientTracker) > 0 {
t.Errorf("expected no active entity segment to be loaded. got: %v", a.partialMonthLocalClientTracker)
}
if len(a.globalPartialMonthClientTracker) > 0 {
t.Errorf("expected no active entity segment to be loaded. got: %v", a.globalPartialMonthClientTracker)
}
// ExpectCurrentSegmentsRefreshed verifies that the current segment has been refreshed
// non-nil empty components and updated with the `expectedStart` timestamp
// Note: if `verifyTimeNotZero` is true, ignore `expectedStart` and just make sure the timestamp isn't 0
func (a *ActivityLog) ExpectCurrentSegmentsRefreshed(t *testing.T, expectedStart int64, verifyTimeNotZero bool) {
t.Helper()
a.l.RLock()
defer a.l.RUnlock()
a.fragmentLock.RLock()
defer a.fragmentLock.RUnlock()
require.NotNil(t, a.currentGlobalSegment.currentClients)
require.NotNil(t, a.currentGlobalSegment.currentClients.Clients)
require.NotNil(t, a.currentGlobalSegment.tokenCount)
require.NotNil(t, a.currentGlobalSegment.tokenCount.CountByNamespaceID)
require.NotNil(t, a.currentLocalSegment.currentClients)
require.NotNil(t, a.currentLocalSegment.currentClients.Clients)
require.NotNil(t, a.currentLocalSegment.tokenCount)
require.NotNil(t, a.currentLocalSegment.tokenCount.CountByNamespaceID)
require.NotNil(t, a.partialMonthLocalClientTracker)
require.NotNil(t, a.globalPartialMonthClientTracker)
require.Equal(t, 0, len(a.currentGlobalSegment.currentClients.Clients))
require.Equal(t, 0, len(a.currentLocalSegment.currentClients.Clients))
require.Equal(t, 0, len(a.currentLocalSegment.tokenCount.CountByNamespaceID))
require.Equal(t, 0, len(a.partialMonthLocalClientTracker))
require.Equal(t, 0, len(a.globalPartialMonthClientTracker))
if verifyTimeNotZero {
if a.currentGlobalSegment.startTimestamp == 0 {
t.Error("bad start timestamp. expected no reset but timestamp was reset")
require.NotEqual(t, 0, a.currentGlobalSegment.startTimestamp)
require.NotEqual(t, 0, a.currentLocalSegment.startTimestamp)
require.NotEqual(t, 0, a.currentSegment.startTimestamp)
} else {
require.Equal(t, expectedStart, a.currentGlobalSegment.startTimestamp)
require.Equal(t, expectedStart, a.currentLocalSegment.startTimestamp)
}
if a.currentLocalSegment.startTimestamp == 0 {
t.Error("bad start timestamp. expected no reset but timestamp was reset")
}
} else if a.currentGlobalSegment.startTimestamp != expectedStart {
t.Errorf("bad start timestamp. expected: %v got: %v", expectedStart, a.currentGlobalSegment.startTimestamp)
} else if a.currentLocalSegment.startTimestamp != expectedStart {
t.Errorf("bad start timestamp. expected: %v got: %v", expectedStart, a.currentLocalSegment.startTimestamp)
// EntityRecordsEqual compares the parts we care about from two activity entity record slices
// note: this makes a copy of the []*activity.EntityRecord so that misordered slices won't fail the comparison,
// but the function won't modify the order of the slices to compare
func EntityRecordsEqual(t *testing.T, record1, record2 []*activity.EntityRecord) bool {
t.Helper()
if record1 == nil {
return record2 == nil
}
if record2 == nil {
return record1 == nil
}
if len(record1) != len(record2) {
return false
}
// sort first on namespace, then on ID, then on timestamp
entityLessFn := func(e []*activity.EntityRecord, i, j int) bool {
ei := e[i]
ej := e[j]
nsComp := strings.Compare(ei.NamespaceID, ej.NamespaceID)
if nsComp == -1 {
return true
}
if nsComp == 1 {
return false
}
idComp := strings.Compare(ei.ClientID, ej.ClientID)
if idComp == -1 {
return true
}
if idComp == 1 {
return false
}
return ei.Timestamp < ej.Timestamp
}
entitiesCopy1 := make([]*activity.EntityRecord, len(record1))
entitiesCopy2 := make([]*activity.EntityRecord, len(record2))
copy(entitiesCopy1, record1)
copy(entitiesCopy2, record2)
sort.Slice(entitiesCopy1, func(i, j int) bool {
return entityLessFn(entitiesCopy1, i, j)
})
sort.Slice(entitiesCopy2, func(i, j int) bool {
return entityLessFn(entitiesCopy2, i, j)
})
for i, a := range entitiesCopy1 {
b := entitiesCopy2[i]
if a.ClientID != b.ClientID || a.NamespaceID != b.NamespaceID || a.Timestamp != b.Timestamp {
return false
}
}
return true
}
// ActiveEntitiesEqual checks that only the set of `test` exists in `active`
@@ -284,6 +349,7 @@ func (a *ActivityLog) SetStartTimestamp(timestamp int64) {
defer a.l.Unlock()
a.currentGlobalSegment.startTimestamp = timestamp
a.currentLocalSegment.startTimestamp = timestamp
a.currentSegment.startTimestamp = timestamp
}
// GetStoredTokenCountByNamespaceID returns the count of tokens by namespace ID
@@ -370,3 +436,38 @@ func (c *Core) DeleteLogsAtPath(ctx context.Context, t *testing.T, storagePath s
}
}
}
// SaveEntitySegment is a test helper function to keep the savePreviousEntitySegments function internal
func (a *ActivityLog) SaveEntitySegment(ctx context.Context, startTime int64, pathPrefix string, fragments []*activity.LogFragment) error {
return a.savePreviousEntitySegments(ctx, startTime, pathPrefix, fragments)
}
// LaunchMigrationWorker is a test only helper function that launches the migration workers.
// This allows us to keep the migration worker methods internal
func (a *ActivityLog) LaunchMigrationWorker(ctx context.Context, isSecondary bool) {
if isSecondary {
go a.core.secondaryDuplicateClientMigrationWorker(ctx)
} else {
go a.core.primaryDuplicateClientMigrationWorker(ctx)
}
}
// DedupUpgradeComplete is a test helper function that indicates whether the
// all correct states have been set after completing upgrade processes to 1.19+
func (a *ActivityLog) DedupUpgradeComplete(ctx context.Context) bool {
return a.hasDedupClientsUpgrade(ctx)
}
// ResetDedupUpgrade is a test helper function that resets the state to reflect
// how the system should look before running/completing any upgrade process to 1.19+
func (a *ActivityLog) ResetDedupUpgrade(ctx context.Context) {
a.view.Delete(ctx, activityDeduplicationUpgradeKey)
a.view.Delete(ctx, activitySecondaryDataRecCount)
}
// RefreshActivityLog is a test helper functions that refreshes the activity logs
// segments and current month data. This allows us to keep the refreshFromStoredLog
// function internal
func (a *ActivityLog) RefreshActivityLog(ctx context.Context) {
a.refreshFromStoredLog(ctx, &sync.WaitGroup{}, time.Now().UTC())
}

View File

@@ -7,9 +7,21 @@ package vault
import (
"context"
"github.com/hashicorp/vault/vault/activity"
)
// sendCurrentFragment is a no-op on OSS
func (a *ActivityLog) sendCurrentFragment(ctx context.Context) error {
return nil
}
// receiveSecondaryPreviousMonthGlobalData is a no-op on OSS
func (a *ActivityLog) receiveSecondaryPreviousMonthGlobalData(ctx context.Context, month int64, clients *activity.LogFragment) error {
return nil
}
// sendPreviousMonthGlobalClientsWorker is a no-op on OSS
func (a *ActivityLog) sendPreviousMonthGlobalClientsWorker(ctx context.Context) (map[int64][]*activity.EntityRecord, error) {
return map[int64][]*activity.EntityRecord{}, nil
}

View File

@@ -10,6 +10,7 @@ import (
"io"
"slices"
"sort"
"strconv"
"strings"
"time"
@@ -565,32 +566,25 @@ func (a *ActivityLog) extractLocalGlobalClientsDeprecatedStoragePath(ctx context
return clusterLocalClients, clusterGlobalClients, fmt.Errorf("could not list available logs on the cluster")
}
for _, time := range times {
entityPath := activityEntityBasePath + fmt.Sprint(time.Unix()) + "/"
segmentPaths, err := a.view.List(ctx, entityPath)
segments, err := a.getAllEntitySegmentsForMonth(ctx, activityEntityBasePath, time.Unix())
if err != nil {
return nil, nil, err
}
for _, seqNumber := range segmentPaths {
segment, err := a.readEntitySegmentAtPath(ctx, entityPath+seqNumber)
if segment == nil {
continue
}
if err != nil {
a.logger.Warn("failed to read segment", "error", err)
return clusterLocalClients, clusterGlobalClients, err
}
for _, segment := range segments {
for _, entity := range segment.GetClients() {
// If the client is not local, then add it to a map
// Normalize month value to the beginning of the month to avoid multiple storage entries for the same month
startOfMonth := timeutil.StartOfMonth(time.UTC())
if local, _ := a.isClientLocal(entity); !local {
if _, ok := clusterGlobalClients[time.Unix()]; !ok {
clusterGlobalClients[time.Unix()] = make([]*activity.EntityRecord, 0)
if _, ok := clusterGlobalClients[startOfMonth.Unix()]; !ok {
clusterGlobalClients[startOfMonth.Unix()] = make([]*activity.EntityRecord, 0)
}
clusterGlobalClients[time.Unix()] = append(clusterGlobalClients[time.Unix()], entity)
clusterGlobalClients[startOfMonth.Unix()] = append(clusterGlobalClients[startOfMonth.Unix()], entity)
} else {
if _, ok := clusterLocalClients[time.Unix()]; !ok {
clusterLocalClients[time.Unix()] = make([]*activity.EntityRecord, 0)
if _, ok := clusterLocalClients[startOfMonth.Unix()]; !ok {
clusterLocalClients[startOfMonth.Unix()] = make([]*activity.EntityRecord, 0)
}
clusterLocalClients[time.Unix()] = append(clusterLocalClients[time.Unix()], entity)
clusterLocalClients[startOfMonth.Unix()] = append(clusterLocalClients[startOfMonth.Unix()], entity)
}
}
}
@@ -627,6 +621,25 @@ func (a *ActivityLog) extractTokensDeprecatedStoragePath(ctx context.Context) (m
return tokensByMonth, nil
}
func (a *ActivityLog) getAllEntitySegmentsForMonth(ctx context.Context, path string, time int64) ([]*activity.EntityActivityLog, error) {
entityPathWithTime := fmt.Sprintf("%s%d/", path, time)
segments := make([]*activity.EntityActivityLog, 0)
segmentPaths, err := a.view.List(ctx, entityPathWithTime)
if err != nil {
return segments, err
}
for _, seqNum := range segmentPaths {
segment, err := a.readEntitySegmentAtPath(ctx, entityPathWithTime+seqNum)
if err != nil {
return segments, err
}
if segment != nil {
segments = append(segments, segment)
}
}
return segments, nil
}
// OldestVersionHasDeduplicatedClients returns whether this cluster is 1.19+, and
// hence supports deduplicated clients
func (a *ActivityLog) OldestVersionHasDeduplicatedClients(ctx context.Context) bool {
@@ -648,3 +661,25 @@ func (a *ActivityLog) OldestVersionHasDeduplicatedClients(ctx context.Context) b
}
return oldestVersionIsDedupClients
}
func (a *ActivityLog) loadClientDataIntoSegment(ctx context.Context, pathPrefix string, startTime time.Time, seqNum uint64, currentSegment *segmentInfo) ([]*activity.EntityRecord, error) {
path := pathPrefix + activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(seqNum, 10)
out, err := a.readEntitySegmentAtPath(ctx, path)
if err != nil && !errors.Is(err, ErrEmptyResponse) {
return nil, err
}
if out != nil {
if !a.core.perfStandby {
a.logger.Debug(fmt.Sprintf("loading client data from %s into segment", path))
currentSegment.startTimestamp = startTime.Unix()
currentSegment.currentClients = &activity.EntityActivityLog{Clients: out.Clients}
currentSegment.clientSequenceNumber = seqNum
} else {
// populate this for edge case checking (if end of month passes while background loading on standby)
currentSegment.startTimestamp = startTime.Unix()
}
return out.GetClients(), nil
}
return []*activity.EntityRecord{}, nil
}

View File

@@ -991,6 +991,22 @@ func Test_ActivityLog_ComputeCurrentMonth_NamespaceMounts(t *testing.T) {
}
}
// writeOldEntityPathSegment writes a single segment to the old storage path with the given time and index for an entity
func writeOldEntityPathSegment(t *testing.T, core *Core, ts time.Time, index int, item *activity.EntityActivityLog) {
t.Helper()
protoItem, err := proto.Marshal(item)
require.NoError(t, err)
WriteToStorage(t, core, makeSegmentPath(t, activityEntityBasePath, ts, index), protoItem)
}
// writeSecondaryClusterSegment writes a single secondary global segment file with the given time and index for an entity
func writeSecondaryClusterSegment(t *testing.T, core *Core, ts time.Time, index int, clusterId string, item *activity.EntityActivityLog) {
t.Helper()
protoItem, err := proto.Marshal(item)
require.NoError(t, err)
WriteToStorage(t, core, makeSegmentPath(t, fmt.Sprintf("%s%s/%s", activitySecondaryTempDataPathPrefix, clusterId, activityEntityBasePath), ts, index), protoItem)
}
// writeGlobalEntitySegment writes a single global segment file with the given time and index for an entity
func writeGlobalEntitySegment(t *testing.T, core *Core, ts time.Time, index int, item *activity.EntityActivityLog) {
t.Helper()