Revert "OSS-Changes Patch (#29193)" (#29249)

This reverts commit 1fab64e9c6.
This commit is contained in:
akshya96
2024-12-20 11:28:10 -08:00
committed by GitHub
parent 3754c67abf
commit 357a13fbb0
2 changed files with 26 additions and 9 deletions

View File

@@ -4095,10 +4095,11 @@ func (a *ActivityLog) writeExport(ctx context.Context, rw http.ResponseWriter, f
func (c *Core) activityLogMigrationTask(ctx context.Context) { func (c *Core) activityLogMigrationTask(ctx context.Context) {
manager := c.activityLog manager := c.activityLog
if !c.IsPerfSecondary() { if !c.IsPerfSecondary() {
// If no migrations tasks have been run, kick off the migration task // If the oldest version is less than 1.19 and no migrations tasks have been run, kick off the migration task
if !manager.hasDedupClientsUpgrade(ctx) { if !manager.OldestVersionHasDeduplicatedClients(ctx) && !manager.hasDedupClientsUpgrade(ctx) {
go c.primaryDuplicateClientMigrationWorker(ctx) go c.primaryDuplicateClientMigrationWorker(ctx)
} else { } else {
// Store that upgrade processes have already been completed
manager.writeDedupClientsUpgrade(ctx) manager.writeDedupClientsUpgrade(ctx)
} }
} else { } else {
@@ -4107,8 +4108,6 @@ func (c *Core) activityLogMigrationTask(ctx context.Context) {
// already upgraded primary // already upgraded primary
if !manager.hasDedupClientsUpgrade(ctx) { if !manager.hasDedupClientsUpgrade(ctx) {
go c.secondaryDuplicateClientMigrationWorker(ctx) go c.secondaryDuplicateClientMigrationWorker(ctx)
} else {
manager.writeDedupClientsUpgrade(ctx)
} }
} }
} }
@@ -4119,12 +4118,7 @@ func (c *Core) activityLogMigrationTask(ctx context.Context) {
// current cluster. This method wil only exit once all connected secondary clusters have // current cluster. This method wil only exit once all connected secondary clusters have
// upgraded to 1.19, and this cluster receives global data from all of them. // upgraded to 1.19, and this cluster receives global data from all of them.
func (c *Core) primaryDuplicateClientMigrationWorker(ctx context.Context) error { func (c *Core) primaryDuplicateClientMigrationWorker(ctx context.Context) error {
c.activityLogLock.Lock()
a := c.activityLog a := c.activityLog
c.activityLogLock.Unlock()
if a == nil {
return fmt.Errorf("activity log not configured")
}
a.logger.Trace("started primary activity log migration worker") a.logger.Trace("started primary activity log migration worker")
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()

View File

@@ -15,6 +15,7 @@ import (
"time" "time"
"github.com/axiomhq/hyperloglog" "github.com/axiomhq/hyperloglog"
semver "github.com/hashicorp/go-version"
"github.com/hashicorp/vault/helper/timeutil" "github.com/hashicorp/vault/helper/timeutil"
"github.com/hashicorp/vault/sdk/logical" "github.com/hashicorp/vault/sdk/logical"
"github.com/hashicorp/vault/vault/activity" "github.com/hashicorp/vault/vault/activity"
@@ -639,6 +640,28 @@ func (a *ActivityLog) getAllEntitySegmentsForMonth(ctx context.Context, path str
return segments, nil return segments, nil
} }
// OldestVersionHasDeduplicatedClients returns whether this cluster is 1.19+, and
// hence supports deduplicated clients
func (a *ActivityLog) OldestVersionHasDeduplicatedClients(ctx context.Context) bool {
oldestVersionIsDedupClients := a.core.IsNewInstall(ctx)
if !oldestVersionIsDedupClients {
if v, _, err := a.core.FindOldestVersionTimestamp(); err == nil {
oldestVersion, err := semver.NewSemver(v)
if err != nil {
a.core.logger.Debug("could not extract version instance", "version", v)
return false
}
dedupChangeVersion, err := semver.NewSemver(DeduplicatedClientMinimumVersion)
if err != nil {
a.core.logger.Debug("could not extract version instance", "version", DeduplicatedClientMinimumVersion)
return false
}
oldestVersionIsDedupClients = oldestVersionIsDedupClients || oldestVersion.GreaterThanOrEqual(dedupChangeVersion)
}
}
return oldestVersionIsDedupClients
}
func (a *ActivityLog) loadClientDataIntoSegment(ctx context.Context, pathPrefix string, startTime time.Time, seqNum uint64, currentSegment *segmentInfo) ([]*activity.EntityRecord, error) { func (a *ActivityLog) loadClientDataIntoSegment(ctx context.Context, pathPrefix string, startTime time.Time, seqNum uint64, currentSegment *segmentInfo) ([]*activity.EntityRecord, error) {
path := pathPrefix + activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(seqNum, 10) path := pathPrefix + activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(seqNum, 10)
out, err := a.readEntitySegmentAtPath(ctx, path) out, err := a.readEntitySegmentAtPath(ctx, path)