From 1aa49af191a0b50b82a6bd5cf0efcd8e7a95807d Mon Sep 17 00:00:00 2001 From: divyaac Date: Thu, 2 Jan 2025 11:42:41 -0800 Subject: [PATCH] Revert "Store global clients at separate storage paths (#28926)" (#29272) This reverts commit e21dfa6b1ce1e3e9ce0705061150070db7cf7d1c. Co-authored-by: akshya96 <87045294+akshya96@users.noreply.github.com> --- .../operator_usage_testonly_test.go | 2 +- sdk/helper/clientcountutil/clientcountutil.go | 23 +- .../clientcountutil/clientcountutil_test.go | 5 +- vault/activity_log.go | 251 ++++---------- vault/activity_log_test.go | 305 ++---------------- vault/activity_log_testing_util.go | 41 +-- vault/activity_log_util_common.go | 23 +- vault/activity_log_util_common_test.go | 30 -- .../acme_regeneration_test.go | 6 +- .../activity_testonly_oss_test.go | 2 +- .../activity_testonly_test.go | 22 +- .../logical_system_activity_write_testonly.go | 112 ++----- ...cal_system_activity_write_testonly_test.go | 29 +- 13 files changed, 155 insertions(+), 696 deletions(-) diff --git a/command/command_testonly/operator_usage_testonly_test.go b/command/command_testonly/operator_usage_testonly_test.go index 4cdfc0536a..31de4b88eb 100644 --- a/command/command_testonly/operator_usage_testonly_test.go +++ b/command/command_testonly/operator_usage_testonly_test.go @@ -53,7 +53,7 @@ func TestOperatorUsageCommandRun(t *testing.T) { now := time.Now().UTC() - _, _, err = clientcountutil.NewActivityLogData(client). + _, err = clientcountutil.NewActivityLogData(client). NewPreviousMonthData(1). NewClientsSeen(6, clientcountutil.WithClientType("entity")). NewClientsSeen(4, clientcountutil.WithClientType("non-entity-token")). diff --git a/sdk/helper/clientcountutil/clientcountutil.go b/sdk/helper/clientcountutil/clientcountutil.go index 16344a4d9c..7d0be5526e 100644 --- a/sdk/helper/clientcountutil/clientcountutil.go +++ b/sdk/helper/clientcountutil/clientcountutil.go @@ -282,42 +282,33 @@ func (d *ActivityLogDataGenerator) ToProto() *generation.ActivityLogMockInput { // Write writes the data to the API with the given write options. The method // returns the new paths that have been written. Note that the API endpoint will // only be present when Vault has been compiled with the "testonly" flag. -func (d *ActivityLogDataGenerator) Write(ctx context.Context, writeOptions ...generation.WriteOptions) ([]string, []string, error) { +func (d *ActivityLogDataGenerator) Write(ctx context.Context, writeOptions ...generation.WriteOptions) ([]string, error) { d.data.Write = writeOptions err := VerifyInput(d.data) if err != nil { - return nil, nil, err + return nil, err } data, err := d.ToJSON() if err != nil { - return nil, nil, err + return nil, err } resp, err := d.client.Logical().WriteWithContext(ctx, "sys/internal/counters/activity/write", map[string]interface{}{"input": string(data)}) if err != nil { - return nil, nil, err + return nil, err } if resp.Data == nil { - return nil, nil, fmt.Errorf("received no data") + return nil, fmt.Errorf("received no data") } paths := resp.Data["paths"] castedPaths, ok := paths.([]interface{}) if !ok { - return nil, nil, fmt.Errorf("invalid paths data: %v", paths) + return nil, fmt.Errorf("invalid paths data: %v", paths) } returnPaths := make([]string, 0, len(castedPaths)) for _, path := range castedPaths { returnPaths = append(returnPaths, path.(string)) } - globalPaths := resp.Data["global_paths"] - globalCastedPaths, ok := globalPaths.([]interface{}) - if !ok { - return nil, nil, fmt.Errorf("invalid global paths data: %v", globalPaths) - } - returnGlobalPaths := make([]string, 0, len(globalCastedPaths)) - for _, path := range globalCastedPaths { - returnGlobalPaths = append(returnGlobalPaths, path.(string)) - } - return returnPaths, returnGlobalPaths, nil + return returnPaths, nil } // VerifyInput checks that the input data is valid diff --git a/sdk/helper/clientcountutil/clientcountutil_test.go b/sdk/helper/clientcountutil/clientcountutil_test.go index fc0c695d94..6a5b224bc6 100644 --- a/sdk/helper/clientcountutil/clientcountutil_test.go +++ b/sdk/helper/clientcountutil/clientcountutil_test.go @@ -116,7 +116,7 @@ func TestNewCurrentMonthData_AddClients(t *testing.T) { // sent to the server is correct. func TestWrite(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - _, err := io.WriteString(w, `{"data":{"paths":["path1","path2"],"global_paths":["path2","path3"]}}`) + _, err := io.WriteString(w, `{"data":{"paths":["path1","path2"]}}`) require.NoError(t, err) body, err := io.ReadAll(r.Body) require.NoError(t, err) @@ -131,7 +131,7 @@ func TestWrite(t *testing.T) { Address: ts.URL, }) require.NoError(t, err) - paths, globalPaths, err := NewActivityLogData(client). + paths, err := NewActivityLogData(client). NewPreviousMonthData(3). NewClientSeen(). NewPreviousMonthData(2). @@ -141,7 +141,6 @@ func TestWrite(t *testing.T) { require.NoError(t, err) require.Equal(t, []string{"path1", "path2"}, paths) - require.Equal(t, []string{"path2", "path3"}, globalPaths) } func testAddClients(t *testing.T, makeGenerator func() *ActivityLogDataGenerator, getClient func(data *ActivityLogDataGenerator) *generation.Client) { diff --git a/vault/activity_log.go b/vault/activity_log.go index 632fe7d013..44f65c37b4 100644 --- a/vault/activity_log.go +++ b/vault/activity_log.go @@ -43,17 +43,15 @@ const ( activityQueryBasePath = "queries/" activityConfigKey = "config" activityIntentLogKey = "endofmonth" - activityGlobalPathPrefix = "global/" activityACMERegenerationKey = "acme-regeneration" // sketch for each month that stores hash of client ids distinctClientsBasePath = "log/distinctclients/" // for testing purposes (public as needed) - ActivityLogPrefix = "sys/counters/activity/log/" - ActivityGlobalLogPrefix = "sys/counters/activity/global/log/" - ActivityLogLocalPrefix = "sys/counters/activity/local/log/" - ActivityPrefix = "sys/counters/activity/" + ActivityLogPrefix = "sys/counters/activity/log/" + ActivityLogLocalPrefix = "sys/counters/activity/local/log/" + ActivityPrefix = "sys/counters/activity/" // Time to wait before a perf standby sends data to the active node, or // before the active node of a performance secondary sends global data to the primary. @@ -203,9 +201,6 @@ type ActivityLog struct { // track metadata and contents of the most recent log segment currentSegment segmentInfo - // track metadata and contents of the most recent global log segment - currentGlobalSegment segmentInfo - // Fragments received from performance standbys standbyFragmentsReceived []*activity.LogFragment @@ -283,10 +278,6 @@ type ActivityLogCoreConfig struct { // PerfStandbyFragmentSendInterval sets the interval to send fragment data from the perf standby to the active // This is only for testing purposes PerfStandbyFragmentSendInterval time.Duration - - // StorageWriteTestingInterval sets the interval flush data to the storage. - // This is only for testing purposes - StorageWriteTestingInterval time.Duration } // ActivityLogExportRecord is the output structure for activity export @@ -384,19 +375,6 @@ func NewActivityLog(core *Core, logger log.Logger, view *BarrierView, metrics me }, clientSequenceNumber: 0, }, - currentGlobalSegment: segmentInfo{ - startTimestamp: 0, - currentClients: &activity.EntityActivityLog{ - Clients: make([]*activity.EntityRecord, 0), - }, - // tokenCount is deprecated, but must still exist for the current segment - // so the fragment that was using TWEs before the 1.9 changes - // can be flushed to the current segment. - tokenCount: &activity.TokenCount{ - CountByNamespaceID: make(map[string]uint64), - }, - clientSequenceNumber: 0, - }, standbyFragmentsReceived: make([]*activity.LogFragment, 0), standbyLocalFragmentsReceived: make([]*activity.LogFragment, 0), standbyGlobalFragmentsReceived: make([]*activity.LogFragment, 0), @@ -456,8 +434,6 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for a.globalFragmentLock.Lock() secondaryGlobalClients := a.secondaryGlobalClientFragments a.secondaryGlobalClientFragments = make([]*activity.LogFragment, 0) - standbyGlobalClients := a.standbyGlobalFragmentsReceived - a.standbyGlobalFragmentsReceived = make([]*activity.LogFragment, 0) globalClients := a.currentGlobalFragment a.currentGlobalFragment = nil a.globalFragmentLock.Unlock() @@ -474,9 +450,6 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for for _, globalReceivedFragment := range secondaryGlobalClients { globalReceivedFragmentTotal += len(globalReceivedFragment.Clients) } - for _, globalReceivedFragment := range standbyGlobalClients { - globalReceivedFragmentTotal += len(globalReceivedFragment.Clients) - } a.metrics.IncrCounterWithLabels([]string{"core", "activity", "global_received_fragment_size"}, float32(globalReceivedFragmentTotal), []metricsutil.Label{ @@ -492,17 +465,18 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for return nil } - if ret := a.createCurrentSegmentFromFragments(ctx, append(standbys, currentFragment), &a.currentSegment, force, ""); ret != nil { - return ret - } - - // If we are the primary, store global clients - // Create fragments from global clients and store the segment - if !a.core.IsPerfSecondary() { - globalFragments := append(append(secondaryGlobalClients, globalClients), standbyGlobalClients...) - if ret := a.createCurrentSegmentFromFragments(ctx, globalFragments, &a.currentGlobalSegment, force, activityGlobalPathPrefix); ret != nil { - return ret - } + // Measure the current regular fragment + if currentFragment != nil { + a.metrics.IncrCounterWithLabels([]string{"core", "activity", "fragment_size"}, + float32(len(currentFragment.Clients)), + []metricsutil.Label{ + {"type", "entity"}, + }) + a.metrics.IncrCounterWithLabels([]string{"core", "activity", "fragment_size"}, + float32(len(currentFragment.NonEntityTokens)), + []metricsutil.Label{ + {"type", "direct_token"}, + }) } // Swap out the pending local fragments @@ -529,14 +503,10 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for }) } - return nil -} - -func (a *ActivityLog) createCurrentSegmentFromFragments(ctx context.Context, fragments []*activity.LogFragment, currentSegment *segmentInfo, force bool, storagePathPrefix string) error { // Collect new entities and new tokens. saveChanges := false newEntities := make(map[string]*activity.EntityRecord) - for _, f := range fragments { + for _, f := range append(standbys, currentFragment) { if f == nil { continue } @@ -560,7 +530,7 @@ func (a *ActivityLog) createCurrentSegmentFromFragments(ctx context.Context, fra // a.partialMonthClientTracker.nonEntityCountByNamespaceID. This preserves backward // compatibility for the precomputedQueryWorkers and the segment storing // logic. - currentSegment.tokenCount.CountByNamespaceID[ns] += val + a.currentSegment.tokenCount.CountByNamespaceID[ns] += val } } @@ -569,14 +539,14 @@ func (a *ActivityLog) createCurrentSegmentFromFragments(ctx context.Context, fra } // Will all new entities fit? If not, roll over to a new segment. - available := ActivitySegmentClientCapacity - len(currentSegment.currentClients.Clients) + available := ActivitySegmentClientCapacity - len(a.currentSegment.currentClients.Clients) remaining := available - len(newEntities) excess := 0 if remaining < 0 { excess = -remaining } - segmentClients := currentSegment.currentClients.Clients + segmentClients := a.currentSegment.currentClients.Clients excessClients := make([]*activity.EntityRecord, 0, excess) for _, record := range newEntities { if available > 0 { @@ -586,8 +556,8 @@ func (a *ActivityLog) createCurrentSegmentFromFragments(ctx context.Context, fra excessClients = append(excessClients, record) } } - currentSegment.currentClients.Clients = segmentClients - err := a.saveCurrentSegmentInternal(ctx, force, *currentSegment, storagePathPrefix) + a.currentSegment.currentClients.Clients = segmentClients + err := a.saveCurrentSegmentInternal(ctx, force) if err != nil { // The current fragment(s) have already been placed into the in-memory // segment, but we may lose any excess (in excessClients). @@ -597,7 +567,7 @@ func (a *ActivityLog) createCurrentSegmentFromFragments(ctx context.Context, fra } if available <= 0 { - if currentSegment.clientSequenceNumber >= activityLogMaxSegmentPerMonth { + if a.currentSegment.clientSequenceNumber >= activityLogMaxSegmentPerMonth { // Cannot send as Warn because it will repeat too often, // and disabling/renabling would be complicated. a.logger.Trace("too many segments in current month", "dropped", len(excessClients)) @@ -605,13 +575,13 @@ func (a *ActivityLog) createCurrentSegmentFromFragments(ctx context.Context, fra } // Rotate to next segment - currentSegment.clientSequenceNumber += 1 + a.currentSegment.clientSequenceNumber += 1 if len(excessClients) > ActivitySegmentClientCapacity { a.logger.Warn("too many new active clients, dropping tail", "clients", len(excessClients)) excessClients = excessClients[:ActivitySegmentClientCapacity] } - currentSegment.currentClients.Clients = excessClients - err := a.saveCurrentSegmentInternal(ctx, force, *currentSegment, storagePathPrefix) + a.currentSegment.currentClients.Clients = excessClients + err := a.saveCurrentSegmentInternal(ctx, force) if err != nil { return err } @@ -620,12 +590,12 @@ func (a *ActivityLog) createCurrentSegmentFromFragments(ctx context.Context, fra } // :force: forces a save of tokens/entities even if the in-memory log is empty -func (a *ActivityLog) saveCurrentSegmentInternal(ctx context.Context, force bool, currentSegment segmentInfo, storagePathPrefix string) error { - _, err := a.saveSegmentEntitiesInternal(ctx, currentSegment, force, storagePathPrefix) +func (a *ActivityLog) saveCurrentSegmentInternal(ctx context.Context, force bool) error { + _, err := a.saveSegmentEntitiesInternal(ctx, a.currentSegment, force) if err != nil { return err } - _, err = a.saveSegmentTokensInternal(ctx, currentSegment, force) + _, err = a.saveSegmentTokensInternal(ctx, a.currentSegment, force) return err } @@ -644,15 +614,15 @@ func (a *ActivityLog) saveSegmentTokensInternal(ctx context.Context, currentSegm switch { case err != nil: a.logger.Error(fmt.Sprintf("unable to retrieve oldest version timestamp: %s", err.Error())) - case len(currentSegment.tokenCount.CountByNamespaceID) > 0 && + case len(a.currentSegment.tokenCount.CountByNamespaceID) > 0 && (oldestUpgradeTime.Add(time.Duration(trackedTWESegmentPeriod * time.Hour)).Before(time.Now())): a.logger.Error(fmt.Sprintf("storing nonzero token count over a month after vault was upgraded to %s", oldestVersion)) default: - if len(currentSegment.tokenCount.CountByNamespaceID) > 0 { + if len(a.currentSegment.tokenCount.CountByNamespaceID) > 0 { a.logger.Info("storing nonzero token count") } } - tokenCount, err := proto.Marshal(currentSegment.tokenCount) + tokenCount, err := proto.Marshal(a.currentSegment.tokenCount) if err != nil { return "", err } @@ -669,10 +639,10 @@ func (a *ActivityLog) saveSegmentTokensInternal(ctx context.Context, currentSegm return tokenPath, nil } -func (a *ActivityLog) saveSegmentEntitiesInternal(ctx context.Context, currentSegment segmentInfo, force bool, storagePathPrefix string) (string, error) { - entityPath := fmt.Sprintf("%s%s%d/%d", storagePathPrefix, activityEntityBasePath, currentSegment.startTimestamp, currentSegment.clientSequenceNumber) +func (a *ActivityLog) saveSegmentEntitiesInternal(ctx context.Context, currentSegment segmentInfo, force bool) (string, error) { + entityPath := fmt.Sprintf("%s%d/%d", activityEntityBasePath, currentSegment.startTimestamp, currentSegment.clientSequenceNumber) - for _, client := range currentSegment.currentClients.Clients { + for _, client := range a.currentSegment.currentClients.Clients { // Explicitly catch and throw clear error message if client ID creation and storage // results in a []byte that doesn't assert into a valid string. if !utf8.ValidString(client.ClientID) { @@ -716,7 +686,7 @@ func parseSegmentNumberFromPath(path string) (int, bool) { // sorted last to first func (a *ActivityLog) availableLogs(ctx context.Context, upTo time.Time) ([]time.Time, error) { paths := make([]string, 0) - for _, basePath := range []string{activityEntityBasePath, activityGlobalPathPrefix + activityEntityBasePath, activityTokenLocalBasePath} { + for _, basePath := range []string{activityEntityBasePath, activityTokenLocalBasePath} { p, err := a.view.List(ctx, basePath) if err != nil { return nil, err @@ -765,10 +735,10 @@ func (a *ActivityLog) getMostRecentActivityLogSegment(ctx context.Context, now t } // getLastEntitySegmentNumber returns the (non-negative) last segment number for the :startTime:, if it exists -func (a *ActivityLog) getLastEntitySegmentNumber(ctx context.Context, startTime time.Time) (uint64, uint64, bool, error) { +func (a *ActivityLog) getLastEntitySegmentNumber(ctx context.Context, startTime time.Time) (uint64, bool, error) { p, err := a.view.List(ctx, activityEntityBasePath+fmt.Sprint(startTime.Unix())+"/") if err != nil { - return 0, 0, false, err + return 0, false, err } highestNum := -1 @@ -780,34 +750,12 @@ func (a *ActivityLog) getLastEntitySegmentNumber(ctx context.Context, startTime } } - segmentPresent := true - segmentHighestNum := uint64(highestNum) if highestNum < 0 { // numbers less than 0 are invalid. if a negative number is the highest value, there isn't a segment - segmentHighestNum = 0 - segmentPresent = false + return 0, false, nil } - globalPaths, err := a.view.List(ctx, activityGlobalPathPrefix+activityEntityBasePath+fmt.Sprint(startTime.Unix())+"/") - if err != nil { - return segmentHighestNum, 0, segmentPresent, err - } - - globalHighestNum := -1 - for _, path := range globalPaths { - if num, ok := parseSegmentNumberFromPath(path); ok { - if num > globalHighestNum { - globalHighestNum = num - } - } - } - - if globalHighestNum < 0 { - // numbers less than 0 are invalid. if a negative number is the highest value, there isn't a segment - return segmentHighestNum, 0, segmentPresent, nil - } - - return segmentHighestNum, uint64(globalHighestNum), segmentPresent, nil + return uint64(highestNum), true, nil } // WalkEntitySegments loads each of the entity segments for a particular start time @@ -890,47 +838,29 @@ func (a *ActivityLog) loadPriorEntitySegment(ctx context.Context, startTime time } a.l.RLock() - defer a.l.RUnlock() a.fragmentLock.Lock() + a.globalFragmentLock.Lock() // Handle the (unlikely) case where the end of the month has been reached while background loading. // Or the feature has been disabled. if a.enabled && startTime.Unix() == a.currentSegment.startTimestamp { for _, ent := range out.Clients { a.partialMonthClientTracker[ent.ClientID] = ent + if local, _ := a.isClientLocal(ent); !local { + a.globalPartialMonthClientTracker[ent.ClientID] = ent + } } } a.fragmentLock.Unlock() - - globalPath := activityGlobalPathPrefix + activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(sequenceNum, 10) - data, err = a.view.Get(ctx, globalPath) - if err != nil { - return err - } - if data == nil { - return nil - } - out = &activity.EntityActivityLog{} - err = proto.Unmarshal(data.Value, out) - if err != nil { - return err - } - a.globalFragmentLock.Lock() - // Handle the (unlikely) case where the end of the month has been reached while background loading. - // Or the feature has been disabled. - if a.enabled && startTime.Unix() == a.currentGlobalSegment.startTimestamp { - for _, ent := range out.Clients { - a.globalPartialMonthClientTracker[ent.ClientID] = ent - } - } a.globalFragmentLock.Unlock() + a.l.RUnlock() return nil } // loadCurrentClientSegment loads the most recent segment (for "this month") // into memory (to append new entries), and to the partialMonthClientTracker to -// avoid duplication call with fragmentLock, globalFragmentLock and l held. -func (a *ActivityLog) loadCurrentClientSegment(ctx context.Context, startTime time.Time, sequenceNum uint64, globalSegmentSequenceNumber uint64) error { +// avoid duplication call with fragmentLock and l held. +func (a *ActivityLog) loadCurrentClientSegment(ctx context.Context, startTime time.Time, sequenceNum uint64) error { path := activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(sequenceNum, 10) data, err := a.view.Get(ctx, path) if err != nil { @@ -962,40 +892,9 @@ func (a *ActivityLog) loadCurrentClientSegment(ctx context.Context, startTime ti for _, client := range out.Clients { a.partialMonthClientTracker[client.ClientID] = client - } - - path = activityGlobalPathPrefix + activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(globalSegmentSequenceNumber, 10) - data, err = a.view.Get(ctx, path) - if err != nil { - return err - } - if data == nil { - return nil - } - - out = &activity.EntityActivityLog{} - err = proto.Unmarshal(data.Value, out) - if err != nil { - return err - } - - if !a.core.perfStandby { - a.currentGlobalSegment = segmentInfo{ - startTimestamp: startTime.Unix(), - currentClients: &activity.EntityActivityLog{ - Clients: out.Clients, - }, - tokenCount: &activity.TokenCount{ - CountByNamespaceID: make(map[string]uint64), - }, - clientSequenceNumber: sequenceNum, + if local, _ := a.isClientLocal(client); !local { + a.globalPartialMonthClientTracker[client.ClientID] = client } - } else { - // populate this for edge case checking (if end of month passes while background loading on standby) - a.currentGlobalSegment.startTimestamp = startTime.Unix() - } - for _, client := range out.Clients { - a.globalPartialMonthClientTracker[client.ClientID] = client } return nil @@ -1076,36 +975,33 @@ func (a *ActivityLog) entityBackgroundLoader(ctx context.Context, wg *sync.WaitG } // Initialize a new current segment, based on the current time. -// Call with fragmentLock, globalFragmentLock and l held. +// Call with fragmentLock and l held. func (a *ActivityLog) startNewCurrentLogLocked(now time.Time) { a.logger.Trace("initializing new log") a.resetCurrentLog() a.currentSegment.startTimestamp = now.Unix() - a.currentGlobalSegment.startTimestamp = now.Unix() } -// Should be called with fragmentLock, globalFragmentLock and l held. +// Should be called with fragmentLock and l held. func (a *ActivityLog) newMonthCurrentLogLocked(currentTime time.Time) { a.logger.Trace("continuing log to new month") a.resetCurrentLog() monthStart := timeutil.StartOfMonth(currentTime.UTC()) a.currentSegment.startTimestamp = monthStart.Unix() - a.currentGlobalSegment.startTimestamp = monthStart.Unix() } // Initialize a new current segment, based on the given time -// should be called with fragmentLock, globalFragmentLock and l held. +// should be called with fragmentLock and l held. func (a *ActivityLog) newSegmentAtGivenTime(t time.Time) { timestamp := t.Unix() a.logger.Trace("starting a segment", "timestamp", timestamp) a.resetCurrentLog() a.currentSegment.startTimestamp = timestamp - a.currentGlobalSegment.startTimestamp = timestamp } // Reset all the current segment state. -// Should be called with fragmentLock, globalFragmentLock and l held. +// Should be called with fragmentLock and l held. func (a *ActivityLog) resetCurrentLog() { a.currentSegment.startTimestamp = 0 a.currentSegment.currentClients = &activity.EntityActivityLog{ @@ -1119,25 +1015,19 @@ func (a *ActivityLog) resetCurrentLog() { } a.currentSegment.clientSequenceNumber = 0 + a.fragment = nil a.partialMonthClientTracker = make(map[string]*activity.EntityRecord) - a.standbyFragmentsReceived = make([]*activity.LogFragment, 0) - - a.currentGlobalSegment.startTimestamp = 0 - a.currentGlobalSegment.currentClients = &activity.EntityActivityLog{ - Clients: make([]*activity.EntityRecord, 0), - } - a.currentGlobalSegment.clientSequenceNumber = 0 a.currentGlobalFragment = nil a.globalPartialMonthClientTracker = make(map[string]*activity.EntityRecord) + + a.standbyFragmentsReceived = make([]*activity.LogFragment, 0) a.secondaryGlobalClientFragments = make([]*activity.LogFragment, 0) - a.standbyGlobalFragmentsReceived = make([]*activity.LogFragment, 0) } func (a *ActivityLog) deleteLogWorker(ctx context.Context, startTimestamp int64, whenDone chan struct{}) { entityPath := fmt.Sprintf("%v%v/", activityEntityBasePath, startTimestamp) tokenPath := fmt.Sprintf("%v%v/", activityTokenLocalBasePath, startTimestamp) - globalEntityPath := fmt.Sprintf("%s%v%v/", activityGlobalPathPrefix, activityEntityBasePath, startTimestamp) entitySegments, err := a.view.List(ctx, entityPath) if err != nil { @@ -1163,18 +1053,6 @@ func (a *ActivityLog) deleteLogWorker(ctx context.Context, startTimestamp int64, } } - globalEntitySegments, err := a.view.List(ctx, globalEntityPath) - if err != nil { - a.logger.Error("could not list global entity paths", "error", err) - return - } - for _, p := range globalEntitySegments { - err = a.view.Delete(ctx, globalEntityPath+p) - if err != nil { - a.logger.Error("could not delete global entity log", "error", err) - } - } - // Allow whoever started this as a goroutine to wait for it to finish. close(whenDone) } @@ -1270,7 +1148,7 @@ func (a *ActivityLog) refreshFromStoredLog(ctx context.Context, wg *sync.WaitGro } // load entity logs from storage into memory - lastSegment, globalLastSegment, segmentsExist, err := a.getLastEntitySegmentNumber(ctx, mostRecent) + lastSegment, segmentsExist, err := a.getLastEntitySegmentNumber(ctx, mostRecent) if err != nil { return err } @@ -1279,7 +1157,7 @@ func (a *ActivityLog) refreshFromStoredLog(ctx context.Context, wg *sync.WaitGro return nil } - err = a.loadCurrentClientSegment(ctx, mostRecent, lastSegment, globalLastSegment) + err = a.loadCurrentClientSegment(ctx, mostRecent, lastSegment) if err != nil || lastSegment == 0 { return err } @@ -1342,7 +1220,7 @@ func (a *ActivityLog) SetConfig(ctx context.Context, config activityConfig) { a.logger.Info("activity log enable changed", "original", originalEnabled, "current", a.enabled) } - if !a.enabled && a.currentSegment.startTimestamp != 0 && a.currentGlobalSegment.startTimestamp != 0 { + if !a.enabled && a.currentSegment.startTimestamp != 0 { a.logger.Trace("deleting current segment") a.deleteDone = make(chan struct{}) // this is called from a request under stateLock, so use activeContext @@ -1351,7 +1229,7 @@ func (a *ActivityLog) SetConfig(ctx context.Context, config activityConfig) { } forceSave := false - if a.enabled && a.currentSegment.startTimestamp == 0 && a.currentGlobalSegment.startTimestamp == 0 { + if a.enabled && a.currentSegment.startTimestamp == 0 { a.startNewCurrentLogLocked(a.clock.Now().UTC()) // Force a save so we can distinguish between // @@ -1368,8 +1246,7 @@ func (a *ActivityLog) SetConfig(ctx context.Context, config activityConfig) { if forceSave { // l is still held here - a.saveCurrentSegmentInternal(ctx, true, a.currentSegment, "") - a.saveCurrentSegmentInternal(ctx, true, a.currentGlobalSegment, activityGlobalPathPrefix) + a.saveCurrentSegmentInternal(ctx, true) } a.defaultReportMonths = config.DefaultReportMonths @@ -1805,13 +1682,7 @@ func (a *ActivityLog) perfStandbyFragmentWorker(ctx context.Context) { // activeFragmentWorker handles scheduling the write of the next // segment. It runs on active nodes only. func (a *ActivityLog) activeFragmentWorker(ctx context.Context) { - writeInterval := activitySegmentInterval - // This changes the interval to a duration that was set for testing purposes - if a.configOverrides.StorageWriteTestingInterval.Microseconds() > 0 { - writeInterval = a.configOverrides.StorageWriteTestingInterval - } - - ticker := a.clock.NewTicker(writeInterval) + ticker := a.clock.NewTicker(activitySegmentInterval) endOfMonth := a.clock.NewTimer(a.StartOfNextMonth().Sub(a.clock.Now())) if a.configOverrides.DisableTimers { diff --git a/vault/activity_log_test.go b/vault/activity_log_test.go index fab86b7cd1..c423d44a45 100644 --- a/vault/activity_log_test.go +++ b/vault/activity_log_test.go @@ -9,7 +9,6 @@ import ( "encoding/json" "errors" "fmt" - "io" "net/http" "reflect" "sort" @@ -579,7 +578,6 @@ func TestActivityLog_SaveEntitiesToStorage(t *testing.T) { now.Add(2 * time.Second).Unix(), } path := fmt.Sprintf("%sentity/%d/0", ActivityLogPrefix, a.GetStartTimestamp()) - globalPath := fmt.Sprintf("%sentity/%d/0", ActivityGlobalLogPrefix, a.GetStartTimestamp()) a.AddEntityToFragment(ids[0], "root", times[0]) a.AddEntityToFragment(ids[1], "root2", times[1]) @@ -616,14 +614,6 @@ func TestActivityLog_SaveEntitiesToStorage(t *testing.T) { t.Fatalf("could not unmarshal protobuf: %v", err) } expectedEntityIDs(t, out, ids) - - protoSegment = readSegmentFromStorage(t, core, globalPath) - out = &activity.EntityActivityLog{} - err = proto.Unmarshal(protoSegment.Value, out) - if err != nil { - t.Fatalf("could not unmarshal protobuf: %v", err) - } - expectedEntityIDs(t, out, ids) } // TestActivityLog_StoreAndReadHyperloglog inserts into a hyperloglog, stores it and then reads it back. The test @@ -1221,55 +1211,45 @@ func TestActivityLog_getLastEntitySegmentNumber(t *testing.T) { core, _, _ := TestCoreUnsealed(t) a := core.activityLog paths := [...]string{"entity/992/0", "entity/1000/-1", "entity/1001/foo", "entity/1111/0", "entity/1111/1"} - globalPaths := [...]string{"entity/992/0", "entity/1000/-1", "entity/1001/foo", "entity/1111/1"} for _, path := range paths { WriteToStorage(t, core, ActivityLogPrefix+path, []byte("test")) } - for _, path := range globalPaths { - WriteToStorage(t, core, ActivityGlobalLogPrefix+path, []byte("test")) - } testCases := []struct { - input int64 - expectedVal uint64 - expectedGlobalVal uint64 - expectExists bool + input int64 + expectedVal uint64 + expectExists bool }{ { - input: 992, - expectedVal: 0, - expectedGlobalVal: 0, - expectExists: true, + input: 992, + expectedVal: 0, + expectExists: true, }, { - input: 1000, - expectedVal: 0, - expectedGlobalVal: 0, - expectExists: false, + input: 1000, + expectedVal: 0, + expectExists: false, }, { - input: 1001, - expectedVal: 0, - expectedGlobalVal: 0, - expectExists: false, + input: 1001, + expectedVal: 0, + expectExists: false, }, { - input: 1111, - expectedVal: 1, - expectedGlobalVal: 1, - expectExists: true, + input: 1111, + expectedVal: 1, + expectExists: true, }, { - input: 2222, - expectedVal: 0, - expectedGlobalVal: 0, - expectExists: false, + input: 2222, + expectedVal: 0, + expectExists: false, }, } ctx := context.Background() for _, tc := range testCases { - result, globalSegmentNumber, exists, err := a.getLastEntitySegmentNumber(ctx, time.Unix(tc.input, 0)) + result, exists, err := a.getLastEntitySegmentNumber(ctx, time.Unix(tc.input, 0)) if err != nil { t.Fatalf("unexpected error for input %d: %v", tc.input, err) } @@ -1279,10 +1259,6 @@ func TestActivityLog_getLastEntitySegmentNumber(t *testing.T) { if result != tc.expectedVal { t.Errorf("expected: %d got: %d for input: %d", tc.expectedVal, result, tc.input) } - if globalSegmentNumber != tc.expectedGlobalVal { - t.Errorf("expected: %d got: %d for input: %d", tc.expectedGlobalVal, globalSegmentNumber, tc.input) - } - } } @@ -1495,19 +1471,16 @@ func TestActivityLog_loadCurrentClientSegment(t *testing.T) { t.Fatalf(err.Error()) } WriteToStorage(t, core, ActivityLogPrefix+tc.path, data) - WriteToStorage(t, core, ActivityGlobalLogPrefix+tc.path, data) } ctx := context.Background() for _, tc := range testCases { a.l.Lock() a.fragmentLock.Lock() - a.globalFragmentLock.Lock() // loadCurrentClientSegment requires us to grab the fragment lock and the // activityLog lock, as per the comment in the loadCurrentClientSegment // function - err := a.loadCurrentClientSegment(ctx, time.Unix(tc.time, 0), tc.seqNum, tc.seqNum) - a.globalFragmentLock.Unlock() + err := a.loadCurrentClientSegment(ctx, time.Unix(tc.time, 0), tc.seqNum) a.fragmentLock.Unlock() a.l.Unlock() @@ -1534,11 +1507,6 @@ func TestActivityLog_loadCurrentClientSegment(t *testing.T) { t.Errorf("bad data loaded. expected: %v, got: %v for path %q", tc.entities.Clients, currentEntities, tc.path) } - currentGlobalEntities := a.GetCurrentGlobalEntities() - if !entityRecordsEqual(t, currentGlobalEntities.Clients, tc.entities.Clients) { - t.Errorf("bad data loaded. expected: %v, got: %v for path %q", tc.entities.Clients, currentGlobalEntities, tc.path) - } - activeClients := core.GetActiveClientsList() if err := ActiveEntitiesEqual(activeClients, tc.entities.Clients); err != nil { t.Errorf("bad data loaded into active entities. expected only set of EntityID from %v in %v for path %q: %v", tc.entities.Clients, activeClients, tc.path, err) @@ -1616,7 +1584,6 @@ func TestActivityLog_loadPriorEntitySegment(t *testing.T) { t.Fatalf(err.Error()) } WriteToStorage(t, core, ActivityLogPrefix+tc.path, data) - WriteToStorage(t, core, ActivityGlobalLogPrefix+tc.path, data) } ctx := context.Background() @@ -1627,9 +1594,7 @@ func TestActivityLog_loadPriorEntitySegment(t *testing.T) { a.localFragmentLock.Lock() a.partialMonthClientTracker = make(map[string]*activity.EntityRecord) a.partialMonthLocalClientTracker = make(map[string]*activity.EntityRecord) - a.globalPartialMonthClientTracker = make(map[string]*activity.EntityRecord) a.currentSegment.startTimestamp = tc.time - a.currentGlobalSegment.startTimestamp = tc.time a.fragmentLock.Unlock() a.localFragmentLock.Unlock() a.l.Unlock() @@ -1803,10 +1768,8 @@ func setupActivityRecordsInStorage(t *testing.T, base time.Time, includeEntities } if i == 0 { WriteToStorage(t, core, ActivityLogPrefix+"entity/"+fmt.Sprint(monthsAgo.Unix())+"/0", entityData) - WriteToStorage(t, core, ActivityGlobalLogPrefix+"entity/"+fmt.Sprint(monthsAgo.Unix())+"/0", entityData) } else { WriteToStorage(t, core, ActivityLogPrefix+"entity/"+fmt.Sprint(base.Unix())+"/"+strconv.Itoa(i-1), entityData) - WriteToStorage(t, core, ActivityGlobalLogPrefix+"entity/"+fmt.Sprint(base.Unix())+"/"+strconv.Itoa(i-1), entityData) } } } @@ -1856,7 +1819,7 @@ func TestActivityLog_refreshFromStoredLog(t *testing.T) { Clients: expectedClientRecords[len(expectedClientRecords)-1:], } - currentEntities := a.GetCurrentGlobalEntities() + currentEntities := a.GetCurrentEntities() if !entityRecordsEqual(t, currentEntities.Clients, expectedCurrent.Clients) { // we only expect the newest entity segment to be loaded (for the current month) t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrent, currentEntities) @@ -2349,13 +2312,6 @@ func TestActivityLog_EndOfMonth(t *testing.T) { if err != nil { t.Fatal(err) } - path = fmt.Sprintf("%ventity/%v/0", ActivityGlobalLogPrefix, segment0) - protoSegment = readSegmentFromStorage(t, core, path) - out = &activity.EntityActivityLog{} - err = proto.Unmarshal(protoSegment.Value, out) - if err != nil { - t.Fatal(err) - } segment1 := a.GetStartTimestamp() expectedTimestamp := timeutil.StartOfMonth(month1).Unix() @@ -2418,16 +2374,6 @@ func TestActivityLog_EndOfMonth(t *testing.T) { t.Fatalf("could not unmarshal protobuf: %v", err) } expectedEntityIDs(t, out, tc.ExpectedEntityIDs) - - // Check for global entities at global storage path - path = fmt.Sprintf("%ventity/%v/0", ActivityGlobalLogPrefix, tc.SegmentTimestamp) - protoSegment = readSegmentFromStorage(t, core, path) - out = &activity.EntityActivityLog{} - err = proto.Unmarshal(protoSegment.Value, out) - if err != nil { - t.Fatalf("could not unmarshal protobuf: %v", err) - } - expectedEntityIDs(t, out, tc.ExpectedEntityIDs) } } @@ -5420,212 +5366,3 @@ func TestActivityLog_Export_CSV_Header(t *testing.T) { require.Empty(t, deep.Equal(expectedColumnIndex, encoder.columnIndex)) } - -// TestCreateSegment_StoreSegment verifies that -// the activity log will correctly create segments from -// the fragments and store the right number of clients at -// the proper path. This test should be modified to include local clients. -func TestCreateSegment_StoreSegment(t *testing.T) { - cluster := NewTestCluster(t, nil, nil) - core := cluster.Cores[0].Core - a := core.activityLog - a.SetEnable(true) - - ctx := context.Background() - timeStamp := time.Now() - - clientRecords := make([]*activity.EntityRecord, ActivitySegmentClientCapacity*2+1) - for i := range clientRecords { - clientRecords[i] = &activity.EntityRecord{ - ClientID: fmt.Sprintf("111122222-3333-4444-5555-%012v", i), - Timestamp: timeStamp.Unix(), - NonEntity: false, - } - } - - startTime := a.GetStartTimestamp() - parsedTime := time.Unix(startTime, 0) - - testCases := []struct { - testName string - numClients int - pathPrefix string - maxClientsPerFragment int - global bool - forceStore bool - }{ - { - testName: "[global] max client size, drop clients", - numClients: ActivitySegmentClientCapacity*2 + 1, - pathPrefix: activityGlobalPathPrefix, - maxClientsPerFragment: ActivitySegmentClientCapacity, - global: true, - }, - { - testName: "[global, no-force] max client size, drop clients", - numClients: ActivitySegmentClientCapacity*2 + 1, - pathPrefix: activityGlobalPathPrefix, - maxClientsPerFragment: ActivitySegmentClientCapacity, - global: true, - forceStore: true, - }, - { - testName: "[global] max segment size", - numClients: ActivitySegmentClientCapacity, - pathPrefix: activityGlobalPathPrefix, - maxClientsPerFragment: ActivitySegmentClientCapacity, - global: true, - }, - { - testName: "[global, no-force] max segment size", - numClients: ActivitySegmentClientCapacity, - pathPrefix: activityGlobalPathPrefix, - maxClientsPerFragment: ActivitySegmentClientCapacity, - global: true, - forceStore: true, - }, - { - testName: "[global] max segment size, multiple fragments", - numClients: ActivitySegmentClientCapacity, - pathPrefix: activityGlobalPathPrefix, - maxClientsPerFragment: ActivitySegmentClientCapacity - 1, - global: true, - }, - { - testName: "[global, no-force] max segment size, multiple fragments", - numClients: ActivitySegmentClientCapacity, - pathPrefix: activityGlobalPathPrefix, - maxClientsPerFragment: ActivitySegmentClientCapacity - 1, - global: true, - forceStore: true, - }, - { - testName: "[global] roll over", - numClients: ActivitySegmentClientCapacity + 2, - pathPrefix: activityGlobalPathPrefix, - maxClientsPerFragment: ActivitySegmentClientCapacity, - global: true, - }, - { - testName: "[global, no-force] roll over", - numClients: ActivitySegmentClientCapacity + 2, - pathPrefix: activityGlobalPathPrefix, - maxClientsPerFragment: ActivitySegmentClientCapacity, - global: true, - forceStore: true, - }, - { - testName: "[global] max segment size, rollover multiple fragments", - numClients: ActivitySegmentClientCapacity * 2, - pathPrefix: activityGlobalPathPrefix, - maxClientsPerFragment: ActivitySegmentClientCapacity - 1, - global: true, - }, - { - testName: "[global, no-force] max segment size, rollover multiple fragments", - numClients: ActivitySegmentClientCapacity * 2, - pathPrefix: activityGlobalPathPrefix, - maxClientsPerFragment: ActivitySegmentClientCapacity - 1, - global: true, - forceStore: true, - }, - - { - testName: "[non-global] max segment size", - numClients: ActivitySegmentClientCapacity, - maxClientsPerFragment: ActivitySegmentClientCapacity, - global: false, - }, - { - testName: "[non-global] max segment size, multiple fragments", - numClients: ActivitySegmentClientCapacity, - maxClientsPerFragment: ActivitySegmentClientCapacity - 1, - global: false, - }, - { - testName: "[non-global] roll over", - numClients: ActivitySegmentClientCapacity + 2, - maxClientsPerFragment: ActivitySegmentClientCapacity, - global: false, - }, - { - testName: "[non-global] max segment size, rollover multiple fragments", - numClients: ActivitySegmentClientCapacity * 2, - maxClientsPerFragment: ActivitySegmentClientCapacity - 1, - global: false, - }, - { - testName: "[non-global] max client size, drop clients", - numClients: ActivitySegmentClientCapacity*2 + 1, - maxClientsPerFragment: ActivitySegmentClientCapacity, - global: false, - }, - } - - for _, test := range testCases { - t.Run(test.testName, func(t *testing.T) { - // Add clients to fragments - fragments := make([]*activity.LogFragment, 0) - remainder := test.numClients - var i int - for i = 0; i+test.maxClientsPerFragment < test.numClients; i = i + test.maxClientsPerFragment { - clients := clientRecords[i : i+test.maxClientsPerFragment] - remainder -= test.maxClientsPerFragment - fragments = append(fragments, &activity.LogFragment{Clients: clients}) - } - if remainder > 0 { - clients := clientRecords[i : i+remainder] - fragments = append(fragments, &activity.LogFragment{Clients: clients}) - - } - - segment := &a.currentGlobalSegment - if !test.global { - segment = &a.currentSegment - } - - // Create segments and write to storage - require.NoError(t, core.StoreCurrentSegment(ctx, fragments, segment, test.forceStore, test.pathPrefix)) - - reader, err := a.NewSegmentFileReader(ctx, parsedTime) - require.NoError(t, err) - var clientTotal int - if test.global { - for { - entity, err := reader.ReadGlobalEntity(ctx) - if errors.Is(err, io.EOF) { - break - } - require.NoError(t, err) - clientTotal += len(entity.GetClients()) - } - } else { - for { - entity, err := reader.ReadEntity(ctx) - if errors.Is(err, io.EOF) { - break - } - require.NoError(t, err) - clientTotal += len(entity.GetClients()) - } - } - - // The current behavior is that there were greater than 2 * ActivitySegmentClientCapacity seen, then we - // drop of the remainder of those clients seen during that time. Let's verify that this is the case - expectedTotal := test.numClients - if test.numClients > 2*ActivitySegmentClientCapacity { - expectedTotal = 2 * ActivitySegmentClientCapacity - } - - require.Equal(t, expectedTotal, clientTotal) - - // Delete any logs written in this test - core.DeleteLogsAtPath(ctx, t, test.pathPrefix+activityEntityBasePath, startTime) - // Reset client sequence number and current client slice back to original values - segment.clientSequenceNumber = 0 - segment.currentClients = &activity.EntityActivityLog{ - Clients: make([]*activity.EntityRecord, 0), - } - }) - } -} diff --git a/vault/activity_log_testing_util.go b/vault/activity_log_testing_util.go index 0f02390012..1bc7d6d3dd 100644 --- a/vault/activity_log_testing_util.go +++ b/vault/activity_log_testing_util.go @@ -100,15 +100,6 @@ func (a *ActivityLog) GetCurrentEntities() *activity.EntityActivityLog { return a.currentSegment.currentClients } -// GetCurrentGlobalEntities returns the current global entity activity log -func (a *ActivityLog) GetCurrentGlobalEntities() *activity.EntityActivityLog { - a.l.RLock() - defer a.l.RUnlock() - a.globalFragmentLock.RLock() - defer a.globalFragmentLock.RUnlock() - return a.currentGlobalSegment.currentClients -} - // WriteToStorage is used to put entity data in storage // `path` should be the complete path (not relative to the view) func WriteToStorage(t *testing.T, c *Core, path string, data []byte) { @@ -227,12 +218,7 @@ func ActiveEntitiesEqual(active []*activity.EntityRecord, test []*activity.Entit func (a *ActivityLog) GetStartTimestamp() int64 { a.l.RLock() defer a.l.RUnlock() - // TODO: We will substitute a.currentSegment with a.currentLocalSegment when we remove - // a.currentSegment from the code - if a.currentGlobalSegment.startTimestamp != a.currentSegment.startTimestamp { - return -1 - } - return a.currentGlobalSegment.startTimestamp + return a.currentSegment.startTimestamp } // SetStartTimestamp sets the start timestamp on an activity log @@ -240,7 +226,6 @@ func (a *ActivityLog) SetStartTimestamp(timestamp int64) { a.l.Lock() defer a.l.Unlock() a.currentSegment.startTimestamp = timestamp - a.currentGlobalSegment.startTimestamp = timestamp } // GetStoredTokenCountByNamespaceID returns the count of tokens by namespace ID @@ -302,27 +287,3 @@ func (c *Core) GetActiveFragment() *activity.LogFragment { defer c.activityLog.fragmentLock.RUnlock() return c.activityLog.fragment } - -// StoreCurrentSegment is a test only method to create and store -// segments from fragments. This allows createCurrentSegmentFromFragments to remain -// private -func (c *Core) StoreCurrentSegment(ctx context.Context, fragments []*activity.LogFragment, currentSegment *segmentInfo, force bool, storagePathPrefix string) error { - return c.activityLog.createCurrentSegmentFromFragments(ctx, fragments, currentSegment, force, storagePathPrefix) -} - -// DeleteLogsAtPath is test helper function deletes all logs at the given path -func (c *Core) DeleteLogsAtPath(ctx context.Context, t *testing.T, storagePath string, startTime int64) { - basePath := storagePath + fmt.Sprint(startTime) + "/" - a := c.activityLog - segments, err := a.view.List(ctx, basePath) - if err != nil { - t.Fatalf("could not list path %v", err) - return - } - for _, p := range segments { - err = a.view.Delete(ctx, basePath+p) - if err != nil { - t.Fatalf("could not delete path %v", err) - } - } -} diff --git a/vault/activity_log_util_common.go b/vault/activity_log_util_common.go index 67c7a847e4..4be1bfdcb4 100644 --- a/vault/activity_log_util_common.go +++ b/vault/activity_log_util_common.go @@ -424,16 +424,14 @@ type singleTypeSegmentReader struct { a *ActivityLog } type segmentReader struct { - tokens *singleTypeSegmentReader - entities *singleTypeSegmentReader - globalEntities *singleTypeSegmentReader + tokens *singleTypeSegmentReader + entities *singleTypeSegmentReader } // SegmentReader is an interface that provides methods to read tokens and entities in order type SegmentReader interface { ReadToken(ctx context.Context) (*activity.TokenCount, error) ReadEntity(ctx context.Context) (*activity.EntityActivityLog, error) - ReadGlobalEntity(ctx context.Context) (*activity.EntityActivityLog, error) } func (a *ActivityLog) NewSegmentFileReader(ctx context.Context, startTime time.Time) (SegmentReader, error) { @@ -441,15 +439,11 @@ func (a *ActivityLog) NewSegmentFileReader(ctx context.Context, startTime time.T if err != nil { return nil, err } - globalEntities, err := a.newSingleTypeSegmentReader(ctx, startTime, activityGlobalPathPrefix+activityEntityBasePath) - if err != nil { - return nil, err - } tokens, err := a.newSingleTypeSegmentReader(ctx, startTime, activityTokenLocalBasePath) if err != nil { return nil, err } - return &segmentReader{entities: entities, globalEntities: globalEntities, tokens: tokens}, nil + return &segmentReader{entities: entities, tokens: tokens}, nil } func (a *ActivityLog) newSingleTypeSegmentReader(ctx context.Context, startTime time.Time, prefix string) (*singleTypeSegmentReader, error) { @@ -515,17 +509,6 @@ func (e *segmentReader) ReadEntity(ctx context.Context) (*activity.EntityActivit return out, nil } -// ReadGlobalEntity reads a global entity from the global segment -// If there is none available, then the error will be io.EOF -func (e *segmentReader) ReadGlobalEntity(ctx context.Context) (*activity.EntityActivityLog, error) { - out := &activity.EntityActivityLog{} - err := e.globalEntities.nextValue(ctx, out) - if err != nil { - return nil, err - } - return out, nil -} - // namespaceRecordToCountsResponse converts the record to the ResponseCounts // type. The function sums entity, non-entity, and secret sync counts to get the // total client count. diff --git a/vault/activity_log_util_common_test.go b/vault/activity_log_util_common_test.go index d481560dc8..76001f35dc 100644 --- a/vault/activity_log_util_common_test.go +++ b/vault/activity_log_util_common_test.go @@ -990,14 +990,6 @@ func Test_ActivityLog_ComputeCurrentMonth_NamespaceMounts(t *testing.T) { } } -// writeGlobalEntitySegment writes a single global segment file with the given time and index for an entity -func writeGlobalEntitySegment(t *testing.T, core *Core, ts time.Time, index int, item *activity.EntityActivityLog) { - t.Helper() - protoItem, err := proto.Marshal(item) - require.NoError(t, err) - WriteToStorage(t, core, makeSegmentPath(t, activityGlobalPathPrefix+activityEntityBasePath, ts, index), protoItem) -} - // writeEntitySegment writes a single segment file with the given time and index for an entity func writeEntitySegment(t *testing.T, core *Core, ts time.Time, index int, item *activity.EntityActivityLog) { t.Helper() @@ -1030,7 +1022,6 @@ func TestSegmentFileReader_BadData(t *testing.T) { // write bad data that won't be able to be unmarshaled at index 0 WriteToStorage(t, core, makeSegmentPath(t, activityTokenLocalBasePath, now, 0), []byte("fake data")) WriteToStorage(t, core, makeSegmentPath(t, activityEntityBasePath, now, 0), []byte("fake data")) - WriteToStorage(t, core, makeSegmentPath(t, activityGlobalPathPrefix+activityEntityBasePath, now, 0), []byte("fake data")) // write entity at index 1 entity := &activity.EntityActivityLog{Clients: []*activity.EntityRecord{ @@ -1040,9 +1031,6 @@ func TestSegmentFileReader_BadData(t *testing.T) { }} writeEntitySegment(t, core, now, 1, entity) - // write global data at index 1 - writeGlobalEntitySegment(t, core, now, 1, entity) - // write token at index 1 token := &activity.TokenCount{CountByNamespaceID: map[string]uint64{ "ns": 1, @@ -1059,14 +1047,6 @@ func TestSegmentFileReader_BadData(t *testing.T) { require.True(t, proto.Equal(gotEntity, entity)) require.Nil(t, err) - // first the bad global entity is read, which returns an error - _, err = reader.ReadGlobalEntity(context.Background()) - require.Error(t, err) - // then, the reader can read the good entity at index 1 - gotEntity, err = reader.ReadGlobalEntity(context.Background()) - require.True(t, proto.Equal(gotEntity, entity)) - require.Nil(t, err) - // the bad token causes an error _, err = reader.ReadToken(context.Background()) require.Error(t, err) @@ -1085,7 +1065,6 @@ func TestSegmentFileReader_MissingData(t *testing.T) { for i := 0; i < 3; i++ { WriteToStorage(t, core, makeSegmentPath(t, activityTokenLocalBasePath, now, i), []byte("fake data")) WriteToStorage(t, core, makeSegmentPath(t, activityEntityBasePath, now, i), []byte("fake data")) - WriteToStorage(t, core, makeSegmentPath(t, activityGlobalPathPrefix+activityEntityBasePath, now, i), []byte("fake data")) } // write entity at index 3 @@ -1095,8 +1074,6 @@ func TestSegmentFileReader_MissingData(t *testing.T) { }, }} writeEntitySegment(t, core, now, 3, entity) - // write global entity at index 3 - writeGlobalEntitySegment(t, core, now, 3, entity) // write token at index 3 token := &activity.TokenCount{CountByNamespaceID: map[string]uint64{ "ns": 1, @@ -1109,7 +1086,6 @@ func TestSegmentFileReader_MissingData(t *testing.T) { for i := 0; i < 3; i++ { require.NoError(t, core.barrier.Delete(context.Background(), makeSegmentPath(t, activityTokenLocalBasePath, now, i))) require.NoError(t, core.barrier.Delete(context.Background(), makeSegmentPath(t, activityEntityBasePath, now, i))) - require.NoError(t, core.barrier.Delete(context.Background(), makeSegmentPath(t, activityGlobalPathPrefix+activityEntityBasePath, now, i))) } // we expect the reader to only return the data at index 3, and then be done @@ -1124,12 +1100,6 @@ func TestSegmentFileReader_MissingData(t *testing.T) { require.True(t, proto.Equal(gotToken, token)) _, err = reader.ReadToken(context.Background()) require.Equal(t, err, io.EOF) - - gotEntity, err = reader.ReadGlobalEntity(context.Background()) - require.NoError(t, err) - require.True(t, proto.Equal(gotEntity, entity)) - _, err = reader.ReadGlobalEntity(context.Background()) - require.Equal(t, err, io.EOF) } // TestSegmentFileReader_NoData verifies that the reader return io.EOF when there is no data diff --git a/vault/external_tests/activity_testonly/acme_regeneration_test.go b/vault/external_tests/activity_testonly/acme_regeneration_test.go index 5d70dc0c21..dbd8355f81 100644 --- a/vault/external_tests/activity_testonly/acme_regeneration_test.go +++ b/vault/external_tests/activity_testonly/acme_regeneration_test.go @@ -54,7 +54,7 @@ func TestACMERegeneration_RegenerateWithCurrentMonth(t *testing.T) { }) require.NoError(t, err) now := time.Now().UTC() - _, _, err = clientcountutil.NewActivityLogData(client). + _, err = clientcountutil.NewActivityLogData(client). NewPreviousMonthData(3). // 3 months ago, 15 non-entity clients and 10 ACME clients NewClientsSeen(15, clientcountutil.WithClientType("non-entity-token")). @@ -116,7 +116,7 @@ func TestACMERegeneration_RegenerateMuchOlder(t *testing.T) { client := cluster.Cores[0].Client now := time.Now().UTC() - _, _, err := clientcountutil.NewActivityLogData(client). + _, err := clientcountutil.NewActivityLogData(client). NewPreviousMonthData(5). // 5 months ago, 15 non-entity clients and 10 ACME clients NewClientsSeen(15, clientcountutil.WithClientType("non-entity-token")). @@ -159,7 +159,7 @@ func TestACMERegeneration_RegeneratePreviousMonths(t *testing.T) { client := cluster.Cores[0].Client now := time.Now().UTC() - _, _, err := clientcountutil.NewActivityLogData(client). + _, err := clientcountutil.NewActivityLogData(client). NewPreviousMonthData(3). // 3 months ago, 15 non-entity clients and 10 ACME clients NewClientsSeen(15, clientcountutil.WithClientType("non-entity-token")). diff --git a/vault/external_tests/activity_testonly/activity_testonly_oss_test.go b/vault/external_tests/activity_testonly/activity_testonly_oss_test.go index c5463bb801..dbb11c8453 100644 --- a/vault/external_tests/activity_testonly/activity_testonly_oss_test.go +++ b/vault/external_tests/activity_testonly/activity_testonly_oss_test.go @@ -29,7 +29,7 @@ func Test_ActivityLog_Disable(t *testing.T) { "enabled": "enable", }) require.NoError(t, err) - _, _, err = clientcountutil.NewActivityLogData(client). + _, err = clientcountutil.NewActivityLogData(client). NewPreviousMonthData(1). NewClientsSeen(5). NewCurrentMonthData(). diff --git a/vault/external_tests/activity_testonly/activity_testonly_test.go b/vault/external_tests/activity_testonly/activity_testonly_test.go index 481416024d..8141357efe 100644 --- a/vault/external_tests/activity_testonly/activity_testonly_test.go +++ b/vault/external_tests/activity_testonly/activity_testonly_test.go @@ -86,7 +86,7 @@ func Test_ActivityLog_LoseLeadership(t *testing.T) { }) require.NoError(t, err) - _, _, err = clientcountutil.NewActivityLogData(client). + _, err = clientcountutil.NewActivityLogData(client). NewCurrentMonthData(). NewClientsSeen(10). Write(context.Background(), generation.WriteOptions_WRITE_ENTITIES) @@ -121,7 +121,7 @@ func Test_ActivityLog_ClientsOverlapping(t *testing.T) { "enabled": "enable", }) require.NoError(t, err) - _, _, err = clientcountutil.NewActivityLogData(client). + _, err = clientcountutil.NewActivityLogData(client). NewPreviousMonthData(1). NewClientsSeen(7). NewCurrentMonthData(). @@ -169,7 +169,7 @@ func Test_ActivityLog_ClientsNewCurrentMonth(t *testing.T) { "enabled": "enable", }) require.NoError(t, err) - _, _, err = clientcountutil.NewActivityLogData(client). + _, err = clientcountutil.NewActivityLogData(client). NewPreviousMonthData(1). NewClientsSeen(5). NewCurrentMonthData(). @@ -203,7 +203,7 @@ func Test_ActivityLog_EmptyDataMonths(t *testing.T) { "enabled": "enable", }) require.NoError(t, err) - _, _, err = clientcountutil.NewActivityLogData(client). + _, err = clientcountutil.NewActivityLogData(client). NewCurrentMonthData(). NewClientsSeen(10). Write(context.Background(), generation.WriteOptions_WRITE_PRECOMPUTED_QUERIES, generation.WriteOptions_WRITE_ENTITIES) @@ -243,7 +243,7 @@ func Test_ActivityLog_FutureEndDate(t *testing.T) { "enabled": "enable", }) require.NoError(t, err) - _, _, err = clientcountutil.NewActivityLogData(client). + _, err = clientcountutil.NewActivityLogData(client). NewPreviousMonthData(1). NewClientsSeen(10). NewCurrentMonthData(). @@ -316,7 +316,7 @@ func Test_ActivityLog_ClientTypeResponse(t *testing.T) { _, err := client.Logical().Write("sys/internal/counters/config", map[string]interface{}{ "enabled": "enable", }) - _, _, err = clientcountutil.NewActivityLogData(client). + _, err = clientcountutil.NewActivityLogData(client). NewCurrentMonthData(). NewClientsSeen(10, clientcountutil.WithClientType(tc.clientType)). Write(context.Background(), generation.WriteOptions_WRITE_ENTITIES) @@ -369,7 +369,7 @@ func Test_ActivityLogCurrentMonth_Response(t *testing.T) { _, err := client.Logical().Write("sys/internal/counters/config", map[string]interface{}{ "enabled": "enable", }) - _, _, err = clientcountutil.NewActivityLogData(client). + _, err = clientcountutil.NewActivityLogData(client). NewCurrentMonthData(). NewClientsSeen(10, clientcountutil.WithClientType(tc.clientType)). Write(context.Background(), generation.WriteOptions_WRITE_ENTITIES) @@ -420,7 +420,7 @@ func Test_ActivityLog_Deduplication(t *testing.T) { _, err := client.Logical().Write("sys/internal/counters/config", map[string]interface{}{ "enabled": "enable", }) - _, _, err = clientcountutil.NewActivityLogData(client). + _, err = clientcountutil.NewActivityLogData(client). NewPreviousMonthData(3). NewClientsSeen(10, clientcountutil.WithClientType(tc.clientType)). NewPreviousMonthData(2). @@ -462,7 +462,7 @@ func Test_ActivityLog_MountDeduplication(t *testing.T) { require.NoError(t, err) now := time.Now().UTC() - _, _, err = clientcountutil.NewActivityLogData(client). + _, err = clientcountutil.NewActivityLogData(client). NewPreviousMonthData(1). NewClientSeen(clientcountutil.WithClientMount("sys")). NewClientSeen(clientcountutil.WithClientMount("secret")). @@ -669,7 +669,7 @@ func Test_ActivityLog_Export_Sudo(t *testing.T) { rootToken := client.Token() - _, _, err = clientcountutil.NewActivityLogData(client). + _, err = clientcountutil.NewActivityLogData(client). NewCurrentMonthData(). NewClientsSeen(10). Write(context.Background(), generation.WriteOptions_WRITE_ENTITIES) @@ -845,7 +845,7 @@ func TestHandleQuery_MultipleMounts(t *testing.T) { } // Write all the client count data - _, _, err = activityLogGenerator.Write(context.Background(), generation.WriteOptions_WRITE_PRECOMPUTED_QUERIES, generation.WriteOptions_WRITE_ENTITIES) + _, err = activityLogGenerator.Write(context.Background(), generation.WriteOptions_WRITE_PRECOMPUTED_QUERIES, generation.WriteOptions_WRITE_ENTITIES) require.NoError(t, err) endOfCurrentMonth := timeutil.EndOfMonth(time.Now().UTC()) diff --git a/vault/logical_system_activity_write_testonly.go b/vault/logical_system_activity_write_testonly.go index b5f80e380e..de52e4de35 100644 --- a/vault/logical_system_activity_write_testonly.go +++ b/vault/logical_system_activity_write_testonly.go @@ -85,15 +85,14 @@ func (b *SystemBackend) handleActivityWriteData(ctx context.Context, request *lo for _, opt := range input.Write { opts[opt] = struct{}{} } - paths, globalPaths, err := generated.write(ctx, opts, b.Core.activityLog, now) + paths, err := generated.write(ctx, opts, b.Core.activityLog, now) if err != nil { b.logger.Debug("failed to write activity log data", "error", err.Error()) return logical.ErrorResponse("failed to write data"), err } return &logical.Response{ Data: map[string]interface{}{ - "paths": paths, - "global_paths": globalPaths, + "paths": paths, }, }, nil } @@ -102,14 +101,9 @@ func (b *SystemBackend) handleActivityWriteData(ctx context.Context, request *lo type singleMonthActivityClients struct { // clients are indexed by ID clients []*activity.EntityRecord - // globalClients are indexed by ID - globalClients []*activity.EntityRecord // predefinedSegments map from the segment number to the client's index in // the clients slice predefinedSegments map[int][]int - // predefinedGlobalSegments map from the segment number to the client's index in - // the clients slice - predefinedGlobalSegments map[int][]int // generationParameters holds the generation request generationParameters *generation.Data } @@ -120,19 +114,11 @@ type multipleMonthsActivityClients struct { months []*singleMonthActivityClients } -func (s *singleMonthActivityClients) addEntityRecord(core *Core, record *activity.EntityRecord, segmentIndex *int) { +func (s *singleMonthActivityClients) addEntityRecord(record *activity.EntityRecord, segmentIndex *int) { s.clients = append(s.clients, record) - local, _ := core.activityLog.isClientLocal(record) - if !local { - s.globalClients = append(s.globalClients, record) - } if segmentIndex != nil { index := len(s.clients) - 1 s.predefinedSegments[*segmentIndex] = append(s.predefinedSegments[*segmentIndex], index) - if !local { - globalIndex := len(s.globalClients) - 1 - s.predefinedGlobalSegments[*segmentIndex] = append(s.predefinedGlobalSegments[*segmentIndex], globalIndex) - } } } @@ -140,7 +126,7 @@ func (s *singleMonthActivityClients) addEntityRecord(core *Core, record *activit // keys are the segment index, and the value are the clients that were seen in // that index. If the value is an empty slice, then it's an empty index. If the // value is nil, then it's a skipped index -func (s *singleMonthActivityClients) populateSegments(predefinedSegments map[int][]int, clients []*activity.EntityRecord) (map[int][]*activity.EntityRecord, error) { +func (s *singleMonthActivityClients) populateSegments() (map[int][]*activity.EntityRecord, error) { segments := make(map[int][]*activity.EntityRecord) ignoreIndexes := make(map[int]struct{}) skipIndexes := s.generationParameters.SkipSegmentIndexes @@ -156,11 +142,11 @@ func (s *singleMonthActivityClients) populateSegments(predefinedSegments map[int } // if we have predefined segments, then we can construct the map using those - if len(predefinedSegments) > 0 { - for segment, clientIndexes := range predefinedSegments { + if len(s.predefinedSegments) > 0 { + for segment, clientIndexes := range s.predefinedSegments { clientsInSegment := make([]*activity.EntityRecord, 0, len(clientIndexes)) for _, idx := range clientIndexes { - clientsInSegment = append(clientsInSegment, clients[idx]) + clientsInSegment = append(clientsInSegment, s.clients[idx]) } segments[segment] = clientsInSegment } @@ -169,8 +155,8 @@ func (s *singleMonthActivityClients) populateSegments(predefinedSegments map[int // determine how many segments are necessary to store the clients for this month // using the default storage limits - numNecessarySegments := len(clients) / ActivitySegmentClientCapacity - if len(clients)%ActivitySegmentClientCapacity != 0 { + numNecessarySegments := len(s.clients) / ActivitySegmentClientCapacity + if len(s.clients)%ActivitySegmentClientCapacity != 0 { numNecessarySegments++ } totalSegmentCount := numNecessarySegments @@ -187,8 +173,8 @@ func (s *singleMonthActivityClients) populateSegments(predefinedSegments map[int } // determine how many clients should be in each segment - segmentSizes := len(clients) / usableSegmentCount - if len(clients)%usableSegmentCount != 0 { + segmentSizes := len(s.clients) / usableSegmentCount + if len(s.clients)%usableSegmentCount != 0 { segmentSizes++ } @@ -198,14 +184,14 @@ func (s *singleMonthActivityClients) populateSegments(predefinedSegments map[int clientIndex := 0 for i := 0; i < totalSegmentCount; i++ { - if clientIndex >= len(clients) { + if clientIndex >= len(s.clients) { break } if _, ok := ignoreIndexes[i]; ok { continue } - for len(segments[i]) < segmentSizes && clientIndex < len(clients) { - segments[i] = append(segments[i], clients[clientIndex]) + for len(segments[i]) < segmentSizes && clientIndex < len(s.clients) { + segments[i] = append(segments[i], s.clients[clientIndex]) clientIndex++ } } @@ -214,7 +200,7 @@ func (s *singleMonthActivityClients) populateSegments(predefinedSegments map[int // addNewClients generates clients according to the given parameters, and adds them to the month // the client will always have the mountAccessor as its mount accessor -func (s *singleMonthActivityClients) addNewClients(c *generation.Client, mountAccessor string, segmentIndex *int, monthsAgo int32, now time.Time, core *Core) error { +func (s *singleMonthActivityClients) addNewClients(c *generation.Client, mountAccessor string, segmentIndex *int, monthsAgo int32, now time.Time) error { count := 1 if c.Count > 1 { count = int(c.Count) @@ -238,8 +224,7 @@ func (s *singleMonthActivityClients) addNewClients(c *generation.Client, mountAc return err } } - - s.addEntityRecord(core, record, segmentIndex) + s.addEntityRecord(record, segmentIndex) } return nil } @@ -308,7 +293,7 @@ func (m *multipleMonthsActivityClients) processMonth(ctx context.Context, core * } } - err = m.addClientToMonth(month.GetMonthsAgo(), clients, mountAccessor, segmentIndex, now, core) + err = m.addClientToMonth(month.GetMonthsAgo(), clients, mountAccessor, segmentIndex, now) if err != nil { return err } @@ -334,14 +319,14 @@ func (m *multipleMonthsActivityClients) processMonth(ctx context.Context, core * return nil } -func (m *multipleMonthsActivityClients) addClientToMonth(monthsAgo int32, c *generation.Client, mountAccessor string, segmentIndex *int, now time.Time, core *Core) error { +func (m *multipleMonthsActivityClients) addClientToMonth(monthsAgo int32, c *generation.Client, mountAccessor string, segmentIndex *int, now time.Time) error { if c.Repeated || c.RepeatedFromMonth > 0 { - return m.addRepeatedClients(monthsAgo, c, mountAccessor, segmentIndex, core) + return m.addRepeatedClients(monthsAgo, c, mountAccessor, segmentIndex) } - return m.months[monthsAgo].addNewClients(c, mountAccessor, segmentIndex, monthsAgo, now, core) + return m.months[monthsAgo].addNewClients(c, mountAccessor, segmentIndex, monthsAgo, now) } -func (m *multipleMonthsActivityClients) addRepeatedClients(monthsAgo int32, c *generation.Client, mountAccessor string, segmentIndex *int, core *Core) error { +func (m *multipleMonthsActivityClients) addRepeatedClients(monthsAgo int32, c *generation.Client, mountAccessor string, segmentIndex *int) error { addingTo := m.months[monthsAgo] repeatedFromMonth := monthsAgo + 1 if c.RepeatedFromMonth > 0 { @@ -354,7 +339,7 @@ func (m *multipleMonthsActivityClients) addRepeatedClients(monthsAgo int32, c *g } for _, client := range repeatedFrom.clients { if c.ClientType == client.ClientType && mountAccessor == client.MountAccessor && c.Namespace == client.NamespaceID { - addingTo.addEntityRecord(core, client, segmentIndex) + addingTo.addEntityRecord(client, segmentIndex) numClients-- if numClients == 0 { break @@ -384,9 +369,8 @@ func (m *multipleMonthsActivityClients) timestampForMonth(i int, now time.Time) return now } -func (m *multipleMonthsActivityClients) write(ctx context.Context, opts map[generation.WriteOptions]struct{}, activityLog *ActivityLog, now time.Time) ([]string, []string, error) { +func (m *multipleMonthsActivityClients) write(ctx context.Context, opts map[generation.WriteOptions]struct{}, activityLog *ActivityLog, now time.Time) ([]string, error) { paths := []string{} - globalPaths := []string{} _, writePQ := opts[generation.WriteOptions_WRITE_PRECOMPUTED_QUERIES] _, writeDistinctClients := opts[generation.WriteOptions_WRITE_DISTINCT_CLIENTS] @@ -399,9 +383,9 @@ func (m *multipleMonthsActivityClients) write(ctx context.Context, opts map[gene continue } timestamp := m.timestampForMonth(i, now) - segments, err := month.populateSegments(month.predefinedSegments, month.clients) + segments, err := month.populateSegments() if err != nil { - return nil, nil, err + return nil, err } for segmentIndex, segment := range segments { if segment == nil { @@ -413,34 +397,12 @@ func (m *multipleMonthsActivityClients) write(ctx context.Context, opts map[gene currentClients: &activity.EntityActivityLog{Clients: segment}, clientSequenceNumber: uint64(segmentIndex), tokenCount: &activity.TokenCount{}, - }, true, "") + }, true) if err != nil { - return nil, nil, err + return nil, err } paths = append(paths, entityPath) } - if len(month.globalClients) > 0 { - globalSegments, err := month.populateSegments(month.predefinedGlobalSegments, month.globalClients) - if err != nil { - return nil, nil, err - } - for segmentIndex, segment := range globalSegments { - if segment == nil { - // skip the index - continue - } - entityPath, err := activityLog.saveSegmentEntitiesInternal(ctx, segmentInfo{ - startTimestamp: timestamp.Unix(), - currentClients: &activity.EntityActivityLog{Clients: segment}, - clientSequenceNumber: uint64(segmentIndex), - tokenCount: &activity.TokenCount{}, - }, true, activityGlobalPathPrefix) - if err != nil { - return nil, nil, err - } - globalPaths = append(globalPaths, entityPath) - } - } } if writePQ || writeDistinctClients { // start with the oldest month of data, and create precomputed queries @@ -461,16 +423,16 @@ func (m *multipleMonthsActivityClients) write(ctx context.Context, opts map[gene if writeIntentLog { err := activityLog.writeIntentLog(ctx, m.latestTimestamp(now, false).Unix(), m.latestTimestamp(now, true).UTC()) if err != nil { - return nil, nil, err + return nil, err } } wg := sync.WaitGroup{} err := activityLog.refreshFromStoredLog(ctx, &wg, now) if err != nil { - return nil, nil, err + return nil, err } wg.Wait() - return paths, globalPaths, nil + return paths, nil } func (m *multipleMonthsActivityClients) latestTimestamp(now time.Time, includeCurrentMonth bool) time.Time { @@ -498,8 +460,7 @@ func newMultipleMonthsActivityClients(numberOfMonths int) *multipleMonthsActivit } for i := 0; i < numberOfMonths; i++ { m.months[i] = &singleMonthActivityClients{ - predefinedSegments: make(map[int][]int), - predefinedGlobalSegments: make(map[int][]int), + predefinedSegments: make(map[int][]int), } } return m @@ -523,17 +484,6 @@ type sliceSegmentReader struct { i int } -// ReadGlobalEntity here is a dummy implementation. -// Segment reader is never used when writing using the ClientCountUtil library -func (p *sliceSegmentReader) ReadGlobalEntity(ctx context.Context) (*activity.EntityActivityLog, error) { - if p.i == len(p.records) { - return nil, io.EOF - } - record := p.records[p.i] - p.i++ - return &activity.EntityActivityLog{Clients: record}, nil -} - func (p *sliceSegmentReader) ReadToken(ctx context.Context) (*activity.TokenCount, error) { return nil, io.EOF } diff --git a/vault/logical_system_activity_write_testonly_test.go b/vault/logical_system_activity_write_testonly_test.go index 4df992172d..5254c0aaed 100644 --- a/vault/logical_system_activity_write_testonly_test.go +++ b/vault/logical_system_activity_write_testonly_test.go @@ -167,12 +167,10 @@ func Test_singleMonthActivityClients_addNewClients(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - core, _, _ := TestCoreUnsealed(t) m := &singleMonthActivityClients{ - predefinedSegments: make(map[int][]int), - predefinedGlobalSegments: make(map[int][]int), + predefinedSegments: make(map[int][]int), } - err := m.addNewClients(tt.clients, tt.mount, tt.segmentIndex, 0, time.Now().UTC(), core) + err := m.addNewClients(tt.clients, tt.mount, tt.segmentIndex, 0, time.Now().UTC()) require.NoError(t, err) numNew := tt.clients.Count if numNew == 0 { @@ -341,42 +339,41 @@ func Test_multipleMonthsActivityClients_processMonth_segmented(t *testing.T) { // from 1 month ago and 2 months ago, and verifies that the correct clients are // added based on namespace, mount, and non-entity attributes func Test_multipleMonthsActivityClients_addRepeatedClients(t *testing.T) { - core, _, _ := TestCoreUnsealed(t) now := time.Now().UTC() m := newMultipleMonthsActivityClients(3) defaultMount := "default" - require.NoError(t, m.addClientToMonth(2, &generation.Client{Count: 2}, "identity", nil, now, core)) - require.NoError(t, m.addClientToMonth(2, &generation.Client{Count: 2, Namespace: "other_ns"}, defaultMount, nil, now, core)) - require.NoError(t, m.addClientToMonth(1, &generation.Client{Count: 2}, defaultMount, nil, now, core)) - require.NoError(t, m.addClientToMonth(1, &generation.Client{Count: 2, ClientType: "non-entity"}, defaultMount, nil, now, core)) + require.NoError(t, m.addClientToMonth(2, &generation.Client{Count: 2}, "identity", nil, now)) + require.NoError(t, m.addClientToMonth(2, &generation.Client{Count: 2, Namespace: "other_ns"}, defaultMount, nil, now)) + require.NoError(t, m.addClientToMonth(1, &generation.Client{Count: 2}, defaultMount, nil, now)) + require.NoError(t, m.addClientToMonth(1, &generation.Client{Count: 2, ClientType: "non-entity"}, defaultMount, nil, now)) month2Clients := m.months[2].clients month1Clients := m.months[1].clients thisMonth := m.months[0] // this will match the first client in month 1 - require.NoError(t, m.addRepeatedClients(0, &generation.Client{Count: 1, Repeated: true}, defaultMount, nil, core)) + require.NoError(t, m.addRepeatedClients(0, &generation.Client{Count: 1, Repeated: true}, defaultMount, nil)) require.Contains(t, month1Clients, thisMonth.clients[0]) // this will match the 3rd client in month 1 - require.NoError(t, m.addRepeatedClients(0, &generation.Client{Count: 1, Repeated: true, ClientType: "non-entity"}, defaultMount, nil, core)) + require.NoError(t, m.addRepeatedClients(0, &generation.Client{Count: 1, Repeated: true, ClientType: "non-entity"}, defaultMount, nil)) require.Equal(t, month1Clients[2], thisMonth.clients[1]) // this will match the first two clients in month 1 - require.NoError(t, m.addRepeatedClients(0, &generation.Client{Count: 2, Repeated: true}, defaultMount, nil, core)) + require.NoError(t, m.addRepeatedClients(0, &generation.Client{Count: 2, Repeated: true}, defaultMount, nil)) require.Equal(t, month1Clients[0:2], thisMonth.clients[2:4]) // this will match the first client in month 2 - require.NoError(t, m.addRepeatedClients(0, &generation.Client{Count: 1, RepeatedFromMonth: 2}, "identity", nil, core)) + require.NoError(t, m.addRepeatedClients(0, &generation.Client{Count: 1, RepeatedFromMonth: 2}, "identity", nil)) require.Equal(t, month2Clients[0], thisMonth.clients[4]) // this will match the 3rd client in month 2 - require.NoError(t, m.addRepeatedClients(0, &generation.Client{Count: 1, RepeatedFromMonth: 2, Namespace: "other_ns"}, defaultMount, nil, core)) + require.NoError(t, m.addRepeatedClients(0, &generation.Client{Count: 1, RepeatedFromMonth: 2, Namespace: "other_ns"}, defaultMount, nil)) require.Equal(t, month2Clients[2], thisMonth.clients[5]) - require.Error(t, m.addRepeatedClients(0, &generation.Client{Count: 1, RepeatedFromMonth: 2, Namespace: "other_ns"}, "other_mount", nil, core)) + require.Error(t, m.addRepeatedClients(0, &generation.Client{Count: 1, RepeatedFromMonth: 2, Namespace: "other_ns"}, "other_mount", nil)) } // Test_singleMonthActivityClients_populateSegments calls populateSegments for a @@ -459,7 +456,7 @@ func Test_singleMonthActivityClients_populateSegments(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { s := singleMonthActivityClients{predefinedSegments: tc.segments, clients: clients, generationParameters: &generation.Data{EmptySegmentIndexes: tc.emptyIndexes, SkipSegmentIndexes: tc.skipIndexes, NumSegments: int32(tc.numSegments)}} - gotSegments, err := s.populateSegments(s.predefinedSegments, s.clients) + gotSegments, err := s.populateSegments() require.NoError(t, err) require.Equal(t, tc.wantSegments, gotSegments) })