mirror of
https://github.com/optim-enterprises-bv/vault.git
synced 2025-10-29 17:52:32 +00:00
OSS-Changes Patch (#29193)
This commit is contained in:
@@ -4095,11 +4095,10 @@ func (a *ActivityLog) writeExport(ctx context.Context, rw http.ResponseWriter, f
|
|||||||
func (c *Core) activityLogMigrationTask(ctx context.Context) {
|
func (c *Core) activityLogMigrationTask(ctx context.Context) {
|
||||||
manager := c.activityLog
|
manager := c.activityLog
|
||||||
if !c.IsPerfSecondary() {
|
if !c.IsPerfSecondary() {
|
||||||
// If the oldest version is less than 1.19 and no migrations tasks have been run, kick off the migration task
|
// If no migrations tasks have been run, kick off the migration task
|
||||||
if !manager.OldestVersionHasDeduplicatedClients(ctx) && !manager.hasDedupClientsUpgrade(ctx) {
|
if !manager.hasDedupClientsUpgrade(ctx) {
|
||||||
go c.primaryDuplicateClientMigrationWorker(ctx)
|
go c.primaryDuplicateClientMigrationWorker(ctx)
|
||||||
} else {
|
} else {
|
||||||
// Store that upgrade processes have already been completed
|
|
||||||
manager.writeDedupClientsUpgrade(ctx)
|
manager.writeDedupClientsUpgrade(ctx)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@@ -4108,6 +4107,8 @@ func (c *Core) activityLogMigrationTask(ctx context.Context) {
|
|||||||
// already upgraded primary
|
// already upgraded primary
|
||||||
if !manager.hasDedupClientsUpgrade(ctx) {
|
if !manager.hasDedupClientsUpgrade(ctx) {
|
||||||
go c.secondaryDuplicateClientMigrationWorker(ctx)
|
go c.secondaryDuplicateClientMigrationWorker(ctx)
|
||||||
|
} else {
|
||||||
|
manager.writeDedupClientsUpgrade(ctx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -4118,7 +4119,12 @@ func (c *Core) activityLogMigrationTask(ctx context.Context) {
|
|||||||
// current cluster. This method wil only exit once all connected secondary clusters have
|
// current cluster. This method wil only exit once all connected secondary clusters have
|
||||||
// upgraded to 1.19, and this cluster receives global data from all of them.
|
// upgraded to 1.19, and this cluster receives global data from all of them.
|
||||||
func (c *Core) primaryDuplicateClientMigrationWorker(ctx context.Context) error {
|
func (c *Core) primaryDuplicateClientMigrationWorker(ctx context.Context) error {
|
||||||
|
c.activityLogLock.Lock()
|
||||||
a := c.activityLog
|
a := c.activityLog
|
||||||
|
c.activityLogLock.Unlock()
|
||||||
|
if a == nil {
|
||||||
|
return fmt.Errorf("activity log not configured")
|
||||||
|
}
|
||||||
a.logger.Trace("started primary activity log migration worker")
|
a.logger.Trace("started primary activity log migration worker")
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|||||||
@@ -15,7 +15,6 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/axiomhq/hyperloglog"
|
"github.com/axiomhq/hyperloglog"
|
||||||
semver "github.com/hashicorp/go-version"
|
|
||||||
"github.com/hashicorp/vault/helper/timeutil"
|
"github.com/hashicorp/vault/helper/timeutil"
|
||||||
"github.com/hashicorp/vault/sdk/logical"
|
"github.com/hashicorp/vault/sdk/logical"
|
||||||
"github.com/hashicorp/vault/vault/activity"
|
"github.com/hashicorp/vault/vault/activity"
|
||||||
@@ -640,28 +639,6 @@ func (a *ActivityLog) getAllEntitySegmentsForMonth(ctx context.Context, path str
|
|||||||
return segments, nil
|
return segments, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// OldestVersionHasDeduplicatedClients returns whether this cluster is 1.19+, and
|
|
||||||
// hence supports deduplicated clients
|
|
||||||
func (a *ActivityLog) OldestVersionHasDeduplicatedClients(ctx context.Context) bool {
|
|
||||||
oldestVersionIsDedupClients := a.core.IsNewInstall(ctx)
|
|
||||||
if !oldestVersionIsDedupClients {
|
|
||||||
if v, _, err := a.core.FindOldestVersionTimestamp(); err == nil {
|
|
||||||
oldestVersion, err := semver.NewSemver(v)
|
|
||||||
if err != nil {
|
|
||||||
a.core.logger.Debug("could not extract version instance", "version", v)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
dedupChangeVersion, err := semver.NewSemver(DeduplicatedClientMinimumVersion)
|
|
||||||
if err != nil {
|
|
||||||
a.core.logger.Debug("could not extract version instance", "version", DeduplicatedClientMinimumVersion)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
oldestVersionIsDedupClients = oldestVersionIsDedupClients || oldestVersion.GreaterThanOrEqual(dedupChangeVersion)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return oldestVersionIsDedupClients
|
|
||||||
}
|
|
||||||
|
|
||||||
func (a *ActivityLog) loadClientDataIntoSegment(ctx context.Context, pathPrefix string, startTime time.Time, seqNum uint64, currentSegment *segmentInfo) ([]*activity.EntityRecord, error) {
|
func (a *ActivityLog) loadClientDataIntoSegment(ctx context.Context, pathPrefix string, startTime time.Time, seqNum uint64, currentSegment *segmentInfo) ([]*activity.EntityRecord, error) {
|
||||||
path := pathPrefix + activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(seqNum, 10)
|
path := pathPrefix + activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(seqNum, 10)
|
||||||
out, err := a.readEntitySegmentAtPath(ctx, path)
|
out, err := a.readEntitySegmentAtPath(ctx, path)
|
||||||
|
|||||||
Reference in New Issue
Block a user