Revert "Migrate Clients From Old Storage Paths to New Paths During Upgrade #7…" (#29253)

This reverts commit 9ba62bec6f.
This commit is contained in:
divyaac
2024-12-20 16:06:41 -08:00
committed by GitHub
parent 30e853da9d
commit 88f0710e26
5 changed files with 122 additions and 896 deletions

View File

@@ -46,9 +46,7 @@ const (
 	activityGlobalPathPrefix = "global/"
 	activityLocalPathPrefix = "local/"
 	activityACMERegenerationKey = "acme-regeneration"
-	activityDeduplicationUpgradeKey = "deduplication-upgrade"
-
 	// sketch for each month that stores hash of client ids
 	distinctClientsBasePath = "log/distinctclients/"
@@ -116,8 +114,6 @@ const (
 	// CSV encoder. Indexes will be generated to ensure that values are slotted into the
 	// correct column. This initial value is used prior to finalizing the CSV header.
 	exportCSVFlatteningInitIndex = -1
-
-	DeduplicatedClientMinimumVersion = "1.19.0"
 )

 var (
@@ -200,11 +196,6 @@ type ActivityLog struct {
 	// Channel to stop background processing
 	doneCh chan struct{}

-	// Channel to signal that global clients have been received by the primary from a secondary, during upgrade to 1.19
-	dedupUpgradeGlobalClientsReceivedCh chan struct{}
-
-	// track whether the current cluster is in the middle of an upgrade to 1.19
-	dedupClientsUpgradeComplete *atomic.Bool

 	// track metadata and contents of the most recent log segment
 	currentSegment segmentInfo
@@ -233,18 +224,8 @@ type ActivityLog struct {
 	// channel closed when deletion at startup is done
 	// (for unit test robustness)
 	retentionDone chan struct{}

-	// This channel is relevant for upgrades to 1.17. It indicates whether precomputed queries have been
-	// generated for ACME clients.
 	computationWorkerDone chan struct{}

-	// This channel is relevant for upgrades to 1.19+ (the version with deduplication of clients).
-	// It indicates that the paths used before 1.19 to store clients have been cleaned.
-	oldStoragePathsCleaned chan struct{}
-
-	// channel to indicate that global clients have been
-	// sent to the primary from a secondary
-	globalClientsSent chan struct{}
-
-	clientsReceivedForMigration map[int64][]*activity.LogFragment

 	// for testing: is config currently being invalidated. protected by l
 	configInvalidationInProgress bool
@@ -369,21 +350,19 @@ func NewActivityLog(core *Core, logger log.Logger, view *BarrierView, metrics me
 		clock = timeutil.DefaultClock{}
 	}

 	a := &ActivityLog{
 		core:                           core,
 		configOverrides:                &core.activityLogConfig,
 		logger:                         logger,
 		view:                           view,
 		metrics:                        metrics,
 		nodeID:                         hostname,
 		newFragmentCh:                  make(chan struct{}, 1),
 		sendCh:                         make(chan struct{}, 1), // buffered so it can be triggered by fragment size
 		doneCh:                         make(chan struct{}, 1),
 		partialMonthLocalClientTracker: make(map[string]*activity.EntityRecord),
 		newGlobalClientFragmentCh:      make(chan struct{}, 1),
-		dedupUpgradeGlobalClientsReceivedCh: make(chan struct{}, 1),
-		clientsReceivedForMigration:         make(map[int64][]*activity.LogFragment),
-		globalPartialMonthClientTracker:     make(map[string]*activity.EntityRecord),
-		clock:                               clock,
+		globalPartialMonthClientTracker: make(map[string]*activity.EntityRecord),
+		clock:                           clock,
 		currentSegment: segmentInfo{
 			startTimestamp: 0,
 			currentClients: &activity.EntityActivityLog{
@@ -428,7 +407,6 @@ func NewActivityLog(core *Core, logger log.Logger, view *BarrierView, metrics me
 		secondaryGlobalClientFragments: make([]*activity.LogFragment, 0),
 		inprocessExport:                atomic.NewBool(false),
 		precomputedQueryWritten:        make(chan struct{}),
-		dedupClientsUpgradeComplete:    atomic.NewBool(false),
 	}

 	config, err := a.loadConfigOrDefault(core.activeContext)
@@ -481,8 +459,6 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for
 	a.currentGlobalFragment = nil
 	a.globalFragmentLock.Unlock()

-	globalFragments := append(append(secondaryGlobalClients, globalClients), standbyGlobalClients...)
-
 	if !a.core.IsPerfSecondary() {
 		if a.currentGlobalFragment != nil {
 			a.metrics.IncrCounterWithLabels([]string{"core", "activity", "global_fragment_size"},
@@ -491,24 +467,19 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for
 					{"type", "client"},
 				})
 		}
+		var globalReceivedFragmentTotal int
+		for _, globalReceivedFragment := range secondaryGlobalClients {
+			globalReceivedFragmentTotal += len(globalReceivedFragment.Clients)
+		}
+		for _, globalReceivedFragment := range standbyGlobalClients {
+			globalReceivedFragmentTotal += len(globalReceivedFragment.Clients)
+		}
 		a.metrics.IncrCounterWithLabels([]string{"core", "activity", "global_received_fragment_size"},
-			float32(len(globalFragments)),
+			float32(globalReceivedFragmentTotal),
 			[]metricsutil.Label{
 				{"type", "client"},
 			})
-		// Since we are the primary, store global clients
-		// Create fragments from global clients and store the segment
-		if ret := a.createCurrentSegmentFromFragments(ctx, globalFragments, &a.currentGlobalSegment, force, activityGlobalPathPrefix); ret != nil {
-			return ret
-		}
-	} else if !a.dedupClientsUpgradeComplete.Load() {
-		// We are the secondary, and an upgrade is in progress. In this case we will temporarily store the data at this old path
-		// This data will be garbage collected after the upgrade has completed
-		if ret := a.createCurrentSegmentFromFragments(ctx, globalFragments, &a.currentSegment, force, ""); ret != nil {
-			return ret
-		}
 	}

 	// If segment start time is zero, do not update or write
@@ -518,6 +489,15 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for
 		return nil
 	}

+	// If we are the primary, store global clients
+	// Create fragments from global clients and store the segment
+	if !a.core.IsPerfSecondary() {
+		globalFragments := append(append(secondaryGlobalClients, globalClients), standbyGlobalClients...)
+		if ret := a.createCurrentSegmentFromFragments(ctx, globalFragments, &a.currentGlobalSegment, force, activityGlobalPathPrefix); ret != nil {
+			return ret
+		}
+	}
+
 	// Swap out the pending local fragments
 	a.localFragmentLock.Lock()
 	localFragment := a.localFragment
@@ -635,74 +615,6 @@ func (a *ActivityLog) createCurrentSegmentFromFragments(ctx context.Context, fra
 	return nil
 }

-func (a *ActivityLog) savePreviousTokenSegments(ctx context.Context, startTime int64, pathPrefix string, fragments []*activity.LogFragment) error {
-	tokenByNamespace := make(map[string]uint64)
-	for _, fragment := range fragments {
-		// As of 1.9, a fragment should no longer have any NonEntityTokens. However
-		// in order to not lose any information about the current segment during the
-		// month when the client upgrades to 1.9, we must retain this functionality.
-		for ns, val := range fragment.NonEntityTokens {
-			// We track these pre-1.9 values in the old location, which is
-			// a.currentSegment.tokenCount, as opposed to the counter that stores tokens
-			// without entities that have client IDs, namely
-			// a.partialMonthClientTracker.nonEntityCountByNamespaceID. This preserves backward
-			// compatibility for the precomputedQueryWorkers and the segment storing
-			// logic.
-			tokenByNamespace[ns] += val
-		}
-	}
-
-	segmentToStore := segmentInfo{
-		startTimestamp:       startTime,
-		clientSequenceNumber: 0,
-		currentClients: &activity.EntityActivityLog{
-			Clients: make([]*activity.EntityRecord, 0),
-		},
-		tokenCount: &activity.TokenCount{CountByNamespaceID: tokenByNamespace},
-	}
-	if _, err := a.saveSegmentEntitiesInternal(ctx, segmentToStore, false, pathPrefix); err != nil {
-		return err
-	}
-	return nil
-}
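
For orientation, a minimal usage sketch of the helper removed above; the month and counts here are hypothetical, not from the diff. savePreviousTokenSegments sums NonEntityTokens across all fragments and writes a single segment with sequence number 0 under the given prefix:

	// Hypothetical caller: replay two fragments' worth of pre-1.9 token counts.
	frags := []*activity.LogFragment{
		{NonEntityTokens: map[string]uint64{"ns1": 3}},
		{NonEntityTokens: map[string]uint64{"ns1": 2, "ns2": 5}},
	}
	// Sums to {"ns1": 5, "ns2": 5} and stores it at <pathPrefix><month>/0.
	if err := a.savePreviousTokenSegments(ctx, month.Unix(), activityLocalPathPrefix+activityTokenBasePath, frags); err != nil {
		return err
	}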
-func (a *ActivityLog) savePreviousEntitySegments(ctx context.Context, startTime int64, pathPrefix string, allFragments []*activity.LogFragment) error {
-	deduplicatedClients := make(map[string]*activity.EntityRecord)
-	for _, f := range allFragments {
-		for _, entity := range f.GetClients() {
-			deduplicatedClients[entity.ClientID] = entity
-		}
-	}
-
-	segmentToStore := segmentInfo{
-		startTimestamp:       startTime,
-		clientSequenceNumber: 0,
-		currentClients: &activity.EntityActivityLog{
-			Clients: make([]*activity.EntityRecord, 0),
-		},
-	}
-
-	incrementSegmentNum := func() {
-		segmentToStore.clientSequenceNumber = segmentToStore.clientSequenceNumber + 1
-		segmentToStore.currentClients.Clients = make([]*activity.EntityRecord, 0)
-	}
-
-	numAddedClients := 0
-	for _, entity := range deduplicatedClients {
-		segmentToStore.currentClients.Clients = append(segmentToStore.currentClients.Clients, entity)
-		numAddedClients++
-		if numAddedClients%ActivitySegmentClientCapacity == 0 {
-			if _, err := a.saveSegmentEntitiesInternal(ctx, segmentToStore, false, pathPrefix); err != nil {
-				return err
-			}
-			incrementSegmentNum()
-		}
-	}
-	// Store any remaining clients if they exist
-	if _, err := a.saveSegmentEntitiesInternal(ctx, segmentToStore, false, pathPrefix); err != nil {
-		return err
-	}
-	return nil
-}
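
As a worked note on the chunking above: a segment is flushed whenever the running client count reaches a multiple of ActivitySegmentClientCapacity, and the trailing save writes whatever remains. A self-contained sketch of that batching with a stand-in capacity of 2:

	package main

	import "fmt"

	func main() {
		const capacity = 2 // stand-in for ActivitySegmentClientCapacity
		clients := []string{"A", "B", "C", "D", "E"}
		segmentNum := 0
		batch := []string{}
		for _, c := range clients {
			batch = append(batch, c)
			if len(batch)%capacity == 0 {
				fmt.Printf("segment %d: %v\n", segmentNum, batch) // full segment flushed
				segmentNum++
				batch = nil
			}
		}
		// Trailing save, mirroring the final saveSegmentEntitiesInternal call above.
		fmt.Printf("segment %d: %v\n", segmentNum, batch) // segment 2: [E]
	}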
 // :force: forces a save of tokens/entities even if the in-memory log is empty
 func (a *ActivityLog) saveCurrentSegmentInternal(ctx context.Context, force bool, currentSegment segmentInfo, storagePathPrefix string) error {
 	_, err := a.saveSegmentEntitiesInternal(ctx, currentSegment, force, storagePathPrefix)
@@ -799,30 +711,28 @@ func parseSegmentNumberFromPath(path string) (int, bool) {
 // availableLogs returns the start_time(s) (in UTC) associated with months for which logs exist,
 // sorted last to first
 func (a *ActivityLog) availableLogs(ctx context.Context, upTo time.Time) ([]time.Time, error) {
+	paths := make([]string, 0)
+	for _, basePath := range []string{activityLocalPathPrefix + activityEntityBasePath, activityGlobalPathPrefix + activityEntityBasePath, activityTokenLocalBasePath} {
+		p, err := a.view.List(ctx, basePath)
+		if err != nil {
+			return nil, err
+		}
+		paths = append(paths, p...)
+	}
 	pathSet := make(map[time.Time]struct{})
 	out := make([]time.Time, 0)
-	availableTimes := make([]time.Time, 0)
-	times, err := a.availableTimesAtPath(ctx, upTo, activityTokenLocalBasePath)
-	if err != nil {
-		return nil, err
-	}
-	availableTimes = append(availableTimes, times...)
-	times, err = a.availableTimesAtPath(ctx, upTo, activityGlobalPathPrefix+activityEntityBasePath)
-	if err != nil {
-		return nil, err
-	}
-	availableTimes = append(availableTimes, times...)
-	times, err = a.availableTimesAtPath(ctx, upTo, activityLocalPathPrefix+activityEntityBasePath)
-	if err != nil {
-		return nil, err
-	}
-	availableTimes = append(availableTimes, times...)
-	// Remove duplicate start times
-	for _, segmentTime := range availableTimes {
+	for _, path := range paths {
+		// generate a set of unique start times
+		segmentTime, err := timeutil.ParseTimeFromPath(path)
+		if err != nil {
+			return nil, err
+		}
+		if segmentTime.After(upTo) {
+			continue
+		}
 		if _, present := pathSet[segmentTime]; !present {
 			pathSet[segmentTime] = struct{}{}
 			out = append(out, segmentTime)
@@ -839,27 +749,6 @@ func (a *ActivityLog) availableLogs(ctx context.Context, upTo time.Time) ([]time
 	return out, nil
 }

-// availableTimesAtPath returns a sorted list of all available times at the pathPrefix up until the provided time.
-func (a *ActivityLog) availableTimesAtPath(ctx context.Context, onlyIncludeTimesUpTo time.Time, path string) ([]time.Time, error) {
-	paths, err := a.view.List(ctx, path)
-	if err != nil {
-		return nil, err
-	}
-	out := make([]time.Time, 0)
-	for _, path := range paths {
-		// generate a set of unique start times
-		segmentTime, err := timeutil.ParseTimeFromPath(path)
-		if err != nil {
-			return nil, err
-		}
-		if segmentTime.After(onlyIncludeTimesUpTo) {
-			continue
-		}
-		out = append(out, segmentTime)
-	}
-	return out, nil
-}
 // getMostRecentActivityLogSegment gets the times (in UTC) associated with the most recent
 // contiguous set of activity logs, sorted in decreasing order (latest to earliest)
 func (a *ActivityLog) getMostRecentActivityLogSegment(ctx context.Context, now time.Time) ([]time.Time, error) {
@@ -988,42 +877,54 @@ func (a *ActivityLog) loadPriorEntitySegment(ctx context.Context, startTime time
 	// load all the active global clients
 	if !isLocal {
 		globalPath := activityGlobalPathPrefix + activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(sequenceNum, 10)
-		out, err := a.readEntitySegmentAtPath(ctx, globalPath)
-		if err != nil && !errors.Is(err, ErrEmptyResponse) {
+		data, err := a.view.Get(ctx, globalPath)
+		if err != nil {
 			return err
 		}
-		if out != nil {
-			a.globalFragmentLock.Lock()
-			// Handle the (unlikely) case where the end of the month has been reached while background loading.
-			// Or the feature has been disabled.
-			if a.enabled && startTime.Unix() == a.currentGlobalSegment.startTimestamp {
-				for _, ent := range out.Clients {
-					a.globalPartialMonthClientTracker[ent.ClientID] = ent
-				}
-			}
-			a.globalFragmentLock.Unlock()
+		if data == nil {
+			return nil
 		}
-	} else {
-		// load all the active local clients
-		localPath := activityLocalPathPrefix + activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(sequenceNum, 10)
-		out, err := a.readEntitySegmentAtPath(ctx, localPath)
-		if err != nil && !errors.Is(err, ErrEmptyResponse) {
+		out := &activity.EntityActivityLog{}
+		err = proto.Unmarshal(data.Value, out)
+		if err != nil {
 			return err
 		}
-		if out != nil {
-			a.localFragmentLock.Lock()
-			// Handle the (unlikely) case where the end of the month has been reached while background loading.
-			// Or the feature has been disabled.
-			if a.enabled && startTime.Unix() == a.currentLocalSegment.startTimestamp {
-				for _, ent := range out.Clients {
-					a.partialMonthLocalClientTracker[ent.ClientID] = ent
-				}
-			}
-			a.localFragmentLock.Unlock()
+		a.globalFragmentLock.Lock()
+		// Handle the (unlikely) case where the end of the month has been reached while background loading.
+		// Or the feature has been disabled.
+		if a.enabled && startTime.Unix() == a.currentGlobalSegment.startTimestamp {
+			for _, ent := range out.Clients {
+				a.globalPartialMonthClientTracker[ent.ClientID] = ent
+			}
 		}
+		a.globalFragmentLock.Unlock()
+		return nil
 	}
+
+	// load all the active local clients
+	localPath := activityLocalPathPrefix + activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(sequenceNum, 10)
+	data, err := a.view.Get(ctx, localPath)
+	if err != nil {
+		return err
+	}
+	if data == nil {
+		return nil
+	}
+	out := &activity.EntityActivityLog{}
+	err = proto.Unmarshal(data.Value, out)
+	if err != nil {
+		return err
+	}
+	a.localFragmentLock.Lock()
+	// Handle the (unlikely) case where the end of the month has been reached while background loading.
+	// Or the feature has been disabled.
+	if a.enabled && startTime.Unix() == a.currentLocalSegment.startTimestamp {
+		for _, ent := range out.Clients {
+			a.partialMonthLocalClientTracker[ent.ClientID] = ent
+		}
+	}
+	a.localFragmentLock.Unlock()
 	return nil
 }
@@ -1031,17 +932,23 @@ func (a *ActivityLog) loadCurrentClientSegment(ctx context.Context, startTime ti
 // into memory (to append new entries), and to the globalPartialMonthClientTracker and partialMonthLocalClientTracker to
 // avoid duplication. Call with fragmentLock, globalFragmentLock, localFragmentLock and l held.
 func (a *ActivityLog) loadCurrentClientSegment(ctx context.Context, startTime time.Time, localSegmentSequenceNumber uint64, globalSegmentSequenceNumber uint64) error {
-	// setting a.currentSegment timestamp to support upgrades
-	a.currentSegment.startTimestamp = startTime.Unix()
-
 	// load current global segment
 	path := activityGlobalPathPrefix + activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(globalSegmentSequenceNumber, 10)
-	out, err := a.readEntitySegmentAtPath(ctx, path)
-	if err != nil && !errors.Is(err, ErrEmptyResponse) {
+
+	// setting a.currentSegment timestamp to support upgrades
+	a.currentSegment.startTimestamp = startTime.Unix()
+
+	data, err := a.view.Get(ctx, path)
+	if err != nil {
 		return err
 	}
-	if out != nil {
+	if data != nil {
+		out := &activity.EntityActivityLog{}
+		err = proto.Unmarshal(data.Value, out)
+		if err != nil {
+			return err
+		}
 		if !a.core.perfStandby {
 			a.currentGlobalSegment = segmentInfo{
 				startTimestamp: startTime.Unix(),
@@ -1064,11 +971,17 @@ func (a *ActivityLog) loadCurrentClientSegment(ctx context.Context, startTime ti
 	// load current local segment
 	path = activityLocalPathPrefix + activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(localSegmentSequenceNumber, 10)
-	out, err = a.readEntitySegmentAtPath(ctx, path)
-	if err != nil && !errors.Is(err, ErrEmptyResponse) {
+	data, err = a.view.Get(ctx, path)
+	if err != nil {
 		return err
 	}
-	if out != nil {
+	if data != nil {
+		out := &activity.EntityActivityLog{}
+		err = proto.Unmarshal(data.Value, out)
+		if err != nil {
+			return err
+		}
 		if !a.core.perfStandby {
 			a.currentLocalSegment = segmentInfo{
 				startTimestamp: startTime.Unix(),
@@ -1085,43 +998,12 @@ func (a *ActivityLog) loadCurrentClientSegment(ctx context.Context, startTime ti
 		for _, client := range out.Clients {
 			a.partialMonthLocalClientTracker[client.ClientID] = client
 		}
 	}
 	return nil
 }
-func (a *ActivityLog) readEntitySegmentAtPath(ctx context.Context, path string) (*activity.EntityActivityLog, error) {
-	data, err := a.view.Get(ctx, path)
-	if err != nil {
-		return nil, err
-	}
-	if data == nil {
-		return nil, ErrEmptyResponse
-	}
-	out := &activity.EntityActivityLog{}
-	err = proto.Unmarshal(data.Value, out)
-	if err != nil {
-		return nil, err
-	}
-	return out, nil
-}
-
-func (a *ActivityLog) readTokenSegmentAtPath(ctx context.Context, path string) (*activity.TokenCount, error) {
-	data, err := a.view.Get(ctx, path)
-	if err != nil {
-		return nil, err
-	}
-	if data == nil {
-		return nil, ErrEmptyResponse
-	}
-	out := &activity.TokenCount{}
-	err = proto.Unmarshal(data.Value, out)
-	if err != nil {
-		return nil, err
-	}
-	return out, nil
-}
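
The two removed helpers share one contract: storage and unmarshal errors propagate, while a missing entry surfaces as ErrEmptyResponse instead of a bare (nil, nil). A sketch of the caller pattern they imply, as the pre-revert code used them:

	out, err := a.readEntitySegmentAtPath(ctx, path)
	if err != nil && !errors.Is(err, ErrEmptyResponse) {
		return err // a real storage or decoding failure
	}
	if out != nil {
		// the segment exists; process out.Clients
	}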
 // tokenCountExists checks if there's a token log for :startTime:
 // this function should be called with the lock held
 func (a *ActivityLog) tokenCountExists(ctx context.Context, startTime time.Time) (bool, error) {
@@ -1288,26 +1170,6 @@ func (a *ActivityLog) deleteLogWorker(ctx context.Context, startTimestamp int64,
 	close(whenDone)
 }

-func (a *ActivityLog) deleteOldStoragePathWorker(ctx context.Context, pathPrefix string) {
-	pathTimes, err := a.view.List(ctx, pathPrefix)
-	if err != nil {
-		a.logger.Error("could not list segment paths", "error", err)
-		return
-	}
-	for _, pathTime := range pathTimes {
-		segments, err := a.view.List(ctx, pathPrefix+pathTime)
-		if err != nil {
-			a.logger.Error("could not list segment path", "error", err)
-		}
-		for _, seqNum := range segments {
-			err = a.view.Delete(ctx, pathPrefix+pathTime+seqNum)
-			if err != nil {
-				a.logger.Error("could not delete log", "error", err)
-			}
-		}
-	}
-}
 func (a *ActivityLog) WaitForDeletion() {
 	a.l.Lock()
 	// May be nil, if never set
@@ -1646,78 +1508,11 @@ func (c *Core) setupActivityLogLocked(ctx context.Context, wg *sync.WaitGroup, r
 			manager.retentionWorker(ctx, manager.clock.Now(), months)
 			close(manager.retentionDone)
 		}(manager.retentionMonths)
-
-		// We do not want to hold up unseal, and we need access to
-		// the replicationRpcClient in order for the secondary to migrate data.
-		// This client is only reliably present after unseal.
-		c.postUnsealFuncs = append(c.postUnsealFuncs, func() {
-			c.activityLogMigrationTask(ctx)
-		})
 	}

 	return nil
 }
-// secondaryDuplicateClientMigrationWorker will attempt to send global data living on the
-// current cluster to the primary cluster. This routine will only exit when its connected primary
-// has reached version 1.19+, and this cluster has completed sending any global data that lives at the old storage paths
-func (c *Core) secondaryDuplicateClientMigrationWorker(ctx context.Context) {
-	manager := c.activityLog
-	manager.logger.Trace("started secondary activity log migration worker")
-	storageMigrationComplete := atomic.NewBool(false)
-	wg := &sync.WaitGroup{}
-	wg.Add(1)
-	go func() {
-		if !c.IsPerfSecondary() {
-			// TODO: Create function for the secondary to continuously attempt to send data to the primary
-		}
-		wg.Done()
-	}()
-	wg.Add(1)
-	go func() {
-		localClients, _, err := manager.extractLocalGlobalClientsDeprecatedStoragePath(ctx)
-		if err != nil {
-			return
-		}
-		// Store local clients at new path
-		for month, entitiesForMonth := range localClients {
-			logFragments := []*activity.LogFragment{{
-				Clients: entitiesForMonth,
-			}}
-			if err = manager.savePreviousEntitySegments(ctx, month, activityLocalPathPrefix, logFragments); err != nil {
-				manager.logger.Error("failed to write local segment", "error", err, "month", month)
-				return
-			}
-		}
-		storageMigrationComplete.Store(true)
-		// TODO: generate/store PCQs for these local clients
-		wg.Done()
-	}()
-	wg.Wait()
-	if !storageMigrationComplete.Load() {
-		manager.logger.Error("could not complete migration of duplicate clients on cluster")
-		return
-	}
-	// We have completed the vital portions of the storage migration
-	if err := manager.writeDedupClientsUpgrade(ctx); err != nil {
-		manager.logger.Error("could not complete migration of duplicate clients on cluster")
-		return
-	}
-	// Now that all the clients have been migrated and PCQs have been created, remove all clients at old storage paths
-	manager.oldStoragePathsCleaned = make(chan struct{})
-	go func() {
-		defer close(manager.oldStoragePathsCleaned)
-		manager.deleteOldStoragePathWorker(ctx, activityEntityBasePath)
-		manager.deleteOldStoragePathWorker(ctx, activityTokenBasePath)
-		// TODO: Delete old PCQs
-	}()
-	manager.dedupClientsUpgradeComplete.Store(true)
-	manager.logger.Trace("completed secondary activity log migration worker")
-}
 func (a *ActivityLog) hasRegeneratedACME(ctx context.Context) bool {
 	regenerated, err := a.view.Get(ctx, activityACMERegenerationKey)
 	if err != nil {
@@ -1727,15 +1522,6 @@ func (a *ActivityLog) hasRegeneratedACME(ctx context.Context) bool {
 	return regenerated != nil
 }

-func (a *ActivityLog) hasDedupClientsUpgrade(ctx context.Context) bool {
-	regenerated, err := a.view.Get(ctx, activityDeduplicationUpgradeKey)
-	if err != nil {
-		a.logger.Error("unable to access deduplication regeneration key")
-		return false
-	}
-	return regenerated != nil
-}

 func (a *ActivityLog) writeRegeneratedACME(ctx context.Context) error {
 	regeneratedEntry, err := logical.StorageEntryJSON(activityACMERegenerationKey, true)
 	if err != nil {
@@ -1744,14 +1530,6 @@ func (a *ActivityLog) writeRegeneratedACME(ctx context.Context) error {
 	return a.view.Put(ctx, regeneratedEntry)
 }

-func (a *ActivityLog) writeDedupClientsUpgrade(ctx context.Context) error {
-	regeneratedEntry, err := logical.StorageEntryJSON(activityDeduplicationUpgradeKey, true)
-	if err != nil {
-		return err
-	}
-	return a.view.Put(ctx, regeneratedEntry)
-}
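
hasDedupClientsUpgrade and writeDedupClientsUpgrade form a one-time-work marker: the JSON value stored under activityDeduplicationUpgradeKey is irrelevant, only its presence matters. A hedged sketch of the intended pairing, inferred from the worker code elsewhere in this diff:

	if !a.hasDedupClientsUpgrade(ctx) {
		// ... perform the one-time migration ...
		if err := a.writeDedupClientsUpgrade(ctx); err != nil {
			return err
		}
	}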
 func (a *ActivityLog) regeneratePrecomputedQueries(ctx context.Context) error {
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
@@ -1918,12 +1696,7 @@ func (a *ActivityLog) secondaryFragmentWorker(ctx context.Context) {
 					<-timer.C
 				}
 			}
-			// Only send data if no upgrade is in progress. Else, the active worker will
-			// store the data in a temporary location until it is garbage collected
-			if a.dedupClientsUpgradeComplete.Load() {
-				sendFunc()
-			}
+			sendFunc()
 		case <-endOfMonth.C:
 			a.logger.Trace("sending global fragment on end of month")
 			// Flush the current fragment, if any
@@ -1933,16 +1706,13 @@ func (a *ActivityLog) secondaryFragmentWorker(ctx context.Context) {
 					<-timer.C
 				}
 			}
-			// If an upgrade is in progress, don't do anything
-			// The active fragmentWorker will take care of flushing the clients to a temporary location
-			if a.dedupClientsUpgradeComplete.Load() {
-				sendFunc()
-				// clear active entity set
-				a.globalFragmentLock.Lock()
-				a.globalPartialMonthClientTracker = make(map[string]*activity.EntityRecord)
-				a.globalFragmentLock.Unlock()
-			}
+			sendFunc()
+
+			// clear active entity set
+			a.globalFragmentLock.Lock()
+			a.globalPartialMonthClientTracker = make(map[string]*activity.EntityRecord)
+			a.globalFragmentLock.Unlock()

 			// Set timer for next month.
 			// The current segment *probably* hasn't been set yet (via invalidation),
@@ -4028,110 +3798,6 @@ func (a *ActivityLog) writeExport(ctx context.Context, rw http.ResponseWriter, f
 	return nil
 }
-func (c *Core) activityLogMigrationTask(ctx context.Context) {
-	manager := c.activityLog
-	if !c.IsPerfSecondary() {
-		// If the oldest version is less than 1.19 and no migration tasks have been run, kick off the migration task
-		if !manager.OldestVersionHasDeduplicatedClients(ctx) && !manager.hasDedupClientsUpgrade(ctx) {
-			go c.primaryDuplicateClientMigrationWorker(ctx)
-		} else {
-			// Store that upgrade processes have already been completed
-			manager.writeDedupClientsUpgrade(ctx)
-			manager.dedupClientsUpgradeComplete.Store(true)
-		}
-	} else {
-		// We kick off the secondary migration worker on the chance that the primary has not yet upgraded.
-		// If we have already completed the migration task, it indicates that the cluster has completed sending data to an
-		// already upgraded primary
-		if !manager.hasDedupClientsUpgrade(ctx) {
-			go c.secondaryDuplicateClientMigrationWorker(ctx)
-		} else {
-			// Store that upgrade processes have already been completed
-			manager.writeDedupClientsUpgrade(ctx)
-			manager.dedupClientsUpgradeComplete.Store(true)
-		}
-	}
-}
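
Condensed, the removed dispatch is a three-way switch (an illustrative restatement, not part of the commit; m stands for c.activityLog):

	switch {
	case !c.IsPerfSecondary() && !m.OldestVersionHasDeduplicatedClients(ctx) && !m.hasDedupClientsUpgrade(ctx):
		go c.primaryDuplicateClientMigrationWorker(ctx) // primary may still hold pre-1.19 data
	case c.IsPerfSecondary() && !m.hasDedupClientsUpgrade(ctx):
		go c.secondaryDuplicateClientMigrationWorker(ctx) // secondary may still need to send data
	default:
		m.writeDedupClientsUpgrade(ctx) // nothing to migrate; record completion
		m.dedupClientsUpgradeComplete.Store(true)
	}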
-// primaryDuplicateClientMigrationWorker will attempt to receive global data living on the
-// connected secondary clusters. Once the data has been received, it will combine it with
-// its own global data at old storage paths, and migrate all of it to new storage paths on the
-// current cluster. This method will only exit once all connected secondary clusters have
-// upgraded to 1.19, and this cluster receives global data from all of them.
-func (c *Core) primaryDuplicateClientMigrationWorker(ctx context.Context) error {
-	a := c.activityLog
-	a.logger.Trace("started primary activity log migration worker")
-
-	// Collect global clients from secondaries
-	err := a.waitForSecondaryGlobalClients(ctx)
-	if err != nil {
-		return err
-	}
-
-	// Get local and global entities from previous months
-	clusterLocalClients, clusterGlobalClients, err := a.extractLocalGlobalClientsDeprecatedStoragePath(ctx)
-	if err != nil {
-		a.logger.Error("could not extract local and global clients from storage", "error", err)
-		return err
-	}
-	// Get tokens from previous months at old storage paths
-	clusterTokens, err := a.extractTokensDeprecatedStoragePath(ctx)
-
-	// TODO: Collect clients from secondaries into slice of fragments
-
-	// Store global clients at new path
-	for month, entitiesForMonth := range clusterGlobalClients {
-		logFragments := []*activity.LogFragment{{
-			Clients: entitiesForMonth,
-		}}
-		if err = a.savePreviousEntitySegments(ctx, month, activityGlobalPathPrefix, logFragments); err != nil {
-			a.logger.Error("failed to write global segment", "error", err, "month", month)
-			return err
-		}
-	}
-	// Store local clients at new path
-	for month, entitiesForMonth := range clusterLocalClients {
-		logFragments := []*activity.LogFragment{{
-			Clients: entitiesForMonth,
-		}}
-		if err = a.savePreviousEntitySegments(ctx, month, activityLocalPathPrefix, logFragments); err != nil {
-			a.logger.Error("failed to write local segment", "error", err, "month", month)
-			return err
-		}
-	}
-	// Store tokens at new path
-	for month, tokenCount := range clusterTokens {
-		// Combine all token counts from all clusters
-		logFragments := make([]*activity.LogFragment, len(tokenCount))
-		for i, tokens := range tokenCount {
-			logFragments[i] = &activity.LogFragment{NonEntityTokens: tokens}
-		}
-		if err = a.savePreviousTokenSegments(ctx, month, activityLocalPathPrefix+activityTokenBasePath, logFragments); err != nil {
-			a.logger.Error("failed to write token segment", "error", err, "month", month)
-			return err
-		}
-	}
-
-	// TODO: After data has been migrated to new locations, we will regenerate all the global and local PCQs
-
-	if err := a.writeDedupClientsUpgrade(ctx); err != nil {
-		a.logger.Error("could not complete migration of duplicate clients on cluster")
-		return err
-	}
-
-	// Garbage collect data at old paths
-	a.oldStoragePathsCleaned = make(chan struct{})
-	go func() {
-		defer close(a.oldStoragePathsCleaned)
-		a.deleteOldStoragePathWorker(ctx, activityEntityBasePath)
-		a.deleteOldStoragePathWorker(ctx, activityTokenBasePath)
-		// We will also need to delete old PCQs
-	}()
-
-	a.dedupClientsUpgradeComplete.Store(true)
-	a.logger.Trace("completed primary activity log migration worker")
-	return nil
-}
 type encoder interface {
 	Encode(*ActivityLogExportRecord) error
 	Flush()

View File

@@ -5,22 +5,11 @@
 package vault

-import (
-	"context"
-	"errors"
-)
+import "context"

 //go:generate go run github.com/hashicorp/vault/tools/stubmaker

-// ErrEmptyResponse error is used to avoid returning "nil, nil" from a function
-var ErrEmptyResponse = errors.New("empty response; the system encountered a statement that exclusively returns nil values")
-
 // sendGlobalClients is a no-op on CE
 func (a *ActivityLog) sendGlobalClients(ctx context.Context) error {
 	return nil
 }
-
-// waitForSecondaryGlobalClients is a no-op on CE
-func (a *ActivityLog) waitForSecondaryGlobalClients(ctx context.Context) error {
-	return nil
-}
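
The removed ErrEmptyResponse shows the sentinel-error alternative to returning (nil, nil); a standalone sketch of the pattern with a hypothetical lookup function:

	package main

	import (
		"errors"
		"fmt"
	)

	var ErrEmptyResponse = errors.New("empty response")

	// load distinguishes "absent" from "failed" without returning (nil, nil).
	func load(key string) (string, error) {
		if key == "" {
			return "", ErrEmptyResponse
		}
		return "value-for-" + key, nil
	}

	func main() {
		if _, err := load(""); errors.Is(err, ErrEmptyResponse) {
			fmt.Println("absent; callers can treat this as a non-fatal miss")
		}
	}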

View File

@@ -5821,280 +5821,3 @@ func TestCreateSegment_StoreSegment(t *testing.T) {
 		})
 	}
 }
-// TestActivityLog_PrimaryDuplicateClientMigrationWorker verifies that the primary
-// migration worker correctly moves data from the old location to the new location
-func TestActivityLog_PrimaryDuplicateClientMigrationWorker(t *testing.T) {
-	cluster := NewTestCluster(t, nil, nil)
-	core := cluster.Cores[0].Core
-	a := core.activityLog
-	a.SetEnable(true)
-
-	ctx := context.Background()
-	timeStamp := time.Now()
-	startOfMonth := timeutil.StartOfMonth(timeStamp)
-	oneMonthAgo := timeutil.StartOfPreviousMonth(timeStamp)
-	twoMonthsAgo := timeutil.StartOfPreviousMonth(oneMonthAgo)
-
-	clientRecordsGlobal := make([]*activity.EntityRecord, ActivitySegmentClientCapacity*2+1)
-	for i := range clientRecordsGlobal {
-		clientRecordsGlobal[i] = &activity.EntityRecord{
-			ClientID:  fmt.Sprintf("111122222-3333-4444-5555-%012v", i),
-			Timestamp: timeStamp.Unix(),
-			NonEntity: false,
-		}
-	}
-	clientRecordsLocal := make([]*activity.EntityRecord, ActivitySegmentClientCapacity*2+1)
-	for i := range clientRecordsGlobal {
-		clientRecordsLocal[i] = &activity.EntityRecord{
-			ClientID:  fmt.Sprintf("011122222-3333-4444-5555-%012v", i),
-			Timestamp: timeStamp.Unix(),
-			// This is to trick the system into believing this is a local client when parsing data
-			ClientType: nonEntityTokenActivityType,
-		}
-	}
-
-	tokenCounts := map[string]uint64{
-		"ns1": 10,
-		"ns2": 11,
-		"ns3": 12,
-	}
-
-	// Write global and local clients to the old path
-	a.savePreviousEntitySegments(ctx, twoMonthsAgo.Unix(), "", []*activity.LogFragment{{Clients: append(clientRecordsLocal, clientRecordsGlobal...)}})
-	a.savePreviousEntitySegments(ctx, oneMonthAgo.Unix(), "", []*activity.LogFragment{{Clients: append(clientRecordsLocal[1:], clientRecordsGlobal[1:]...)}})
-	a.savePreviousEntitySegments(ctx, startOfMonth.Unix(), "", []*activity.LogFragment{{Clients: append(clientRecordsLocal[2:], clientRecordsGlobal[2:]...)}})
-
-	// Assert that the upgrade has already been marked complete (a fresh cluster never needs the migration)
-	require.True(t, a.hasDedupClientsUpgrade(ctx))
-	require.True(t, a.dedupClientsUpgradeComplete.Load())
-
-	// Reset these so that we can verify that the correct values are stored
-	// once the migration completes
-	a.dedupClientsUpgradeComplete.Store(false)
-	require.NoError(t, a.view.Delete(ctx, activityDeduplicationUpgradeKey))
-
-	// Forcefully run the primary migration worker
-	core.primaryDuplicateClientMigrationWorker(ctx)
-
-	// Verify that we have the correct number of global clients at the new storage paths
-	times := []time.Time{twoMonthsAgo, oneMonthAgo, startOfMonth}
-	for index, time := range times {
-		reader, err := a.NewSegmentFileReader(ctx, time)
-		require.NoError(t, err)
-		globalClients := make([]*activity.EntityRecord, 0)
-		for {
-			segment, err := reader.ReadGlobalEntity(ctx)
-			if errors.Is(err, io.EOF) {
-				break
-			}
-			require.NoError(t, err)
-			globalClients = append(globalClients, segment.GetClients()...)
-		}
-		require.Equal(t, len(clientRecordsGlobal)-index, len(globalClients))
-	}
-	// Verify local clients
-	for index, time := range times {
-		reader, err := a.NewSegmentFileReader(ctx, time)
-		require.NoError(t, err)
-		localClients := make([]*activity.EntityRecord, 0)
-		for {
-			segment, err := reader.ReadLocalEntity(ctx)
-			if errors.Is(err, io.EOF) {
-				break
-			}
-			require.NoError(t, err)
-			localClients = append(localClients, segment.GetClients()...)
-		}
-		require.Equal(t, len(clientRecordsLocal)-index, len(localClients))
-	}
-	// Verify non-entity tokens have been correctly migrated
-	for _, time := range times {
-		reader, err := a.NewSegmentFileReader(ctx, time)
-		require.NoError(t, err)
-		for {
-			segment, err := reader.ReadToken(ctx)
-			if errors.Is(err, io.EOF) {
-				break
-			}
-			require.NoError(t, err)
-			// Verify that the data is correct
-			deep.Equal(segment.GetCountByNamespaceID(), tokenCounts)
-		}
-	}
-
-	// Check that the storage key has been updated
-	require.True(t, a.hasDedupClientsUpgrade(ctx))
-	// Check that the bool has been updated
-	require.True(t, a.dedupClientsUpgradeComplete.Load())
-
-	// Wait for the deletion of old logs to complete
-	timeout := time.After(25 * time.Second)
-	// Wait for the channel indicating deletion to be closed
-	select {
-	case <-timeout:
-		t.Fatal("timed out waiting for deletion to complete")
-	case <-a.oldStoragePathsCleaned:
-		break
-	}
-
-	// Verify there is no data at the old paths
-	times, err := a.availableTimesAtPath(ctx, time.Now(), activityEntityBasePath)
-	require.NoError(t, err)
-	require.Equal(t, 0, len(times))
-
-	// Verify there is no data at the old token paths
-	times, err = a.availableTimesAtPath(ctx, time.Now(), activityTokenBasePath)
-	require.NoError(t, err)
-	require.Equal(t, 0, len(times))
-}
-// TestActivityLog_SecondaryDuplicateClientMigrationWorker verifies that the secondary
-// migration worker correctly moves local data from the old location to the new location
-func TestActivityLog_SecondaryDuplicateClientMigrationWorker(t *testing.T) {
-	cluster := NewTestCluster(t, nil, nil)
-	core := cluster.Cores[0].Core
-	a := core.activityLog
-	a.SetEnable(true)
-
-	ctx := context.Background()
-	timeStamp := time.Now()
-	startOfMonth := timeutil.StartOfMonth(timeStamp)
-	oneMonthAgo := timeutil.StartOfPreviousMonth(timeStamp)
-	twoMonthsAgo := timeutil.StartOfPreviousMonth(oneMonthAgo)
-
-	clientRecordsGlobal := make([]*activity.EntityRecord, ActivitySegmentClientCapacity*2+1)
-	for i := range clientRecordsGlobal {
-		clientRecordsGlobal[i] = &activity.EntityRecord{
-			ClientID:  fmt.Sprintf("111122222-3333-4444-5555-%012v", i),
-			Timestamp: timeStamp.Unix(),
-			NonEntity: false,
-		}
-	}
-	clientRecordsLocal := make([]*activity.EntityRecord, ActivitySegmentClientCapacity*2+1)
-	for i := range clientRecordsGlobal {
-		clientRecordsLocal[i] = &activity.EntityRecord{
-			ClientID:  fmt.Sprintf("011122222-3333-4444-5555-%012v", i),
-			Timestamp: timeStamp.Unix(),
-			// This is to trick the system into believing this is a local client when parsing data
-			ClientType: nonEntityTokenActivityType,
-		}
-	}
-
-	tokenCounts := map[string]uint64{
-		"ns1": 10,
-		"ns2": 11,
-		"ns3": 12,
-	}
-
-	// Write global and local clients to the old path
-	a.savePreviousEntitySegments(ctx, twoMonthsAgo.Unix(), "", []*activity.LogFragment{{Clients: append(clientRecordsLocal, clientRecordsGlobal...)}})
-	a.savePreviousEntitySegments(ctx, oneMonthAgo.Unix(), "", []*activity.LogFragment{{Clients: append(clientRecordsLocal[1:], clientRecordsGlobal[1:]...)}})
-	a.savePreviousEntitySegments(ctx, startOfMonth.Unix(), "", []*activity.LogFragment{{Clients: append(clientRecordsLocal[2:], clientRecordsGlobal[2:]...)}})
-
-	// Write tokens to the old path
-	a.savePreviousTokenSegments(ctx, twoMonthsAgo.Unix(), "", []*activity.LogFragment{{NonEntityTokens: tokenCounts}})
-	a.savePreviousTokenSegments(ctx, oneMonthAgo.Unix(), "", []*activity.LogFragment{{NonEntityTokens: tokenCounts}})
-	a.savePreviousTokenSegments(ctx, startOfMonth.Unix(), "", []*activity.LogFragment{{NonEntityTokens: tokenCounts}})
-
-	// Assert that the upgrade has already been marked complete (a fresh cluster never needs the migration)
-	require.True(t, a.hasDedupClientsUpgrade(ctx))
-	require.True(t, a.dedupClientsUpgradeComplete.Load())
-
-	// Reset these so that we can verify that the correct values are stored
-	// once the migration completes
-	a.dedupClientsUpgradeComplete.Store(false)
-	require.NoError(t, a.view.Delete(ctx, activityDeduplicationUpgradeKey))
-
-	// Forcefully run the secondary migration worker
-	core.secondaryDuplicateClientMigrationWorker(ctx)
-
-	// Wait for the storage migration to complete
-	ticker := time.NewTicker(100 * time.Millisecond)
-	timeout := time.After(25 * time.Second)
-	for {
-		select {
-		case <-timeout:
-			t.Fatal("timed out waiting for migration to complete")
-		case <-ticker.C:
-		}
-		if a.dedupClientsUpgradeComplete.Load() {
-			break
-		}
-	}
-
-	// Verify that no global clients have been migrated
-	times := []time.Time{twoMonthsAgo, oneMonthAgo, startOfMonth}
-	for _, time := range times {
-		reader, err := a.NewSegmentFileReader(ctx, time)
-		require.NoError(t, err)
-		globalClients := make([]*activity.EntityRecord, 0)
-		for {
-			segment, err := reader.ReadGlobalEntity(ctx)
-			if errors.Is(err, io.EOF) {
-				break
-			}
-			require.NoError(t, err)
-			globalClients = append(globalClients, segment.GetClients()...)
-		}
-		require.Equal(t, 0, len(globalClients))
-	}
-	// Verify local clients have been correctly migrated
-	for index, time := range times {
-		reader, err := a.NewSegmentFileReader(ctx, time)
-		require.NoError(t, err)
-		localClients := make([]*activity.EntityRecord, 0)
-		for {
-			segment, err := reader.ReadLocalEntity(ctx)
-			if errors.Is(err, io.EOF) {
-				break
-			}
-			require.NoError(t, err)
-			localClients = append(localClients, segment.GetClients()...)
-		}
-		require.Equal(t, len(clientRecordsLocal)-index, len(localClients))
-	}
-	// Verify non-entity tokens have been correctly migrated
-	for _, time := range times {
-		reader, err := a.NewSegmentFileReader(ctx, time)
-		require.NoError(t, err)
-		for {
-			segment, err := reader.ReadToken(ctx)
-			if errors.Is(err, io.EOF) {
-				break
-			}
-			require.NoError(t, err)
-			// Verify that the data is correct
-			deep.Equal(segment.GetCountByNamespaceID(), tokenCounts)
-		}
-	}
-
-	// Check that the storage key has been updated
-	require.True(t, a.hasDedupClientsUpgrade(ctx))
-	// Check that the bool has been updated
-	require.True(t, a.dedupClientsUpgradeComplete.Load())
-
-	// Wait for the deletion of old logs to complete
-	timeout = time.After(25 * time.Second)
-	// Wait for the channel indicating deletion to be closed
-	select {
-	case <-timeout:
-		t.Fatal("timed out waiting for deletion to complete")
-	case <-a.oldStoragePathsCleaned:
-		break
-	}
-
-	// Verify there is no data at the old entity paths
-	times, err := a.availableTimesAtPath(ctx, time.Now(), activityEntityBasePath)
-	require.NoError(t, err)
-	require.Equal(t, 0, len(times))
-
-	// Verify there is no data at the old token paths
-	times, err = a.availableTimesAtPath(ctx, time.Now(), activityTokenBasePath)
-	require.NoError(t, err)
-	require.Equal(t, 0, len(times))
-}

View File

@@ -14,7 +14,6 @@ import (
 	"time"

 	"github.com/axiomhq/hyperloglog"
-	semver "github.com/hashicorp/go-version"
 	"github.com/hashicorp/vault/helper/timeutil"
 	"github.com/hashicorp/vault/sdk/logical"
 	"github.com/hashicorp/vault/vault/activity"
@@ -553,98 +552,3 @@ func (a *ActivityLog) namespaceRecordToCountsResponse(record *activity.Namespace
 		ACMEClients: int(record.ACMEClients),
 	}
 }
-func (a *ActivityLog) extractLocalGlobalClientsDeprecatedStoragePath(ctx context.Context) (map[int64][]*activity.EntityRecord, map[int64][]*activity.EntityRecord, error) {
-	clusterGlobalClients := make(map[int64][]*activity.EntityRecord)
-	clusterLocalClients := make(map[int64][]*activity.EntityRecord)
-
-	// Extract global clients on the current cluster per month and store them in a map
-	times, err := a.availableTimesAtPath(ctx, time.Now(), activityEntityBasePath)
-	if err != nil {
-		a.logger.Error("could not list available logs until now")
-		return clusterLocalClients, clusterGlobalClients, fmt.Errorf("could not list available logs on the cluster")
-	}
-	for _, time := range times {
-		entityPath := activityEntityBasePath + fmt.Sprint(time.Unix()) + "/"
-		segmentPaths, err := a.view.List(ctx, entityPath)
-		if err != nil {
-			return nil, nil, err
-		}
-		for _, seqNumber := range segmentPaths {
-			segment, err := a.readEntitySegmentAtPath(ctx, entityPath+seqNumber)
-			if segment == nil {
-				continue
-			}
-			if err != nil {
-				a.logger.Warn("failed to read segment", "error", err)
-				return clusterLocalClients, clusterGlobalClients, err
-			}
-			for _, entity := range segment.GetClients() {
-				// If the client is not local, then add it to the global map
-				if local, _ := a.isClientLocal(entity); !local {
-					if _, ok := clusterGlobalClients[time.Unix()]; !ok {
-						clusterGlobalClients[time.Unix()] = make([]*activity.EntityRecord, 0)
-					}
-					clusterGlobalClients[time.Unix()] = append(clusterGlobalClients[time.Unix()], entity)
-				} else {
-					if _, ok := clusterLocalClients[time.Unix()]; !ok {
-						clusterLocalClients[time.Unix()] = make([]*activity.EntityRecord, 0)
-					}
-					clusterLocalClients[time.Unix()] = append(clusterLocalClients[time.Unix()], entity)
-				}
-			}
-		}
-	}
-	return clusterLocalClients, clusterGlobalClients, nil
-}
-func (a *ActivityLog) extractTokensDeprecatedStoragePath(ctx context.Context) (map[int64][]map[string]uint64, error) {
-	tokensByMonth := make(map[int64][]map[string]uint64)
-	times, err := a.availableTimesAtPath(ctx, time.Now(), activityTokenBasePath)
-	if err != nil {
-		return nil, err
-	}
-	for _, monthTime := range times {
-		tokenPath := activityTokenBasePath + fmt.Sprint(monthTime.Unix()) + "/"
-		segmentPaths, err := a.view.List(ctx, tokenPath)
-		if err != nil {
-			return nil, err
-		}
-		tokensByMonth[monthTime.Unix()] = make([]map[string]uint64, 0)
-		for _, seqNum := range segmentPaths {
-			tokenCount, err := a.readTokenSegmentAtPath(ctx, tokenPath+seqNum)
-			if tokenCount == nil {
-				a.logger.Error("data at path has been unexpectedly deleted", "path", tokenPath+seqNum)
-				continue
-			}
-			if err != nil {
-				return nil, err
-			}
-			tokensByMonth[monthTime.Unix()] = append(tokensByMonth[monthTime.Unix()], tokenCount.CountByNamespaceID)
-		}
-	}
-	return tokensByMonth, nil
-}
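
Illustrative shape of the extractor's return value, with a hypothetical month key and counts: each month, keyed by its Unix start time, maps to one namespace-count map per old segment file:

	tokensByMonth := map[int64][]map[string]uint64{
		1696118400: { // month start, Unix seconds
			{"ns1": 2, "ns2": 5}, // segment 0
			{"ns1": 3},           // segment 1
		},
	}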
-// OldestVersionHasDeduplicatedClients returns whether the oldest version in this cluster is 1.19+,
-// and hence supports deduplicated clients
-func (a *ActivityLog) OldestVersionHasDeduplicatedClients(ctx context.Context) bool {
-	oldestVersionIsDedupClients := a.core.IsNewInstall(ctx)
-	if !oldestVersionIsDedupClients {
-		if v, _, err := a.core.FindOldestVersionTimestamp(); err == nil {
-			oldestVersion, err := semver.NewSemver(v)
-			if err != nil {
-				a.core.logger.Debug("could not extract version instance", "version", v)
-				return false
-			}
-			dedupChangeVersion, err := semver.NewSemver(DeduplicatedClientMinimumVersion)
-			if err != nil {
-				a.core.logger.Debug("could not extract version instance", "version", DeduplicatedClientMinimumVersion)
-				return false
-			}
-			oldestVersionIsDedupClients = oldestVersionIsDedupClients || oldestVersion.GreaterThanOrEqual(dedupChangeVersion)
-		}
-	}
-	return oldestVersionIsDedupClients
-}
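
For reference, a standalone sketch of the semver gate implemented above, using the same github.com/hashicorp/go-version API (the version strings are examples):

	package main

	import (
		"fmt"

		semver "github.com/hashicorp/go-version"
	)

	func main() {
		oldest, _ := semver.NewSemver("1.18.3")
		gate, _ := semver.NewSemver("1.19.0")
		// A 1.18.x oldest version predates client deduplication.
		fmt.Println(oldest.GreaterThanOrEqual(gate)) // false
	}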

View File

@@ -13,7 +13,6 @@ import (
 	"time"

 	"github.com/axiomhq/hyperloglog"
-	"github.com/go-test/deep"
 	"github.com/hashicorp/vault/helper/timeutil"
 	"github.com/hashicorp/vault/vault/activity"
 	"github.com/stretchr/testify/require"
@@ -1015,14 +1014,6 @@ func writeTokenSegment(t *testing.T, core *Core, ts time.Time, index int, item *
 	WriteToStorage(t, core, makeSegmentPath(t, activityTokenLocalBasePath, ts, index), protoItem)
 }

-// writeTokenSegmentOldPath writes a single segment file with the given time and index for a token at the old path
-func writeTokenSegmentOldPath(t *testing.T, core *Core, ts time.Time, index int, item *activity.TokenCount) {
-	t.Helper()
-	protoItem, err := proto.Marshal(item)
-	require.NoError(t, err)
-	WriteToStorage(t, core, makeSegmentPath(t, activityTokenBasePath, ts, index), protoItem)
-}

 // makeSegmentPath formats the path for a segment at a particular time and index
 func makeSegmentPath(t *testing.T, typ string, ts time.Time, index int) string {
 	t.Helper()
@@ -1222,50 +1213,3 @@ func TestSegmentFileReader(t *testing.T) {
 		require.True(t, proto.Equal(gotTokens[i], tokens[i]))
 	}
 }
-// TestExtractTokens_OldStoragePaths verifies that the correct tokens are extracted
-// from the old token paths in storage. These old storage paths were used in <=1.9 to
-// store tokens without clientIDs (non-entity tokens).
-func TestExtractTokens_OldStoragePaths(t *testing.T) {
-	core, _, _ := TestCoreUnsealed(t)
-	now := time.Now()
-	token := &activity.TokenCount{CountByNamespaceID: map[string]uint64{
-		"ns":  10,
-		"ns3": 1,
-		"ns1": 2,
-	}}
-	lastMonth := timeutil.StartOfPreviousMonth(now)
-	twoMonthsAgo := timeutil.StartOfPreviousMonth(lastMonth)
-
-	thisMonthData := []map[string]uint64{token.CountByNamespaceID, token.CountByNamespaceID}
-	lastMonthData := []map[string]uint64{token.CountByNamespaceID, token.CountByNamespaceID, token.CountByNamespaceID, token.CountByNamespaceID}
-	twoMonthsAgoData := []map[string]uint64{token.CountByNamespaceID}
-	expected := map[int64][]map[string]uint64{
-		now.Unix():          thisMonthData,
-		lastMonth.Unix():    lastMonthData,
-		twoMonthsAgo.Unix(): twoMonthsAgoData,
-	}
-
-	// This month's token data is at broken segment sequences
-	writeTokenSegmentOldPath(t, core, now, 1, token)
-	writeTokenSegmentOldPath(t, core, now, 3, token)
-	// Last month's token data is at normal segment sequences
-	writeTokenSegmentOldPath(t, core, lastMonth, 0, token)
-	writeTokenSegmentOldPath(t, core, lastMonth, 1, token)
-	writeTokenSegmentOldPath(t, core, lastMonth, 2, token)
-	writeTokenSegmentOldPath(t, core, lastMonth, 3, token)
-	// The month before is at only one random segment sequence
-	writeTokenSegmentOldPath(t, core, twoMonthsAgo, 2, token)
-
-	tokens, err := core.activityLog.extractTokensDeprecatedStoragePath(context.Background())
-	require.NoError(t, err)
-	require.Equal(t, 3, len(tokens))
-	if diff := deep.Equal(expected, tokens); diff != nil {
-		t.Fatal(diff)
-	}
-}