From 3c15d4b7faf04a0b7df407fe6a1b19b70fdcb73b Mon Sep 17 00:00:00 2001 From: akshya96 <87045294+akshya96@users.noreply.github.com> Date: Thu, 2 Jan 2025 09:17:27 -0800 Subject: [PATCH] Revert "Storing local clients to local storage paths ce changes (#28958)" (#29268) This reverts commit 504227bd74b88d905898b3c3ff3ce754211eaf14. --- .../operator_usage_testonly_test.go | 2 +- sdk/helper/clientcountutil/clientcountutil.go | 27 +- .../clientcountutil/clientcountutil_test.go | 5 +- vault/activity_log.go | 262 ++++------- vault/activity_log_test.go | 411 ++---------------- vault/activity_log_testing_util.go | 64 +-- vault/activity_log_util_common.go | 19 +- vault/activity_log_util_common_test.go | 32 -- .../acme_regeneration_test.go | 6 +- .../activity_testonly_oss_test.go | 2 +- .../activity_testonly_test.go | 26 +- .../logical_system_activity_write_testonly.go | 64 +-- 12 files changed, 157 insertions(+), 763 deletions(-) diff --git a/command/command_testonly/operator_usage_testonly_test.go b/command/command_testonly/operator_usage_testonly_test.go index 74d67291fd..4cdfc0536a 100644 --- a/command/command_testonly/operator_usage_testonly_test.go +++ b/command/command_testonly/operator_usage_testonly_test.go @@ -53,7 +53,7 @@ func TestOperatorUsageCommandRun(t *testing.T) { now := time.Now().UTC() - _, _, _, err = clientcountutil.NewActivityLogData(client). + _, _, err = clientcountutil.NewActivityLogData(client). NewPreviousMonthData(1). NewClientsSeen(6, clientcountutil.WithClientType("entity")). NewClientsSeen(4, clientcountutil.WithClientType("non-entity-token")). diff --git a/sdk/helper/clientcountutil/clientcountutil.go b/sdk/helper/clientcountutil/clientcountutil.go index d09c5be13d..16344a4d9c 100644 --- a/sdk/helper/clientcountutil/clientcountutil.go +++ b/sdk/helper/clientcountutil/clientcountutil.go @@ -282,53 +282,42 @@ func (d *ActivityLogDataGenerator) ToProto() *generation.ActivityLogMockInput { // Write writes the data to the API with the given write options. The method // returns the new paths that have been written. Note that the API endpoint will // only be present when Vault has been compiled with the "testonly" flag. -func (d *ActivityLogDataGenerator) Write(ctx context.Context, writeOptions ...generation.WriteOptions) ([]string, []string, []string, error) { +func (d *ActivityLogDataGenerator) Write(ctx context.Context, writeOptions ...generation.WriteOptions) ([]string, []string, error) { d.data.Write = writeOptions err := VerifyInput(d.data) if err != nil { - return nil, nil, nil, err + return nil, nil, err } data, err := d.ToJSON() if err != nil { - return nil, nil, nil, err + return nil, nil, err } resp, err := d.client.Logical().WriteWithContext(ctx, "sys/internal/counters/activity/write", map[string]interface{}{"input": string(data)}) if err != nil { - return nil, nil, nil, err + return nil, nil, err } if resp.Data == nil { - return nil, nil, nil, fmt.Errorf("received no data") + return nil, nil, fmt.Errorf("received no data") } paths := resp.Data["paths"] castedPaths, ok := paths.([]interface{}) if !ok { - return nil, nil, nil, fmt.Errorf("invalid paths data: %v", paths) + return nil, nil, fmt.Errorf("invalid paths data: %v", paths) } returnPaths := make([]string, 0, len(castedPaths)) for _, path := range castedPaths { returnPaths = append(returnPaths, path.(string)) } - - localPaths := resp.Data["local_paths"] - localCastedPaths, ok := localPaths.([]interface{}) - if !ok { - return nil, nil, nil, fmt.Errorf("invalid local paths data: %v", localPaths) - } - returnLocalPaths := make([]string, 0, len(localCastedPaths)) - for _, path := range localCastedPaths { - returnLocalPaths = append(returnLocalPaths, path.(string)) - } - globalPaths := resp.Data["global_paths"] globalCastedPaths, ok := globalPaths.([]interface{}) if !ok { - return nil, nil, nil, fmt.Errorf("invalid global paths data: %v", globalPaths) + return nil, nil, fmt.Errorf("invalid global paths data: %v", globalPaths) } returnGlobalPaths := make([]string, 0, len(globalCastedPaths)) for _, path := range globalCastedPaths { returnGlobalPaths = append(returnGlobalPaths, path.(string)) } - return returnPaths, returnLocalPaths, returnGlobalPaths, nil + return returnPaths, returnGlobalPaths, nil } // VerifyInput checks that the input data is valid diff --git a/sdk/helper/clientcountutil/clientcountutil_test.go b/sdk/helper/clientcountutil/clientcountutil_test.go index 4ea987fed0..fc0c695d94 100644 --- a/sdk/helper/clientcountutil/clientcountutil_test.go +++ b/sdk/helper/clientcountutil/clientcountutil_test.go @@ -116,7 +116,7 @@ func TestNewCurrentMonthData_AddClients(t *testing.T) { // sent to the server is correct. func TestWrite(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - _, err := io.WriteString(w, `{"data":{"paths":["path1","path2"],"global_paths":["path2","path3"], "local_paths":["path3","path4"]}}`) + _, err := io.WriteString(w, `{"data":{"paths":["path1","path2"],"global_paths":["path2","path3"]}}`) require.NoError(t, err) body, err := io.ReadAll(r.Body) require.NoError(t, err) @@ -131,7 +131,7 @@ func TestWrite(t *testing.T) { Address: ts.URL, }) require.NoError(t, err) - paths, localPaths, globalPaths, err := NewActivityLogData(client). + paths, globalPaths, err := NewActivityLogData(client). NewPreviousMonthData(3). NewClientSeen(). NewPreviousMonthData(2). @@ -142,7 +142,6 @@ func TestWrite(t *testing.T) { require.NoError(t, err) require.Equal(t, []string{"path1", "path2"}, paths) require.Equal(t, []string{"path2", "path3"}, globalPaths) - require.Equal(t, []string{"path3", "path4"}, localPaths) } func testAddClients(t *testing.T, makeGenerator func() *ActivityLogDataGenerator, getClient func(data *ActivityLogDataGenerator) *generation.Client) { diff --git a/vault/activity_log.go b/vault/activity_log.go index 3ad43d31b4..632fe7d013 100644 --- a/vault/activity_log.go +++ b/vault/activity_log.go @@ -44,7 +44,6 @@ const ( activityConfigKey = "config" activityIntentLogKey = "endofmonth" activityGlobalPathPrefix = "global/" - activityLocalPathPrefix = "local/" activityACMERegenerationKey = "acme-regeneration" // sketch for each month that stores hash of client ids @@ -144,7 +143,7 @@ type ActivityLog struct { // ActivityLog.l protects the configuration settings, except enable, and any modifications // to the current segment. - // Acquire "l" before fragmentLock, globalFragmentLock, and localFragmentLock if all must be held. + // Acquire "l" before fragmentLock and globalFragmentLock if both must be held. l sync.RWMutex // fragmentLock protects enable, partialMonthClientTracker, fragment, @@ -207,9 +206,6 @@ type ActivityLog struct { // track metadata and contents of the most recent global log segment currentGlobalSegment segmentInfo - // track metadata and contents of the most recent local log segment - currentLocalSegment segmentInfo - // Fragments received from performance standbys standbyFragmentsReceived []*activity.LogFragment @@ -401,19 +397,6 @@ func NewActivityLog(core *Core, logger log.Logger, view *BarrierView, metrics me }, clientSequenceNumber: 0, }, - currentLocalSegment: segmentInfo{ - startTimestamp: 0, - currentClients: &activity.EntityActivityLog{ - Clients: make([]*activity.EntityRecord, 0), - }, - // tokenCount is deprecated, but must still exist for the current segment - // so the fragment that was using TWEs before the 1.9 changes - // can be flushed to the current segment. - tokenCount: &activity.TokenCount{ - CountByNamespaceID: make(map[string]uint64), - }, - clientSequenceNumber: 0, - }, standbyFragmentsReceived: make([]*activity.LogFragment, 0), standbyLocalFragmentsReceived: make([]*activity.LogFragment, 0), standbyGlobalFragmentsReceived: make([]*activity.LogFragment, 0), @@ -526,8 +509,10 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for a.localFragmentLock.Lock() localFragment := a.localFragment a.localFragment = nil - standbyLocalFragments := a.standbyLocalFragmentsReceived - a.standbyLocalFragmentsReceived = make([]*activity.LogFragment, 0) + + // standbyLocalFragments := a.standbyLocalFragmentsReceived + // a.standbyLocalFragmentsReceived = make([]*activity.LogFragment, 0) + a.localFragmentLock.Unlock() // Measure the current local fragment @@ -544,11 +529,6 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for }) } - // store local fragments - if ret := a.createCurrentSegmentFromFragments(ctx, append(standbyLocalFragments, localFragment), &a.currentLocalSegment, force, activityLocalPathPrefix); ret != nil { - return ret - } - return nil } @@ -736,7 +716,7 @@ func parseSegmentNumberFromPath(path string) (int, bool) { // sorted last to first func (a *ActivityLog) availableLogs(ctx context.Context, upTo time.Time) ([]time.Time, error) { paths := make([]string, 0) - for _, basePath := range []string{activityEntityBasePath, activityLocalPathPrefix + activityEntityBasePath, activityGlobalPathPrefix + activityEntityBasePath, activityTokenLocalBasePath} { + for _, basePath := range []string{activityEntityBasePath, activityGlobalPathPrefix + activityEntityBasePath, activityTokenLocalBasePath} { p, err := a.view.List(ctx, basePath) if err != nil { return nil, err @@ -785,27 +765,10 @@ func (a *ActivityLog) getMostRecentActivityLogSegment(ctx context.Context, now t } // getLastEntitySegmentNumber returns the (non-negative) last segment number for the :startTime:, if it exists -func (a *ActivityLog) getLastEntitySegmentNumber(ctx context.Context, startTime time.Time) (uint64, uint64, uint64, bool, error) { - segmentHighestNum, segmentPresent, err := a.getLastSegmentNumberByEntityPath(ctx, activityEntityBasePath+fmt.Sprint(startTime.Unix())+"/") +func (a *ActivityLog) getLastEntitySegmentNumber(ctx context.Context, startTime time.Time) (uint64, uint64, bool, error) { + p, err := a.view.List(ctx, activityEntityBasePath+fmt.Sprint(startTime.Unix())+"/") if err != nil { - return 0, 0, 0, false, err - } - globalHighestNum, globalSegmentPresent, err := a.getLastSegmentNumberByEntityPath(ctx, activityGlobalPathPrefix+activityEntityBasePath+fmt.Sprint(startTime.Unix())+"/") - if err != nil { - return 0, 0, 0, false, err - } - localHighestNum, localSegmentPresent, err := a.getLastSegmentNumberByEntityPath(ctx, activityLocalPathPrefix+activityEntityBasePath+fmt.Sprint(startTime.Unix())+"/") - if err != nil { - return 0, 0, 0, false, err - } - - return segmentHighestNum, uint64(localHighestNum), uint64(globalHighestNum), (segmentPresent || localSegmentPresent || globalSegmentPresent), nil -} - -func (a *ActivityLog) getLastSegmentNumberByEntityPath(ctx context.Context, entityPath string) (uint64, bool, error) { - p, err := a.view.List(ctx, entityPath) - if err != nil { - return 0, false, err + return 0, 0, false, err } highestNum := -1 @@ -824,7 +787,27 @@ func (a *ActivityLog) getLastSegmentNumberByEntityPath(ctx context.Context, enti segmentHighestNum = 0 segmentPresent = false } - return segmentHighestNum, segmentPresent, nil + + globalPaths, err := a.view.List(ctx, activityGlobalPathPrefix+activityEntityBasePath+fmt.Sprint(startTime.Unix())+"/") + if err != nil { + return segmentHighestNum, 0, segmentPresent, err + } + + globalHighestNum := -1 + for _, path := range globalPaths { + if num, ok := parseSegmentNumberFromPath(path); ok { + if num > globalHighestNum { + globalHighestNum = num + } + } + } + + if globalHighestNum < 0 { + // numbers less than 0 are invalid. if a negative number is the highest value, there isn't a segment + return segmentHighestNum, 0, segmentPresent, nil + } + + return segmentHighestNum, uint64(globalHighestNum), segmentPresent, nil } // WalkEntitySegments loads each of the entity segments for a particular start time @@ -918,7 +901,6 @@ func (a *ActivityLog) loadPriorEntitySegment(ctx context.Context, startTime time } a.fragmentLock.Unlock() - // load all the active global clients globalPath := activityGlobalPathPrefix + activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(sequenceNum, 10) data, err = a.view.Get(ctx, globalPath) if err != nil { @@ -942,37 +924,13 @@ func (a *ActivityLog) loadPriorEntitySegment(ctx context.Context, startTime time } a.globalFragmentLock.Unlock() - // load all the active local clients - localPath := activityLocalPathPrefix + activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(sequenceNum, 10) - data, err = a.view.Get(ctx, localPath) - if err != nil { - return err - } - if data == nil { - return nil - } - out = &activity.EntityActivityLog{} - err = proto.Unmarshal(data.Value, out) - if err != nil { - return err - } - a.localFragmentLock.Lock() - // Handle the (unlikely) case where the end of the month has been reached while background loading. - // Or the feature has been disabled. - if a.enabled && startTime.Unix() == a.currentLocalSegment.startTimestamp { - for _, ent := range out.Clients { - a.partialMonthLocalClientTracker[ent.ClientID] = ent - } - } - a.localFragmentLock.Unlock() - return nil } // loadCurrentClientSegment loads the most recent segment (for "this month") // into memory (to append new entries), and to the partialMonthClientTracker to -// avoid duplication call with fragmentLock, globalFragmentLock, localFragmentLock and l held. -func (a *ActivityLog) loadCurrentClientSegment(ctx context.Context, startTime time.Time, sequenceNum uint64, localSegmentSequenceNumber uint64, globalSegmentSequenceNumber uint64) error { +// avoid duplication call with fragmentLock, globalFragmentLock and l held. +func (a *ActivityLog) loadCurrentClientSegment(ctx context.Context, startTime time.Time, sequenceNum uint64, globalSegmentSequenceNumber uint64) error { path := activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(sequenceNum, 10) data, err := a.view.Get(ctx, path) if err != nil { @@ -1006,7 +964,6 @@ func (a *ActivityLog) loadCurrentClientSegment(ctx context.Context, startTime ti a.partialMonthClientTracker[client.ClientID] = client } - // load current global segment path = activityGlobalPathPrefix + activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(globalSegmentSequenceNumber, 10) data, err = a.view.Get(ctx, path) if err != nil { @@ -1041,39 +998,6 @@ func (a *ActivityLog) loadCurrentClientSegment(ctx context.Context, startTime ti a.globalPartialMonthClientTracker[client.ClientID] = client } - // load current local segment - path = activityLocalPathPrefix + activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(localSegmentSequenceNumber, 10) - data, err = a.view.Get(ctx, path) - if err != nil { - return err - } - if data == nil { - return nil - } - - out = &activity.EntityActivityLog{} - err = proto.Unmarshal(data.Value, out) - if err != nil { - return err - } - - if !a.core.perfStandby { - a.currentLocalSegment = segmentInfo{ - startTimestamp: startTime.Unix(), - currentClients: &activity.EntityActivityLog{ - Clients: out.Clients, - }, - tokenCount: a.currentLocalSegment.tokenCount, - clientSequenceNumber: sequenceNum, - } - } else { - // populate this for edge case checking (if end of month passes while background loading on standby) - a.currentLocalSegment.startTimestamp = startTime.Unix() - } - for _, client := range out.Clients { - a.partialMonthLocalClientTracker[client.ClientID] = client - } - return nil } @@ -1129,7 +1053,6 @@ func (a *ActivityLog) loadTokenCount(ctx context.Context, startTime time.Time) e // so that TWEs counted before the introduction of a client ID for TWEs are // still reported in the partial client counts. a.currentSegment.tokenCount = out - a.currentLocalSegment.tokenCount = out return nil } @@ -1153,42 +1076,36 @@ func (a *ActivityLog) entityBackgroundLoader(ctx context.Context, wg *sync.WaitG } // Initialize a new current segment, based on the current time. -// Call with fragmentLock, globalFragmentLock, localFragmentLock and l held. +// Call with fragmentLock, globalFragmentLock and l held. func (a *ActivityLog) startNewCurrentLogLocked(now time.Time) { a.logger.Trace("initializing new log") a.resetCurrentLog() - a.setCurrentSegmentTimeLocked(now) + a.currentSegment.startTimestamp = now.Unix() + a.currentGlobalSegment.startTimestamp = now.Unix() } -// Should be called with fragmentLock, globalFragmentLock, localFragmentLock and l held. +// Should be called with fragmentLock, globalFragmentLock and l held. func (a *ActivityLog) newMonthCurrentLogLocked(currentTime time.Time) { a.logger.Trace("continuing log to new month") a.resetCurrentLog() monthStart := timeutil.StartOfMonth(currentTime.UTC()) - a.setCurrentSegmentTimeLocked(monthStart) + a.currentSegment.startTimestamp = monthStart.Unix() + a.currentGlobalSegment.startTimestamp = monthStart.Unix() } // Initialize a new current segment, based on the given time -// should be called with fragmentLock, globalFragmentLock, localFragmentLock and l held. +// should be called with fragmentLock, globalFragmentLock and l held. func (a *ActivityLog) newSegmentAtGivenTime(t time.Time) { timestamp := t.Unix() a.logger.Trace("starting a segment", "timestamp", timestamp) a.resetCurrentLog() - a.setCurrentSegmentTimeLocked(t) -} - -// Sets the timestamp of all the segments to the given time. -// should be called with l held. -func (a *ActivityLog) setCurrentSegmentTimeLocked(t time.Time) { - timestamp := t.Unix() a.currentSegment.startTimestamp = timestamp a.currentGlobalSegment.startTimestamp = timestamp - a.currentLocalSegment.startTimestamp = timestamp } // Reset all the current segment state. -// Should be called with fragmentLock, globalFragmentLock, localFragmentLock and l held. +// Should be called with fragmentLock, globalFragmentLock and l held. func (a *ActivityLog) resetCurrentLog() { a.currentSegment.startTimestamp = 0 a.currentSegment.currentClients = &activity.EntityActivityLog{ @@ -1202,56 +1119,62 @@ func (a *ActivityLog) resetCurrentLog() { } a.currentSegment.clientSequenceNumber = 0 + a.fragment = nil + a.partialMonthClientTracker = make(map[string]*activity.EntityRecord) + a.standbyFragmentsReceived = make([]*activity.LogFragment, 0) - // global segment a.currentGlobalSegment.startTimestamp = 0 a.currentGlobalSegment.currentClients = &activity.EntityActivityLog{ Clients: make([]*activity.EntityRecord, 0), } a.currentGlobalSegment.clientSequenceNumber = 0 - - // local segment - a.currentLocalSegment.startTimestamp = 0 - a.currentLocalSegment.currentClients = &activity.EntityActivityLog{ - Clients: make([]*activity.EntityRecord, 0), - } - a.currentLocalSegment.clientSequenceNumber = 0 - - a.fragment = nil - a.partialMonthClientTracker = make(map[string]*activity.EntityRecord) - a.currentGlobalFragment = nil a.globalPartialMonthClientTracker = make(map[string]*activity.EntityRecord) - - a.localFragment = nil - a.partialMonthLocalClientTracker = make(map[string]*activity.EntityRecord) - - a.standbyFragmentsReceived = make([]*activity.LogFragment, 0) - a.standbyLocalFragmentsReceived = make([]*activity.LogFragment, 0) - a.standbyGlobalFragmentsReceived = make([]*activity.LogFragment, 0) a.secondaryGlobalClientFragments = make([]*activity.LogFragment, 0) + a.standbyGlobalFragmentsReceived = make([]*activity.LogFragment, 0) } func (a *ActivityLog) deleteLogWorker(ctx context.Context, startTimestamp int64, whenDone chan struct{}) { - entityPathsToDelete := make([]string, 0) - entityPathsToDelete = append(entityPathsToDelete, fmt.Sprintf("%v%v/", activityEntityBasePath, startTimestamp)) - entityPathsToDelete = append(entityPathsToDelete, fmt.Sprintf("%s%v%v/", activityGlobalPathPrefix, activityEntityBasePath, startTimestamp)) - entityPathsToDelete = append(entityPathsToDelete, fmt.Sprintf("%s%v%v/", activityLocalPathPrefix, activityEntityBasePath, startTimestamp)) - entityPathsToDelete = append(entityPathsToDelete, fmt.Sprintf("%v%v/", activityTokenLocalBasePath, startTimestamp)) + entityPath := fmt.Sprintf("%v%v/", activityEntityBasePath, startTimestamp) + tokenPath := fmt.Sprintf("%v%v/", activityTokenLocalBasePath, startTimestamp) + globalEntityPath := fmt.Sprintf("%s%v%v/", activityGlobalPathPrefix, activityEntityBasePath, startTimestamp) - for _, path := range entityPathsToDelete { - segments, err := a.view.List(ctx, path) + entitySegments, err := a.view.List(ctx, entityPath) + if err != nil { + a.logger.Error("could not list entity paths", "error", err) + return + } + for _, p := range entitySegments { + err = a.view.Delete(ctx, entityPath+p) if err != nil { - a.logger.Error("could not list segment path", "error", err) - return - } - for _, p := range segments { - err = a.view.Delete(ctx, path+p) - if err != nil { - a.logger.Error("could not delete log", "error", err) - } + a.logger.Error("could not delete entity log", "error", err) } } + + tokenSegments, err := a.view.List(ctx, tokenPath) + if err != nil { + a.logger.Error("could not list token paths", "error", err) + return + } + for _, p := range tokenSegments { + err = a.view.Delete(ctx, tokenPath+p) + if err != nil { + a.logger.Error("could not delete token log", "error", err) + } + } + + globalEntitySegments, err := a.view.List(ctx, globalEntityPath) + if err != nil { + a.logger.Error("could not list global entity paths", "error", err) + return + } + for _, p := range globalEntitySegments { + err = a.view.Delete(ctx, globalEntityPath+p) + if err != nil { + a.logger.Error("could not delete global entity log", "error", err) + } + } + // Allow whoever started this as a goroutine to wait for it to finish. close(whenDone) } @@ -1279,9 +1202,6 @@ func (a *ActivityLog) refreshFromStoredLog(ctx context.Context, wg *sync.WaitGro defer a.fragmentLock.Unlock() a.globalFragmentLock.Lock() defer a.globalFragmentLock.Unlock() - // startNewCurrentLogLocked below calls resetCurrentLog which is protected by fragmentLock, globalFragmentLock, localFragmentLock and l - a.localFragmentLock.Lock() - defer a.localFragmentLock.Unlock() decreasingLogTimes, err := a.getMostRecentActivityLogSegment(ctx, now) if err != nil { @@ -1350,7 +1270,7 @@ func (a *ActivityLog) refreshFromStoredLog(ctx context.Context, wg *sync.WaitGro } // load entity logs from storage into memory - lastSegment, localLastSegment, globalLastSegment, segmentsExist, err := a.getLastEntitySegmentNumber(ctx, mostRecent) + lastSegment, globalLastSegment, segmentsExist, err := a.getLastEntitySegmentNumber(ctx, mostRecent) if err != nil { return err } @@ -1359,7 +1279,7 @@ func (a *ActivityLog) refreshFromStoredLog(ctx context.Context, wg *sync.WaitGro return nil } - err = a.loadCurrentClientSegment(ctx, mostRecent, lastSegment, localLastSegment, globalLastSegment) + err = a.loadCurrentClientSegment(ctx, mostRecent, lastSegment, globalLastSegment) if err != nil || lastSegment == 0 { return err } @@ -1408,9 +1328,6 @@ func (a *ActivityLog) SetConfig(ctx context.Context, config activityConfig) { // enabled is protected by fragmentLock a.fragmentLock.Lock() - // startNewCurrentLogLocked and resetCurrentLog is protected by fragmentLock, globalFragmentLock, localFragmentLock and l - a.localFragmentLock.Lock() - a.globalFragmentLock.Lock() originalEnabled := a.enabled switch config.Enabled { case "enable": @@ -1425,7 +1342,7 @@ func (a *ActivityLog) SetConfig(ctx context.Context, config activityConfig) { a.logger.Info("activity log enable changed", "original", originalEnabled, "current", a.enabled) } - if !a.enabled && a.currentSegment.startTimestamp != 0 && a.currentGlobalSegment.startTimestamp != 0 && a.currentLocalSegment.startTimestamp != 0 { + if !a.enabled && a.currentSegment.startTimestamp != 0 && a.currentGlobalSegment.startTimestamp != 0 { a.logger.Trace("deleting current segment") a.deleteDone = make(chan struct{}) // this is called from a request under stateLock, so use activeContext @@ -1434,7 +1351,7 @@ func (a *ActivityLog) SetConfig(ctx context.Context, config activityConfig) { } forceSave := false - if a.enabled && a.currentSegment.startTimestamp == 0 && a.currentGlobalSegment.startTimestamp == 0 && a.currentLocalSegment.startTimestamp == 0 { + if a.enabled && a.currentSegment.startTimestamp == 0 && a.currentGlobalSegment.startTimestamp == 0 { a.startNewCurrentLogLocked(a.clock.Now().UTC()) // Force a save so we can distinguish between // @@ -1448,14 +1365,11 @@ func (a *ActivityLog) SetConfig(ctx context.Context, config activityConfig) { forceSave = true } a.fragmentLock.Unlock() - a.localFragmentLock.Unlock() - a.globalFragmentLock.Unlock() if forceSave { // l is still held here a.saveCurrentSegmentInternal(ctx, true, a.currentSegment, "") a.saveCurrentSegmentInternal(ctx, true, a.currentGlobalSegment, activityGlobalPathPrefix) - a.saveCurrentSegmentInternal(ctx, true, a.currentLocalSegment, activityLocalPathPrefix) } a.defaultReportMonths = config.DefaultReportMonths @@ -2010,12 +1924,7 @@ func (a *ActivityLog) HandleEndOfMonth(ctx context.Context, currentTime time.Tim // in the previous month, and recover by calling newMonthCurrentLog // again and triggering the precomputed query. a.fragmentLock.Lock() - // calls newMonthCurrentLogLocked which is protected by fragmentLock, globalFragmentLock, localFragmentLock and l - a.localFragmentLock.Lock() - a.globalFragmentLock.Lock() a.newMonthCurrentLogLocked(currentTime) - a.globalFragmentLock.Unlock() - a.localFragmentLock.Unlock() a.fragmentLock.Unlock() // Work on precomputed queries in background @@ -3301,9 +3210,6 @@ func (a *ActivityLog) populateNamespaceAndMonthlyBreakdowns() (map[int64]*proces for _, e := range a.partialMonthClientTracker { processClientRecord(e, byNamespace, byMonth, a.clock.Now()) } - for _, e := range a.partialMonthLocalClientTracker { - processClientRecord(e, byNamespace, byMonth, a.clock.Now()) - } return byMonth, byNamespace } diff --git a/vault/activity_log_test.go b/vault/activity_log_test.go index 4742d11467..fab86b7cd1 100644 --- a/vault/activity_log_test.go +++ b/vault/activity_log_test.go @@ -626,91 +626,6 @@ func TestActivityLog_SaveEntitiesToStorage(t *testing.T) { expectedEntityIDs(t, out, ids) } -// TestActivityLog_SaveEntitiesToStorageCommon calls AddClientToFragment with clients with local and non-local mount accessors and then -// writes the segment to storage. Read back from storage, and verify that client IDs exist in storage in the right local and non-local entity paths. -func TestActivityLog_SaveEntitiesToStorageCommon(t *testing.T) { - t.Parallel() - - storage := &logical.InmemStorage{} - coreConfig := &CoreConfig{ - CredentialBackends: map[string]logical.Factory{ - "userpass": userpass.Factory, - }, - Physical: storage.Underlying(), - } - - cluster := NewTestCluster(t, coreConfig, nil) - core := cluster.Cores[0].Core - TestWaitActive(t, core) - - ctx := namespace.RootContext(nil) - - a := core.activityLog - a.SetEnable(true) - a.SetStartTimestamp(time.Now().Unix()) // set a nonzero segment - - var err error - - // create a local and non-local mount entry - nonLocalMountEntry := &MountEntry{ - Table: credentialTableType, - Path: "nonLocalUserpass/", - Type: "userpass", - Accessor: "nonLocalMountAccessor", - } - err = core.enableCredential(ctx, nonLocalMountEntry) - require.NoError(t, err) - - localMountEntry := &MountEntry{ - Table: credentialTableType, - Path: "localUserpass/", - Local: true, - Type: "userpass", - Accessor: "localMountAccessor", - } - err = core.enableCredential(ctx, localMountEntry) - require.NoError(t, err) - - now := time.Now() - ids := []string{"non-local-client-id-1", "non-local-client-id-2", "local-client-id-1"} - - globalPath := fmt.Sprintf("%sentity/%d/0", ActivityGlobalLogPrefix, a.GetStartTimestamp()) - localPath := fmt.Sprintf("%sentity/%d/0", ActivityLogLocalPrefix, a.GetStartTimestamp()) - - // add clients with local and non-local mount accessors - a.AddClientToFragment(ids[0], "root", now.Unix(), false, "nonLocalMountAccessor") - a.AddClientToFragment(ids[1], "root", now.Unix(), false, "nonLocalMountAccessor") - a.AddClientToFragment(ids[2], "root", now.Unix(), false, "localMountAccessor") - - err = a.saveCurrentSegmentToStorage(ctx, false) - if err != nil { - t.Fatalf("got error writing entities to storage: %v", err) - } - if a.fragment != nil { - t.Errorf("fragment was not reset after write to storage") - } - - // read entity ids from non-local entity storage path - protoSegment := readSegmentFromStorage(t, core, globalPath) - out := &activity.EntityActivityLog{} - err = proto.Unmarshal(protoSegment.Value, out) - if err != nil { - t.Fatalf("could not unmarshal protobuf: %v", err) - } - expectedEntityIDs(t, out, ids[:2]) - - // read entity ids from local entity storage path - protoSegment = readSegmentFromStorage(t, core, localPath) - out = &activity.EntityActivityLog{} - err = proto.Unmarshal(protoSegment.Value, out) - if err != nil { - t.Fatalf("could not unmarshal protobuf: %v", err) - } - - // local entity is local-client-id-1 in ids with index 2 - expectedEntityIDs(t, out, ids[2:]) -} - // TestActivityLog_StoreAndReadHyperloglog inserts into a hyperloglog, stores it and then reads it back. The test // verifies the estimate count is correct. func TestActivityLog_StoreAndReadHyperloglog(t *testing.T) { @@ -1307,64 +1222,54 @@ func TestActivityLog_getLastEntitySegmentNumber(t *testing.T) { a := core.activityLog paths := [...]string{"entity/992/0", "entity/1000/-1", "entity/1001/foo", "entity/1111/0", "entity/1111/1"} globalPaths := [...]string{"entity/992/0", "entity/1000/-1", "entity/1001/foo", "entity/1111/1"} - localPaths := [...]string{"entity/992/0", "entity/1000/-1", "entity/1001/foo", "entity/1111/0", "entity/1111/1"} for _, path := range paths { WriteToStorage(t, core, ActivityLogPrefix+path, []byte("test")) } for _, path := range globalPaths { WriteToStorage(t, core, ActivityGlobalLogPrefix+path, []byte("test")) } - for _, path := range localPaths { - WriteToStorage(t, core, ActivityLogLocalPrefix+path, []byte("test")) - } testCases := []struct { input int64 expectedVal uint64 expectedGlobalVal uint64 - expectedLocalVal uint64 expectExists bool }{ { input: 992, expectedVal: 0, expectedGlobalVal: 0, - expectedLocalVal: 0, expectExists: true, }, { input: 1000, expectedVal: 0, expectedGlobalVal: 0, - expectedLocalVal: 0, expectExists: false, }, { input: 1001, expectedVal: 0, expectedGlobalVal: 0, - expectedLocalVal: 0, expectExists: false, }, { input: 1111, expectedVal: 1, expectedGlobalVal: 1, - expectedLocalVal: 1, expectExists: true, }, { input: 2222, expectedVal: 0, expectedGlobalVal: 0, - expectedLocalVal: 0, expectExists: false, }, } ctx := context.Background() for _, tc := range testCases { - result, localSegmentNumber, globalSegmentNumber, exists, err := a.getLastEntitySegmentNumber(ctx, time.Unix(tc.input, 0)) + result, globalSegmentNumber, exists, err := a.getLastEntitySegmentNumber(ctx, time.Unix(tc.input, 0)) if err != nil { t.Fatalf("unexpected error for input %d: %v", tc.input, err) } @@ -1377,9 +1282,7 @@ func TestActivityLog_getLastEntitySegmentNumber(t *testing.T) { if globalSegmentNumber != tc.expectedGlobalVal { t.Errorf("expected: %d got: %d for input: %d", tc.expectedGlobalVal, globalSegmentNumber, tc.input) } - if localSegmentNumber != tc.expectedLocalVal { - t.Errorf("expected: %d got: %d for input: %d", tc.expectedLocalVal, localSegmentNumber, tc.input) - } + } } @@ -1514,24 +1417,6 @@ func (a *ActivityLog) resetEntitiesInMemory(t *testing.T) { clientSequenceNumber: 0, } - a.currentGlobalSegment = segmentInfo{ - startTimestamp: time.Time{}.Unix(), - currentClients: &activity.EntityActivityLog{ - Clients: make([]*activity.EntityRecord, 0), - }, - tokenCount: a.currentGlobalSegment.tokenCount, - clientSequenceNumber: 0, - } - - a.currentLocalSegment = segmentInfo{ - startTimestamp: time.Time{}.Unix(), - currentClients: &activity.EntityActivityLog{ - Clients: make([]*activity.EntityRecord, 0), - }, - tokenCount: a.currentLocalSegment.tokenCount, - clientSequenceNumber: 0, - } - a.partialMonthClientTracker = make(map[string]*activity.EntityRecord) a.partialMonthLocalClientTracker = make(map[string]*activity.EntityRecord) a.globalPartialMonthClientTracker = make(map[string]*activity.EntityRecord) @@ -1550,7 +1435,6 @@ func TestActivityLog_loadCurrentClientSegment(t *testing.T) { } a.l.Lock() a.currentSegment.tokenCount = tokenCount - a.currentLocalSegment.tokenCount = tokenCount a.l.Unlock() // setup in-storage data to load for testing @@ -1612,7 +1496,6 @@ func TestActivityLog_loadCurrentClientSegment(t *testing.T) { } WriteToStorage(t, core, ActivityLogPrefix+tc.path, data) WriteToStorage(t, core, ActivityGlobalLogPrefix+tc.path, data) - WriteToStorage(t, core, ActivityLogLocalPrefix+tc.path, data) } ctx := context.Background() @@ -1620,12 +1503,10 @@ func TestActivityLog_loadCurrentClientSegment(t *testing.T) { a.l.Lock() a.fragmentLock.Lock() a.globalFragmentLock.Lock() - a.localFragmentLock.Lock() // loadCurrentClientSegment requires us to grab the fragment lock and the // activityLog lock, as per the comment in the loadCurrentClientSegment // function - err := a.loadCurrentClientSegment(ctx, time.Unix(tc.time, 0), tc.seqNum, tc.seqNum, tc.seqNum) - a.localFragmentLock.Unlock() + err := a.loadCurrentClientSegment(ctx, time.Unix(tc.time, 0), tc.seqNum, tc.seqNum) a.globalFragmentLock.Unlock() a.fragmentLock.Unlock() a.l.Unlock() @@ -1638,26 +1519,21 @@ func TestActivityLog_loadCurrentClientSegment(t *testing.T) { } // verify accurate data in in-memory current segment - require.Equal(t, tc.time, a.GetStartTimestamp()) - require.Equal(t, tc.seqNum, a.GetEntitySequenceNumber()) - require.Equal(t, tc.seqNum, a.GetGlobalEntitySequenceNumber()) - require.Equal(t, tc.seqNum, a.GetLocalEntitySequenceNumber()) + startTimestamp := a.GetStartTimestamp() + if startTimestamp != tc.time { + t.Errorf("bad timestamp loaded. expected: %v, got: %v for path %q", tc.time, startTimestamp, tc.path) + } + + seqNum := a.GetEntitySequenceNumber() + if seqNum != tc.seqNum { + t.Errorf("bad sequence number loaded. expected: %v, got: %v for path %q", tc.seqNum, seqNum, tc.path) + } currentEntities := a.GetCurrentEntities() if !entityRecordsEqual(t, currentEntities.Clients, tc.entities.Clients) { t.Errorf("bad data loaded. expected: %v, got: %v for path %q", tc.entities.Clients, currentEntities, tc.path) } - globalClients := core.GetActiveGlobalClientsList() - if err := ActiveEntitiesEqual(globalClients, tc.entities.Clients); err != nil { - t.Errorf("bad data loaded into active global entities. expected only set of EntityID from %v in %v for path %q: %v", tc.entities.Clients, globalClients, tc.path, err) - } - - localClients := core.GetActiveLocalClientsList() - if err := ActiveEntitiesEqual(localClients, tc.entities.Clients); err != nil { - t.Errorf("bad data loaded into active local entities. expected only set of EntityID from %v in %v for path %q: %v", tc.entities.Clients, localClients, tc.path, err) - } - currentGlobalEntities := a.GetCurrentGlobalEntities() if !entityRecordsEqual(t, currentGlobalEntities.Clients, tc.entities.Clients) { t.Errorf("bad data loaded. expected: %v, got: %v for path %q", tc.entities.Clients, currentGlobalEntities, tc.path) @@ -1741,7 +1617,6 @@ func TestActivityLog_loadPriorEntitySegment(t *testing.T) { } WriteToStorage(t, core, ActivityLogPrefix+tc.path, data) WriteToStorage(t, core, ActivityGlobalLogPrefix+tc.path, data) - WriteToStorage(t, core, ActivityLogLocalPrefix+tc.path, data) } ctx := context.Background() @@ -1755,7 +1630,6 @@ func TestActivityLog_loadPriorEntitySegment(t *testing.T) { a.globalPartialMonthClientTracker = make(map[string]*activity.EntityRecord) a.currentSegment.startTimestamp = tc.time a.currentGlobalSegment.startTimestamp = tc.time - a.currentLocalSegment.startTimestamp = tc.time a.fragmentLock.Unlock() a.localFragmentLock.Unlock() a.l.Unlock() @@ -1920,14 +1794,6 @@ func setupActivityRecordsInStorage(t *testing.T, base time.Time, includeEntities }, }...) } - - // append some local entity data - entityRecords = append(entityRecords, &activity.EntityRecord{ - ClientID: "44444444-4444-4444-4444-444444444444", - NamespaceID: namespace.RootNamespaceID, - Timestamp: time.Now().Unix(), - }) - for i, entityRecord := range entityRecords { entityData, err := proto.Marshal(&activity.EntityActivityLog{ Clients: []*activity.EntityRecord{entityRecord}, @@ -1935,15 +1801,10 @@ func setupActivityRecordsInStorage(t *testing.T, base time.Time, includeEntities if err != nil { t.Fatalf(err.Error()) } - switch i { - case 0: + if i == 0 { WriteToStorage(t, core, ActivityLogPrefix+"entity/"+fmt.Sprint(monthsAgo.Unix())+"/0", entityData) WriteToStorage(t, core, ActivityGlobalLogPrefix+"entity/"+fmt.Sprint(monthsAgo.Unix())+"/0", entityData) - - case len(entityRecords) - 1: - // local data - WriteToStorage(t, core, ActivityLogLocalPrefix+"entity/"+fmt.Sprint(base.Unix())+"/"+strconv.Itoa(i-1), entityData) - default: + } else { WriteToStorage(t, core, ActivityLogPrefix+"entity/"+fmt.Sprint(base.Unix())+"/"+strconv.Itoa(i-1), entityData) WriteToStorage(t, core, ActivityGlobalLogPrefix+"entity/"+fmt.Sprint(base.Unix())+"/"+strconv.Itoa(i-1), entityData) } @@ -1992,9 +1853,6 @@ func TestActivityLog_refreshFromStoredLog(t *testing.T) { Clients: expectedClientRecords[1:], } expectedCurrent := &activity.EntityActivityLog{ - Clients: expectedClientRecords[len(expectedClientRecords)-2 : len(expectedClientRecords)-1], - } - expectedCurrentLocal := &activity.EntityActivityLog{ Clients: expectedClientRecords[len(expectedClientRecords)-1:], } @@ -2004,12 +1862,6 @@ func TestActivityLog_refreshFromStoredLog(t *testing.T) { t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrent, currentEntities) } - currentLocalEntities := a.GetCurrentLocalEntities() - if !entityRecordsEqual(t, currentLocalEntities.Clients, expectedCurrentLocal.Clients) { - // we only expect the newest local entity segment to be loaded (for the current month) - t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrentLocal, currentLocalEntities) - } - nsCount := a.GetStoredTokenCountByNamespaceID() if !reflect.DeepEqual(nsCount, expectedTokenCounts) { // we expect all token counts to be loaded @@ -2044,31 +1896,14 @@ func TestActivityLog_refreshFromStoredLogWithBackgroundLoadingCancelled(t *testi } wg.Wait() - // refreshFromStoredLog loads the most recent segment and then loads the older segments in the background - // most recent global and local entity from setupActivityRecordsInStorage expected := &activity.EntityActivityLog{ - Clients: expectedClientRecords[len(expectedClientRecords)-2:], - } - - // most recent global entity from setupActivityRecordsInStorage - expectedCurrent := &activity.EntityActivityLog{ - Clients: expectedClientRecords[len(expectedClientRecords)-2 : len(expectedClientRecords)-1], - } - // most recent local entity from setupActivityRecordsInStorage - expectedCurrentLocal := &activity.EntityActivityLog{ Clients: expectedClientRecords[len(expectedClientRecords)-1:], } - currentEntities := a.GetCurrentGlobalEntities() - if !entityRecordsEqual(t, currentEntities.Clients, expectedCurrent.Clients) { + currentEntities := a.GetCurrentEntities() + if !entityRecordsEqual(t, currentEntities.Clients, expected.Clients) { // we only expect the newest entity segment to be loaded (for the current month) - t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrent, currentEntities) - } - - currentLocalEntities := a.GetCurrentLocalEntities() - if !entityRecordsEqual(t, currentLocalEntities.Clients, expectedCurrentLocal.Clients) { - // we only expect the newest local entity segment to be loaded (for the current month) - t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrentLocal, currentLocalEntities) + t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expected, currentEntities) } nsCount := a.GetStoredTokenCountByNamespaceID() @@ -2116,12 +1951,6 @@ func TestActivityLog_refreshFromStoredLogNoTokens(t *testing.T) { Clients: expectedClientRecords[1:], } expectedCurrent := &activity.EntityActivityLog{ - Clients: expectedClientRecords[len(expectedClientRecords)-2 : len(expectedClientRecords)-1], - } - expectedCurrentGlobal := &activity.EntityActivityLog{ - Clients: expectedClientRecords[len(expectedClientRecords)-2 : len(expectedClientRecords)-1], - } - expectedCurrentLocal := &activity.EntityActivityLog{ Clients: expectedClientRecords[len(expectedClientRecords)-1:], } @@ -2130,19 +1959,6 @@ func TestActivityLog_refreshFromStoredLogNoTokens(t *testing.T) { // we expect all segments for the current month to be loaded t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrent, currentEntities) } - - currentGlobalEntities := a.GetCurrentGlobalEntities() - if !entityRecordsEqual(t, currentGlobalEntities.Clients, expectedCurrentGlobal.Clients) { - // we only expect the newest entity segment to be loaded (for the current month) - t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrentGlobal, currentGlobalEntities) - } - - currentLocalEntities := a.GetCurrentLocalEntities() - if !entityRecordsEqual(t, currentLocalEntities.Clients, expectedCurrentLocal.Clients) { - // we only expect the newest local entity segment to be loaded (for the current month) - t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrentLocal, currentLocalEntities) - } - activeClients := a.core.GetActiveClientsList() if err := ActiveEntitiesEqual(activeClients, expectedActive.Clients); err != nil { t.Error(err) @@ -2242,7 +2058,7 @@ func TestActivityLog_refreshFromStoredLogPreviousMonth(t *testing.T) { Clients: expectedClientRecords[1:], } expectedCurrent := &activity.EntityActivityLog{ - Clients: expectedClientRecords[len(expectedClientRecords)-2 : len(expectedClientRecords)-1], + Clients: expectedClientRecords[len(expectedClientRecords)-1:], } currentEntities := a.GetCurrentEntities() @@ -2346,16 +2162,6 @@ func TestActivityLog_DeleteWorker(t *testing.T) { WriteToStorage(t, core, ActivityLogPrefix+path, []byte("test")) } - localPaths := []string{ - "entity/1111/1", - "entity/1111/2", - "entity/1111/3", - "entity/1112/1", - } - for _, path := range localPaths { - WriteToStorage(t, core, ActivityLogLocalPrefix+path, []byte("test")) - } - tokenPaths := []string{ "directtokens/1111/1", "directtokens/1112/1", @@ -2377,16 +2183,12 @@ func TestActivityLog_DeleteWorker(t *testing.T) { // Check segments still present readSegmentFromStorage(t, core, ActivityLogPrefix+"entity/1112/1") - readSegmentFromStorage(t, core, ActivityLogLocalPrefix+"entity/1112/1") readSegmentFromStorage(t, core, ActivityLogLocalPrefix+"directtokens/1112/1") // Check other segments not present expectMissingSegment(t, core, ActivityLogPrefix+"entity/1111/1") expectMissingSegment(t, core, ActivityLogPrefix+"entity/1111/2") expectMissingSegment(t, core, ActivityLogPrefix+"entity/1111/3") - expectMissingSegment(t, core, ActivityLogLocalPrefix+"entity/1111/1") - expectMissingSegment(t, core, ActivityLogLocalPrefix+"entity/1111/2") - expectMissingSegment(t, core, ActivityLogLocalPrefix+"entity/1111/3") expectMissingSegment(t, core, ActivityLogLocalPrefix+"directtokens/1111/1") } @@ -2515,23 +2317,9 @@ func TestActivityLog_EndOfMonth(t *testing.T) { // We only want *fake* end of months, *real* ones are too scary. timeutil.SkipAtEndOfMonth(t) - t.Parallel() - - storage := &logical.InmemStorage{} - coreConfig := &CoreConfig{ - CredentialBackends: map[string]logical.Factory{ - "userpass": userpass.Factory, - }, - Physical: storage.Underlying(), - } - - cluster := NewTestCluster(t, coreConfig, nil) - core := cluster.Cores[0].Core - TestWaitActive(t, core) - - ctx := namespace.RootContext(nil) - + core, _, _ := TestCoreUnsealed(t) a := core.activityLog + ctx := namespace.RootContext(nil) // Make sure we're enabled. a.SetConfig(ctx, activityConfig{ @@ -2543,21 +2331,8 @@ func TestActivityLog_EndOfMonth(t *testing.T) { id1 := "11111111-1111-1111-1111-111111111111" id2 := "22222222-2222-2222-2222-222222222222" id3 := "33333333-3333-3333-3333-333333333333" - id4 := "44444444-4444-4444-4444-444444444444" a.AddEntityToFragment(id1, "root", time.Now().Unix()) - // add local data - localMountEntry := &MountEntry{ - Table: credentialTableType, - Path: "localUserpass/", - Local: true, - Type: "userpass", - Accessor: "localMountAccessor", - } - err := core.enableCredential(ctx, localMountEntry) - require.NoError(t, err) - a.AddClientToFragment(id4, "root", time.Now().Unix(), false, "localMountAccessor") - month0 := time.Now().UTC() segment0 := a.GetStartTimestamp() month1 := timeutil.StartOfNextMonth(month0) @@ -2570,12 +2345,10 @@ func TestActivityLog_EndOfMonth(t *testing.T) { path := fmt.Sprintf("%ventity/%v/0", ActivityLogPrefix, segment0) protoSegment := readSegmentFromStorage(t, core, path) out := &activity.EntityActivityLog{} - err = proto.Unmarshal(protoSegment.Value, out) + err := proto.Unmarshal(protoSegment.Value, out) if err != nil { t.Fatal(err) } - expectedEntityIDs(t, out, []string{id1, id4}) - path = fmt.Sprintf("%ventity/%v/0", ActivityGlobalLogPrefix, segment0) protoSegment = readSegmentFromStorage(t, core, path) out = &activity.EntityActivityLog{} @@ -2583,16 +2356,6 @@ func TestActivityLog_EndOfMonth(t *testing.T) { if err != nil { t.Fatal(err) } - expectedEntityIDs(t, out, []string{id1}) - - path = fmt.Sprintf("%ventity/%v/0", ActivityLogLocalPrefix, segment0) - protoSegment = readSegmentFromStorage(t, core, path) - out = &activity.EntityActivityLog{} - err = proto.Unmarshal(protoSegment.Value, out) - if err != nil { - t.Fatal(err) - } - expectedEntityIDs(t, out, []string{id4}) segment1 := a.GetStartTimestamp() expectedTimestamp := timeutil.StartOfMonth(month1).Unix() @@ -2637,21 +2400,16 @@ func TestActivityLog_EndOfMonth(t *testing.T) { // Check all three segments still present, with correct entities testCases := []struct { - SegmentTimestamp int64 - ExpectedGlobalEntityIDs []string - ExpectedLocalEntityIDs []string + SegmentTimestamp int64 + ExpectedEntityIDs []string }{ - {segment0, []string{id1}, []string{id4}}, - {segment1, []string{id2}, []string{}}, - {segment2, []string{id3}, []string{}}, + {segment0, []string{id1}}, + {segment1, []string{id2}}, + {segment2, []string{id3}}, } for i, tc := range testCases { t.Logf("checking segment %v timestamp %v", i, tc.SegmentTimestamp) - - expectedAllEntities := make([]string, 0) - expectedAllEntities = append(expectedAllEntities, tc.ExpectedGlobalEntityIDs...) - expectedAllEntities = append(expectedAllEntities, tc.ExpectedLocalEntityIDs...) path := fmt.Sprintf("%ventity/%v/0", ActivityLogPrefix, tc.SegmentTimestamp) protoSegment := readSegmentFromStorage(t, core, path) out := &activity.EntityActivityLog{} @@ -2659,7 +2417,7 @@ func TestActivityLog_EndOfMonth(t *testing.T) { if err != nil { t.Fatalf("could not unmarshal protobuf: %v", err) } - expectedEntityIDs(t, out, expectedAllEntities) + expectedEntityIDs(t, out, tc.ExpectedEntityIDs) // Check for global entities at global storage path path = fmt.Sprintf("%ventity/%v/0", ActivityGlobalLogPrefix, tc.SegmentTimestamp) @@ -2669,19 +2427,7 @@ func TestActivityLog_EndOfMonth(t *testing.T) { if err != nil { t.Fatalf("could not unmarshal protobuf: %v", err) } - expectedEntityIDs(t, out, tc.ExpectedGlobalEntityIDs) - - // Check for local entities at local storage path - if len(tc.ExpectedLocalEntityIDs) > 0 { - path = fmt.Sprintf("%ventity/%v/0", ActivityLogLocalPrefix, tc.SegmentTimestamp) - protoSegment = readSegmentFromStorage(t, core, path) - out = &activity.EntityActivityLog{} - err = proto.Unmarshal(protoSegment.Value, out) - if err != nil { - t.Fatalf("could not unmarshal protobuf: %v", err) - } - expectedEntityIDs(t, out, tc.ExpectedLocalEntityIDs) - } + expectedEntityIDs(t, out, tc.ExpectedEntityIDs) } } @@ -5814,81 +5560,6 @@ func TestCreateSegment_StoreSegment(t *testing.T) { maxClientsPerFragment: ActivitySegmentClientCapacity, global: false, }, - { - testName: "[local] max client size, drop clients", - numClients: ActivitySegmentClientCapacity*2 + 1, - pathPrefix: activityLocalPathPrefix, - maxClientsPerFragment: ActivitySegmentClientCapacity, - global: false, - }, - { - testName: "[local, no-force] max client size, drop clients", - numClients: ActivitySegmentClientCapacity*2 + 1, - pathPrefix: activityLocalPathPrefix, - maxClientsPerFragment: ActivitySegmentClientCapacity, - global: false, - forceStore: true, - }, - { - testName: "[local] max segment size", - numClients: ActivitySegmentClientCapacity, - pathPrefix: activityLocalPathPrefix, - maxClientsPerFragment: ActivitySegmentClientCapacity, - global: false, - }, - { - testName: "[local, no-force] max segment size", - numClients: ActivitySegmentClientCapacity, - pathPrefix: activityLocalPathPrefix, - maxClientsPerFragment: ActivitySegmentClientCapacity, - global: false, - forceStore: true, - }, - { - testName: "[local] max segment size, multiple fragments", - numClients: ActivitySegmentClientCapacity, - pathPrefix: activityLocalPathPrefix, - maxClientsPerFragment: ActivitySegmentClientCapacity - 1, - global: false, - }, - { - testName: "[local, no-force] max segment size, multiple fragments", - numClients: ActivitySegmentClientCapacity, - pathPrefix: activityLocalPathPrefix, - maxClientsPerFragment: ActivitySegmentClientCapacity - 1, - global: false, - forceStore: true, - }, - { - testName: "[local] roll over", - numClients: ActivitySegmentClientCapacity + 2, - pathPrefix: activityLocalPathPrefix, - maxClientsPerFragment: ActivitySegmentClientCapacity, - global: false, - }, - { - testName: "[local, no-force] roll over", - numClients: ActivitySegmentClientCapacity + 2, - pathPrefix: activityLocalPathPrefix, - maxClientsPerFragment: ActivitySegmentClientCapacity, - global: false, - forceStore: true, - }, - { - testName: "[local] max segment size, rollover multiple fragments", - numClients: ActivitySegmentClientCapacity * 2, - pathPrefix: activityLocalPathPrefix, - maxClientsPerFragment: ActivitySegmentClientCapacity - 1, - global: false, - }, - { - testName: "[local, no-force] max segment size, rollover multiple fragments", - numClients: ActivitySegmentClientCapacity * 2, - pathPrefix: activityLocalPathPrefix, - maxClientsPerFragment: ActivitySegmentClientCapacity - 1, - global: false, - forceStore: true, - }, } for _, test := range testCases { @@ -5911,9 +5582,6 @@ func TestCreateSegment_StoreSegment(t *testing.T) { segment := &a.currentGlobalSegment if !test.global { segment = &a.currentSegment - if test.pathPrefix == activityLocalPathPrefix { - segment = &a.currentLocalSegment - } } // Create segments and write to storage @@ -5932,24 +5600,13 @@ func TestCreateSegment_StoreSegment(t *testing.T) { clientTotal += len(entity.GetClients()) } } else { - if test.pathPrefix == activityLocalPathPrefix { - for { - entity, err := reader.ReadLocalEntity(ctx) - if errors.Is(err, io.EOF) { - break - } - require.NoError(t, err) - clientTotal += len(entity.GetClients()) - } - } else { - for { - entity, err := reader.ReadEntity(ctx) - if errors.Is(err, io.EOF) { - break - } - require.NoError(t, err) - clientTotal += len(entity.GetClients()) + for { + entity, err := reader.ReadEntity(ctx) + if errors.Is(err, io.EOF) { + break } + require.NoError(t, err) + clientTotal += len(entity.GetClients()) } } diff --git a/vault/activity_log_testing_util.go b/vault/activity_log_testing_util.go index f9bb25ba14..0f02390012 100644 --- a/vault/activity_log_testing_util.go +++ b/vault/activity_log_testing_util.go @@ -93,28 +93,6 @@ func (c *Core) GetActiveClientsList() []*activity.EntityRecord { return out } -func (c *Core) GetActiveGlobalClientsList() []*activity.EntityRecord { - out := []*activity.EntityRecord{} - c.activityLog.globalFragmentLock.RLock() - // add active global clients - for _, v := range c.activityLog.globalPartialMonthClientTracker { - out = append(out, v) - } - c.activityLog.globalFragmentLock.RUnlock() - return out -} - -func (c *Core) GetActiveLocalClientsList() []*activity.EntityRecord { - out := []*activity.EntityRecord{} - c.activityLog.localFragmentLock.RLock() - // add active global clients - for _, v := range c.activityLog.partialMonthLocalClientTracker { - out = append(out, v) - } - c.activityLog.localFragmentLock.RUnlock() - return out -} - // GetCurrentEntities returns the current entity activity log func (a *ActivityLog) GetCurrentEntities() *activity.EntityActivityLog { a.l.RLock() @@ -126,16 +104,11 @@ func (a *ActivityLog) GetCurrentEntities() *activity.EntityActivityLog { func (a *ActivityLog) GetCurrentGlobalEntities() *activity.EntityActivityLog { a.l.RLock() defer a.l.RUnlock() + a.globalFragmentLock.RLock() + defer a.globalFragmentLock.RUnlock() return a.currentGlobalSegment.currentClients } -// GetCurrentLocalEntities returns the current local entity activity log -func (a *ActivityLog) GetCurrentLocalEntities() *activity.EntityActivityLog { - a.l.RLock() - defer a.l.RUnlock() - return a.currentLocalSegment.currentClients -} - // WriteToStorage is used to put entity data in storage // `path` should be the complete path (not relative to the view) func WriteToStorage(t *testing.T, c *Core, path string, data []byte) { @@ -217,9 +190,6 @@ func (a *ActivityLog) ExpectCurrentSegmentRefreshed(t *testing.T, expectedStart if a.partialMonthLocalClientTracker == nil { t.Errorf("expected non-nil partialMonthLocalClientTracker") } - if a.globalPartialMonthClientTracker == nil { - t.Errorf("expected non-nil globalPartialMonthClientTracker") - } if len(a.currentSegment.currentClients.Clients) > 0 { t.Errorf("expected no current entity segment to be loaded. got: %v", a.currentSegment.currentClients) } @@ -232,26 +202,13 @@ func (a *ActivityLog) ExpectCurrentSegmentRefreshed(t *testing.T, expectedStart if len(a.partialMonthLocalClientTracker) > 0 { t.Errorf("expected no active entity segment to be loaded. got: %v", a.partialMonthLocalClientTracker) } - if len(a.globalPartialMonthClientTracker) > 0 { - t.Errorf("expected no active entity segment to be loaded. got: %v", a.globalPartialMonthClientTracker) - } if verifyTimeNotZero { if a.currentSegment.startTimestamp == 0 { t.Error("bad start timestamp. expected no reset but timestamp was reset") } - if a.currentGlobalSegment.startTimestamp == 0 { - t.Error("bad start timestamp. expected no reset but timestamp was reset") - } - if a.currentLocalSegment.startTimestamp == 0 { - t.Error("bad start timestamp. expected no reset but timestamp was reset") - } } else if a.currentSegment.startTimestamp != expectedStart { t.Errorf("bad start timestamp. expected: %v got: %v", expectedStart, a.currentSegment.startTimestamp) - } else if a.currentGlobalSegment.startTimestamp != expectedStart { - t.Errorf("bad start timestamp. expected: %v got: %v", expectedStart, a.currentGlobalSegment.startTimestamp) - } else if a.currentLocalSegment.startTimestamp != expectedStart { - t.Errorf("bad start timestamp. expected: %v got: %v", expectedStart, a.currentLocalSegment.startTimestamp) } } @@ -284,14 +241,13 @@ func (a *ActivityLog) SetStartTimestamp(timestamp int64) { defer a.l.Unlock() a.currentSegment.startTimestamp = timestamp a.currentGlobalSegment.startTimestamp = timestamp - a.currentLocalSegment.startTimestamp = timestamp } // GetStoredTokenCountByNamespaceID returns the count of tokens by namespace ID func (a *ActivityLog) GetStoredTokenCountByNamespaceID() map[string]uint64 { a.l.RLock() defer a.l.RUnlock() - return a.currentLocalSegment.tokenCount.CountByNamespaceID + return a.currentSegment.tokenCount.CountByNamespaceID } // GetEntitySequenceNumber returns the current entity sequence number @@ -301,20 +257,6 @@ func (a *ActivityLog) GetEntitySequenceNumber() uint64 { return a.currentSegment.clientSequenceNumber } -// GetGlobalEntitySequenceNumber returns the current entity sequence number -func (a *ActivityLog) GetGlobalEntitySequenceNumber() uint64 { - a.l.RLock() - defer a.l.RUnlock() - return a.currentGlobalSegment.clientSequenceNumber -} - -// GetLocalEntitySequenceNumber returns the current entity sequence number -func (a *ActivityLog) GetLocalEntitySequenceNumber() uint64 { - a.l.RLock() - defer a.l.RUnlock() - return a.currentLocalSegment.clientSequenceNumber -} - // SetEnable sets the enabled flag on the activity log func (a *ActivityLog) SetEnable(enabled bool) { a.l.Lock() diff --git a/vault/activity_log_util_common.go b/vault/activity_log_util_common.go index c019d03a47..67c7a847e4 100644 --- a/vault/activity_log_util_common.go +++ b/vault/activity_log_util_common.go @@ -427,7 +427,6 @@ type segmentReader struct { tokens *singleTypeSegmentReader entities *singleTypeSegmentReader globalEntities *singleTypeSegmentReader - localEntities *singleTypeSegmentReader } // SegmentReader is an interface that provides methods to read tokens and entities in order @@ -435,7 +434,6 @@ type SegmentReader interface { ReadToken(ctx context.Context) (*activity.TokenCount, error) ReadEntity(ctx context.Context) (*activity.EntityActivityLog, error) ReadGlobalEntity(ctx context.Context) (*activity.EntityActivityLog, error) - ReadLocalEntity(ctx context.Context) (*activity.EntityActivityLog, error) } func (a *ActivityLog) NewSegmentFileReader(ctx context.Context, startTime time.Time) (SegmentReader, error) { @@ -447,15 +445,11 @@ func (a *ActivityLog) NewSegmentFileReader(ctx context.Context, startTime time.T if err != nil { return nil, err } - localEntities, err := a.newSingleTypeSegmentReader(ctx, startTime, activityLocalPathPrefix+activityEntityBasePath) - if err != nil { - return nil, err - } tokens, err := a.newSingleTypeSegmentReader(ctx, startTime, activityTokenLocalBasePath) if err != nil { return nil, err } - return &segmentReader{entities: entities, globalEntities: globalEntities, localEntities: localEntities, tokens: tokens}, nil + return &segmentReader{entities: entities, globalEntities: globalEntities, tokens: tokens}, nil } func (a *ActivityLog) newSingleTypeSegmentReader(ctx context.Context, startTime time.Time, prefix string) (*singleTypeSegmentReader, error) { @@ -532,17 +526,6 @@ func (e *segmentReader) ReadGlobalEntity(ctx context.Context) (*activity.EntityA return out, nil } -// ReadLocalEntity reads a local entity from the local segment -// If there is none available, then the error will be io.EOF -func (e *segmentReader) ReadLocalEntity(ctx context.Context) (*activity.EntityActivityLog, error) { - out := &activity.EntityActivityLog{} - err := e.localEntities.nextValue(ctx, out) - if err != nil { - return nil, err - } - return out, nil -} - // namespaceRecordToCountsResponse converts the record to the ResponseCounts // type. The function sums entity, non-entity, and secret sync counts to get the // total client count. diff --git a/vault/activity_log_util_common_test.go b/vault/activity_log_util_common_test.go index 7201cdc651..d481560dc8 100644 --- a/vault/activity_log_util_common_test.go +++ b/vault/activity_log_util_common_test.go @@ -998,14 +998,6 @@ func writeGlobalEntitySegment(t *testing.T, core *Core, ts time.Time, index int, WriteToStorage(t, core, makeSegmentPath(t, activityGlobalPathPrefix+activityEntityBasePath, ts, index), protoItem) } -// writeLocalEntitySegment writes a single local segment file with the given time and index for an entity -func writeLocalEntitySegment(t *testing.T, core *Core, ts time.Time, index int, item *activity.EntityActivityLog) { - t.Helper() - protoItem, err := proto.Marshal(item) - require.NoError(t, err) - WriteToStorage(t, core, makeSegmentPath(t, activityLocalPathPrefix+activityEntityBasePath, ts, index), protoItem) -} - // writeEntitySegment writes a single segment file with the given time and index for an entity func writeEntitySegment(t *testing.T, core *Core, ts time.Time, index int, item *activity.EntityActivityLog) { t.Helper() @@ -1039,7 +1031,6 @@ func TestSegmentFileReader_BadData(t *testing.T) { WriteToStorage(t, core, makeSegmentPath(t, activityTokenLocalBasePath, now, 0), []byte("fake data")) WriteToStorage(t, core, makeSegmentPath(t, activityEntityBasePath, now, 0), []byte("fake data")) WriteToStorage(t, core, makeSegmentPath(t, activityGlobalPathPrefix+activityEntityBasePath, now, 0), []byte("fake data")) - WriteToStorage(t, core, makeSegmentPath(t, activityLocalPathPrefix+activityEntityBasePath, now, 0), []byte("fake data")) // write entity at index 1 entity := &activity.EntityActivityLog{Clients: []*activity.EntityRecord{ @@ -1052,9 +1043,6 @@ func TestSegmentFileReader_BadData(t *testing.T) { // write global data at index 1 writeGlobalEntitySegment(t, core, now, 1, entity) - // write local data at index 1 - writeLocalEntitySegment(t, core, now, 1, entity) - // write token at index 1 token := &activity.TokenCount{CountByNamespaceID: map[string]uint64{ "ns": 1, @@ -1079,14 +1067,6 @@ func TestSegmentFileReader_BadData(t *testing.T) { require.True(t, proto.Equal(gotEntity, entity)) require.Nil(t, err) - // first the bad local entity is read, which returns an error - _, err = reader.ReadLocalEntity(context.Background()) - require.Error(t, err) - // then, the reader can read the good entity at index 1 - gotEntity, err = reader.ReadLocalEntity(context.Background()) - require.True(t, proto.Equal(gotEntity, entity)) - require.Nil(t, err) - // the bad token causes an error _, err = reader.ReadToken(context.Background()) require.Error(t, err) @@ -1115,13 +1095,8 @@ func TestSegmentFileReader_MissingData(t *testing.T) { }, }} writeEntitySegment(t, core, now, 3, entity) - // write global entity at index 3 writeGlobalEntitySegment(t, core, now, 3, entity) - - // write local entity at index 3 - writeLocalEntitySegment(t, core, now, 3, entity) - // write token at index 3 token := &activity.TokenCount{CountByNamespaceID: map[string]uint64{ "ns": 1, @@ -1135,7 +1110,6 @@ func TestSegmentFileReader_MissingData(t *testing.T) { require.NoError(t, core.barrier.Delete(context.Background(), makeSegmentPath(t, activityTokenLocalBasePath, now, i))) require.NoError(t, core.barrier.Delete(context.Background(), makeSegmentPath(t, activityEntityBasePath, now, i))) require.NoError(t, core.barrier.Delete(context.Background(), makeSegmentPath(t, activityGlobalPathPrefix+activityEntityBasePath, now, i))) - require.NoError(t, core.barrier.Delete(context.Background(), makeSegmentPath(t, activityLocalPathPrefix+activityEntityBasePath, now, i))) } // we expect the reader to only return the data at index 3, and then be done @@ -1156,12 +1130,6 @@ func TestSegmentFileReader_MissingData(t *testing.T) { require.True(t, proto.Equal(gotEntity, entity)) _, err = reader.ReadGlobalEntity(context.Background()) require.Equal(t, err, io.EOF) - - gotEntity, err = reader.ReadLocalEntity(context.Background()) - require.NoError(t, err) - require.True(t, proto.Equal(gotEntity, entity)) - _, err = reader.ReadLocalEntity(context.Background()) - require.Equal(t, err, io.EOF) } // TestSegmentFileReader_NoData verifies that the reader return io.EOF when there is no data diff --git a/vault/external_tests/activity_testonly/acme_regeneration_test.go b/vault/external_tests/activity_testonly/acme_regeneration_test.go index c663b174b8..5d70dc0c21 100644 --- a/vault/external_tests/activity_testonly/acme_regeneration_test.go +++ b/vault/external_tests/activity_testonly/acme_regeneration_test.go @@ -54,7 +54,7 @@ func TestACMERegeneration_RegenerateWithCurrentMonth(t *testing.T) { }) require.NoError(t, err) now := time.Now().UTC() - _, _, _, err = clientcountutil.NewActivityLogData(client). + _, _, err = clientcountutil.NewActivityLogData(client). NewPreviousMonthData(3). // 3 months ago, 15 non-entity clients and 10 ACME clients NewClientsSeen(15, clientcountutil.WithClientType("non-entity-token")). @@ -116,7 +116,7 @@ func TestACMERegeneration_RegenerateMuchOlder(t *testing.T) { client := cluster.Cores[0].Client now := time.Now().UTC() - _, _, _, err := clientcountutil.NewActivityLogData(client). + _, _, err := clientcountutil.NewActivityLogData(client). NewPreviousMonthData(5). // 5 months ago, 15 non-entity clients and 10 ACME clients NewClientsSeen(15, clientcountutil.WithClientType("non-entity-token")). @@ -159,7 +159,7 @@ func TestACMERegeneration_RegeneratePreviousMonths(t *testing.T) { client := cluster.Cores[0].Client now := time.Now().UTC() - _, _, _, err := clientcountutil.NewActivityLogData(client). + _, _, err := clientcountutil.NewActivityLogData(client). NewPreviousMonthData(3). // 3 months ago, 15 non-entity clients and 10 ACME clients NewClientsSeen(15, clientcountutil.WithClientType("non-entity-token")). diff --git a/vault/external_tests/activity_testonly/activity_testonly_oss_test.go b/vault/external_tests/activity_testonly/activity_testonly_oss_test.go index 4b59142008..c5463bb801 100644 --- a/vault/external_tests/activity_testonly/activity_testonly_oss_test.go +++ b/vault/external_tests/activity_testonly/activity_testonly_oss_test.go @@ -29,7 +29,7 @@ func Test_ActivityLog_Disable(t *testing.T) { "enabled": "enable", }) require.NoError(t, err) - _, _, _, err = clientcountutil.NewActivityLogData(client). + _, _, err = clientcountutil.NewActivityLogData(client). NewPreviousMonthData(1). NewClientsSeen(5). NewCurrentMonthData(). diff --git a/vault/external_tests/activity_testonly/activity_testonly_test.go b/vault/external_tests/activity_testonly/activity_testonly_test.go index 3e3a1259b2..481416024d 100644 --- a/vault/external_tests/activity_testonly/activity_testonly_test.go +++ b/vault/external_tests/activity_testonly/activity_testonly_test.go @@ -86,7 +86,7 @@ func Test_ActivityLog_LoseLeadership(t *testing.T) { }) require.NoError(t, err) - _, _, _, err = clientcountutil.NewActivityLogData(client). + _, _, err = clientcountutil.NewActivityLogData(client). NewCurrentMonthData(). NewClientsSeen(10). Write(context.Background(), generation.WriteOptions_WRITE_ENTITIES) @@ -121,7 +121,7 @@ func Test_ActivityLog_ClientsOverlapping(t *testing.T) { "enabled": "enable", }) require.NoError(t, err) - _, _, _, err = clientcountutil.NewActivityLogData(client). + _, _, err = clientcountutil.NewActivityLogData(client). NewPreviousMonthData(1). NewClientsSeen(7). NewCurrentMonthData(). @@ -169,7 +169,7 @@ func Test_ActivityLog_ClientsNewCurrentMonth(t *testing.T) { "enabled": "enable", }) require.NoError(t, err) - _, _, _, err = clientcountutil.NewActivityLogData(client). + _, _, err = clientcountutil.NewActivityLogData(client). NewPreviousMonthData(1). NewClientsSeen(5). NewCurrentMonthData(). @@ -203,7 +203,7 @@ func Test_ActivityLog_EmptyDataMonths(t *testing.T) { "enabled": "enable", }) require.NoError(t, err) - _, _, _, err = clientcountutil.NewActivityLogData(client). + _, _, err = clientcountutil.NewActivityLogData(client). NewCurrentMonthData(). NewClientsSeen(10). Write(context.Background(), generation.WriteOptions_WRITE_PRECOMPUTED_QUERIES, generation.WriteOptions_WRITE_ENTITIES) @@ -243,7 +243,7 @@ func Test_ActivityLog_FutureEndDate(t *testing.T) { "enabled": "enable", }) require.NoError(t, err) - _, _, _, err = clientcountutil.NewActivityLogData(client). + _, _, err = clientcountutil.NewActivityLogData(client). NewPreviousMonthData(1). NewClientsSeen(10). NewCurrentMonthData(). @@ -316,7 +316,7 @@ func Test_ActivityLog_ClientTypeResponse(t *testing.T) { _, err := client.Logical().Write("sys/internal/counters/config", map[string]interface{}{ "enabled": "enable", }) - _, _, _, err = clientcountutil.NewActivityLogData(client). + _, _, err = clientcountutil.NewActivityLogData(client). NewCurrentMonthData(). NewClientsSeen(10, clientcountutil.WithClientType(tc.clientType)). Write(context.Background(), generation.WriteOptions_WRITE_ENTITIES) @@ -369,7 +369,7 @@ func Test_ActivityLogCurrentMonth_Response(t *testing.T) { _, err := client.Logical().Write("sys/internal/counters/config", map[string]interface{}{ "enabled": "enable", }) - _, _, _, err = clientcountutil.NewActivityLogData(client). + _, _, err = clientcountutil.NewActivityLogData(client). NewCurrentMonthData(). NewClientsSeen(10, clientcountutil.WithClientType(tc.clientType)). Write(context.Background(), generation.WriteOptions_WRITE_ENTITIES) @@ -420,7 +420,7 @@ func Test_ActivityLog_Deduplication(t *testing.T) { _, err := client.Logical().Write("sys/internal/counters/config", map[string]interface{}{ "enabled": "enable", }) - _, _, _, err = clientcountutil.NewActivityLogData(client). + _, _, err = clientcountutil.NewActivityLogData(client). NewPreviousMonthData(3). NewClientsSeen(10, clientcountutil.WithClientType(tc.clientType)). NewPreviousMonthData(2). @@ -462,7 +462,7 @@ func Test_ActivityLog_MountDeduplication(t *testing.T) { require.NoError(t, err) now := time.Now().UTC() - _, localPaths, globalPaths, err := clientcountutil.NewActivityLogData(client). + _, _, err = clientcountutil.NewActivityLogData(client). NewPreviousMonthData(1). NewClientSeen(clientcountutil.WithClientMount("sys")). NewClientSeen(clientcountutil.WithClientMount("secret")). @@ -473,10 +473,6 @@ func Test_ActivityLog_MountDeduplication(t *testing.T) { NewClientSeen(clientcountutil.WithClientMount("sys")). Write(context.Background(), generation.WriteOptions_WRITE_PRECOMPUTED_QUERIES, generation.WriteOptions_WRITE_ENTITIES) require.NoError(t, err) - // cubbyhole is local, 2 segments must exist for 2 months seen - require.Len(t, localPaths, 2) - // 2 global segments must exist for 2 months seen - require.Len(t, globalPaths, 2) resp, err := client.Logical().ReadWithData("sys/internal/counters/activity", map[string][]string{ "end_time": {timeutil.EndOfMonth(now).Format(time.RFC3339)}, @@ -673,7 +669,7 @@ func Test_ActivityLog_Export_Sudo(t *testing.T) { rootToken := client.Token() - _, _, _, err = clientcountutil.NewActivityLogData(client). + _, _, err = clientcountutil.NewActivityLogData(client). NewCurrentMonthData(). NewClientsSeen(10). Write(context.Background(), generation.WriteOptions_WRITE_ENTITIES) @@ -849,7 +845,7 @@ func TestHandleQuery_MultipleMounts(t *testing.T) { } // Write all the client count data - _, _, _, err = activityLogGenerator.Write(context.Background(), generation.WriteOptions_WRITE_PRECOMPUTED_QUERIES, generation.WriteOptions_WRITE_ENTITIES) + _, _, err = activityLogGenerator.Write(context.Background(), generation.WriteOptions_WRITE_PRECOMPUTED_QUERIES, generation.WriteOptions_WRITE_ENTITIES) require.NoError(t, err) endOfCurrentMonth := timeutil.EndOfMonth(time.Now().UTC()) diff --git a/vault/logical_system_activity_write_testonly.go b/vault/logical_system_activity_write_testonly.go index 3f6f4caa56..b5f80e380e 100644 --- a/vault/logical_system_activity_write_testonly.go +++ b/vault/logical_system_activity_write_testonly.go @@ -85,7 +85,7 @@ func (b *SystemBackend) handleActivityWriteData(ctx context.Context, request *lo for _, opt := range input.Write { opts[opt] = struct{}{} } - paths, localPaths, globalPaths, err := generated.write(ctx, opts, b.Core.activityLog, now) + paths, globalPaths, err := generated.write(ctx, opts, b.Core.activityLog, now) if err != nil { b.logger.Debug("failed to write activity log data", "error", err.Error()) return logical.ErrorResponse("failed to write data"), err @@ -93,7 +93,6 @@ func (b *SystemBackend) handleActivityWriteData(ctx context.Context, request *lo return &logical.Response{ Data: map[string]interface{}{ "paths": paths, - "local_paths": localPaths, "global_paths": globalPaths, }, }, nil @@ -105,17 +104,12 @@ type singleMonthActivityClients struct { clients []*activity.EntityRecord // globalClients are indexed by ID globalClients []*activity.EntityRecord - // localClients are indexed by ID - localClients []*activity.EntityRecord // predefinedSegments map from the segment number to the client's index in // the clients slice predefinedSegments map[int][]int // predefinedGlobalSegments map from the segment number to the client's index in // the clients slice predefinedGlobalSegments map[int][]int - // predefinedLocalSegments map from the segment number to the client's index in - // the clients slice - predefinedLocalSegments map[int][]int // generationParameters holds the generation request generationParameters *generation.Data } @@ -131,8 +125,6 @@ func (s *singleMonthActivityClients) addEntityRecord(core *Core, record *activit local, _ := core.activityLog.isClientLocal(record) if !local { s.globalClients = append(s.globalClients, record) - } else { - s.localClients = append(s.localClients, record) } if segmentIndex != nil { index := len(s.clients) - 1 @@ -140,9 +132,6 @@ func (s *singleMonthActivityClients) addEntityRecord(core *Core, record *activit if !local { globalIndex := len(s.globalClients) - 1 s.predefinedGlobalSegments[*segmentIndex] = append(s.predefinedGlobalSegments[*segmentIndex], globalIndex) - } else { - localIndex := len(s.localClients) - 1 - s.predefinedLocalSegments[*segmentIndex] = append(s.predefinedLocalSegments[*segmentIndex], localIndex) } } } @@ -395,10 +384,9 @@ func (m *multipleMonthsActivityClients) timestampForMonth(i int, now time.Time) return now } -func (m *multipleMonthsActivityClients) write(ctx context.Context, opts map[generation.WriteOptions]struct{}, activityLog *ActivityLog, now time.Time) ([]string, []string, []string, error) { +func (m *multipleMonthsActivityClients) write(ctx context.Context, opts map[generation.WriteOptions]struct{}, activityLog *ActivityLog, now time.Time) ([]string, []string, error) { paths := []string{} globalPaths := []string{} - localPaths := []string{} _, writePQ := opts[generation.WriteOptions_WRITE_PRECOMPUTED_QUERIES] _, writeDistinctClients := opts[generation.WriteOptions_WRITE_DISTINCT_CLIENTS] @@ -413,7 +401,7 @@ func (m *multipleMonthsActivityClients) write(ctx context.Context, opts map[gene timestamp := m.timestampForMonth(i, now) segments, err := month.populateSegments(month.predefinedSegments, month.clients) if err != nil { - return nil, nil, nil, err + return nil, nil, err } for segmentIndex, segment := range segments { if segment == nil { @@ -427,14 +415,14 @@ func (m *multipleMonthsActivityClients) write(ctx context.Context, opts map[gene tokenCount: &activity.TokenCount{}, }, true, "") if err != nil { - return nil, nil, nil, err + return nil, nil, err } paths = append(paths, entityPath) } if len(month.globalClients) > 0 { globalSegments, err := month.populateSegments(month.predefinedGlobalSegments, month.globalClients) if err != nil { - return nil, nil, nil, err + return nil, nil, err } for segmentIndex, segment := range globalSegments { if segment == nil { @@ -448,33 +436,11 @@ func (m *multipleMonthsActivityClients) write(ctx context.Context, opts map[gene tokenCount: &activity.TokenCount{}, }, true, activityGlobalPathPrefix) if err != nil { - return nil, nil, nil, err + return nil, nil, err } globalPaths = append(globalPaths, entityPath) } } - if len(month.localClients) > 0 { - localSegments, err := month.populateSegments(month.predefinedLocalSegments, month.localClients) - if err != nil { - return nil, nil, nil, err - } - for segmentIndex, segment := range localSegments { - if segment == nil { - // skip the index - continue - } - entityPath, err := activityLog.saveSegmentEntitiesInternal(ctx, segmentInfo{ - startTimestamp: timestamp.Unix(), - currentClients: &activity.EntityActivityLog{Clients: segment}, - clientSequenceNumber: uint64(segmentIndex), - tokenCount: &activity.TokenCount{}, - }, true, activityLocalPathPrefix) - if err != nil { - return nil, nil, nil, err - } - localPaths = append(localPaths, entityPath) - } - } } if writePQ || writeDistinctClients { // start with the oldest month of data, and create precomputed queries @@ -495,16 +461,16 @@ func (m *multipleMonthsActivityClients) write(ctx context.Context, opts map[gene if writeIntentLog { err := activityLog.writeIntentLog(ctx, m.latestTimestamp(now, false).Unix(), m.latestTimestamp(now, true).UTC()) if err != nil { - return nil, nil, nil, err + return nil, nil, err } } wg := sync.WaitGroup{} err := activityLog.refreshFromStoredLog(ctx, &wg, now) if err != nil { - return nil, nil, nil, err + return nil, nil, err } wg.Wait() - return paths, localPaths, globalPaths, nil + return paths, globalPaths, nil } func (m *multipleMonthsActivityClients) latestTimestamp(now time.Time, includeCurrentMonth bool) time.Time { @@ -534,7 +500,6 @@ func newMultipleMonthsActivityClients(numberOfMonths int) *multipleMonthsActivit m.months[i] = &singleMonthActivityClients{ predefinedSegments: make(map[int][]int), predefinedGlobalSegments: make(map[int][]int), - predefinedLocalSegments: make(map[int][]int), } } return m @@ -569,17 +534,6 @@ func (p *sliceSegmentReader) ReadGlobalEntity(ctx context.Context) (*activity.En return &activity.EntityActivityLog{Clients: record}, nil } -// ReadLocalEntity here is a dummy implementation. -// Segment reader is never used when writing using the ClientCountUtil library -func (p *sliceSegmentReader) ReadLocalEntity(ctx context.Context) (*activity.EntityActivityLog, error) { - if p.i == len(p.records) { - return nil, io.EOF - } - record := p.records[p.i] - p.i++ - return &activity.EntityActivityLog{Clients: record}, nil -} - func (p *sliceSegmentReader) ReadToken(ctx context.Context) (*activity.TokenCount, error) { return nil, io.EOF }