mirror of
				https://github.com/optim-enterprises-bv/vault.git
				synced 2025-10-31 02:28:09 +00:00 
			
		
		
		
	Store global clients at separate storage paths (#28926)
This commit is contained in:
		| @@ -53,7 +53,7 @@ func TestOperatorUsageCommandRun(t *testing.T) { | ||||
|  | ||||
| 	now := time.Now().UTC() | ||||
|  | ||||
| 	_, err = clientcountutil.NewActivityLogData(client). | ||||
| 	_, _, err = clientcountutil.NewActivityLogData(client). | ||||
| 		NewPreviousMonthData(1). | ||||
| 		NewClientsSeen(6, clientcountutil.WithClientType("entity")). | ||||
| 		NewClientsSeen(4, clientcountutil.WithClientType("non-entity-token")). | ||||
|   | ||||
| @@ -282,33 +282,42 @@ func (d *ActivityLogDataGenerator) ToProto() *generation.ActivityLogMockInput { | ||||
| // Write writes the data to the API with the given write options. The method | ||||
| // returns the new paths that have been written. Note that the API endpoint will | ||||
| // only be present when Vault has been compiled with the "testonly" flag. | ||||
| func (d *ActivityLogDataGenerator) Write(ctx context.Context, writeOptions ...generation.WriteOptions) ([]string, error) { | ||||
| func (d *ActivityLogDataGenerator) Write(ctx context.Context, writeOptions ...generation.WriteOptions) ([]string, []string, error) { | ||||
| 	d.data.Write = writeOptions | ||||
| 	err := VerifyInput(d.data) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 		return nil, nil, err | ||||
| 	} | ||||
| 	data, err := d.ToJSON() | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 		return nil, nil, err | ||||
| 	} | ||||
| 	resp, err := d.client.Logical().WriteWithContext(ctx, "sys/internal/counters/activity/write", map[string]interface{}{"input": string(data)}) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 		return nil, nil, err | ||||
| 	} | ||||
| 	if resp.Data == nil { | ||||
| 		return nil, fmt.Errorf("received no data") | ||||
| 		return nil, nil, fmt.Errorf("received no data") | ||||
| 	} | ||||
| 	paths := resp.Data["paths"] | ||||
| 	castedPaths, ok := paths.([]interface{}) | ||||
| 	if !ok { | ||||
| 		return nil, fmt.Errorf("invalid paths data: %v", paths) | ||||
| 		return nil, nil, fmt.Errorf("invalid paths data: %v", paths) | ||||
| 	} | ||||
| 	returnPaths := make([]string, 0, len(castedPaths)) | ||||
| 	for _, path := range castedPaths { | ||||
| 		returnPaths = append(returnPaths, path.(string)) | ||||
| 	} | ||||
| 	return returnPaths, nil | ||||
| 	globalPaths := resp.Data["global_paths"] | ||||
| 	globalCastedPaths, ok := globalPaths.([]interface{}) | ||||
| 	if !ok { | ||||
| 		return nil, nil, fmt.Errorf("invalid global paths data: %v", globalPaths) | ||||
| 	} | ||||
| 	returnGlobalPaths := make([]string, 0, len(globalCastedPaths)) | ||||
| 	for _, path := range globalCastedPaths { | ||||
| 		returnGlobalPaths = append(returnGlobalPaths, path.(string)) | ||||
| 	} | ||||
| 	return returnPaths, returnGlobalPaths, nil | ||||
| } | ||||
|  | ||||
| // VerifyInput checks that the input data is valid | ||||
|   | ||||
| @@ -116,7 +116,7 @@ func TestNewCurrentMonthData_AddClients(t *testing.T) { | ||||
| // sent to the server is correct. | ||||
| func TestWrite(t *testing.T) { | ||||
| 	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||||
| 		_, err := io.WriteString(w, `{"data":{"paths":["path1","path2"]}}`) | ||||
| 		_, err := io.WriteString(w, `{"data":{"paths":["path1","path2"],"global_paths":["path2","path3"]}}`) | ||||
| 		require.NoError(t, err) | ||||
| 		body, err := io.ReadAll(r.Body) | ||||
| 		require.NoError(t, err) | ||||
| @@ -131,7 +131,7 @@ func TestWrite(t *testing.T) { | ||||
| 		Address: ts.URL, | ||||
| 	}) | ||||
| 	require.NoError(t, err) | ||||
| 	paths, err := NewActivityLogData(client). | ||||
| 	paths, globalPaths, err := NewActivityLogData(client). | ||||
| 		NewPreviousMonthData(3). | ||||
| 		NewClientSeen(). | ||||
| 		NewPreviousMonthData(2). | ||||
| @@ -141,6 +141,7 @@ func TestWrite(t *testing.T) { | ||||
|  | ||||
| 	require.NoError(t, err) | ||||
| 	require.Equal(t, []string{"path1", "path2"}, paths) | ||||
| 	require.Equal(t, []string{"path2", "path3"}, globalPaths) | ||||
| } | ||||
|  | ||||
| func testAddClients(t *testing.T, makeGenerator func() *ActivityLogDataGenerator, getClient func(data *ActivityLogDataGenerator) *generation.Client) { | ||||
|   | ||||
| @@ -43,15 +43,17 @@ const ( | ||||
| 	activityQueryBasePath      = "queries/" | ||||
| 	activityConfigKey          = "config" | ||||
| 	activityIntentLogKey       = "endofmonth" | ||||
| 	activityGlobalPathPrefix   = "global/" | ||||
|  | ||||
| 	activityACMERegenerationKey = "acme-regeneration" | ||||
| 	// sketch for each month that stores hash of client ids | ||||
| 	distinctClientsBasePath = "log/distinctclients/" | ||||
|  | ||||
| 	// for testing purposes (public as needed) | ||||
| 	ActivityLogPrefix      = "sys/counters/activity/log/" | ||||
| 	ActivityLogLocalPrefix = "sys/counters/activity/local/log/" | ||||
| 	ActivityPrefix         = "sys/counters/activity/" | ||||
| 	ActivityLogPrefix       = "sys/counters/activity/log/" | ||||
| 	ActivityGlobalLogPrefix = "sys/counters/activity/global/log/" | ||||
| 	ActivityLogLocalPrefix  = "sys/counters/activity/local/log/" | ||||
| 	ActivityPrefix          = "sys/counters/activity/" | ||||
|  | ||||
| 	// Time to wait before a perf standby sends data to the active node, or | ||||
| 	// before the active node of a performance secondary sends global data to the primary. | ||||
| @@ -201,6 +203,9 @@ type ActivityLog struct { | ||||
| 	// track metadata and contents of the most recent log segment | ||||
| 	currentSegment segmentInfo | ||||
|  | ||||
| 	// track metadata and contents of the most recent global log segment | ||||
| 	currentGlobalSegment segmentInfo | ||||
|  | ||||
| 	// Fragments received from performance standbys | ||||
| 	standbyFragmentsReceived []*activity.LogFragment | ||||
|  | ||||
| @@ -278,6 +283,10 @@ type ActivityLogCoreConfig struct { | ||||
| 	// PerfStandbyFragmentSendInterval sets the interval to send fragment data from the perf standby to the active | ||||
| 	// This is only for testing purposes | ||||
| 	PerfStandbyFragmentSendInterval time.Duration | ||||
|  | ||||
| 	// StorageWriteTestingInterval sets the interval flush data to the storage. | ||||
| 	// This is only for testing purposes | ||||
| 	StorageWriteTestingInterval time.Duration | ||||
| } | ||||
|  | ||||
| // ActivityLogExportRecord is the output structure for activity export | ||||
| @@ -375,6 +384,19 @@ func NewActivityLog(core *Core, logger log.Logger, view *BarrierView, metrics me | ||||
| 			}, | ||||
| 			clientSequenceNumber: 0, | ||||
| 		}, | ||||
| 		currentGlobalSegment: segmentInfo{ | ||||
| 			startTimestamp: 0, | ||||
| 			currentClients: &activity.EntityActivityLog{ | ||||
| 				Clients: make([]*activity.EntityRecord, 0), | ||||
| 			}, | ||||
| 			// tokenCount is deprecated, but must still exist for the current segment | ||||
| 			// so the fragment that was using TWEs before the 1.9 changes | ||||
| 			// can be flushed to the current segment. | ||||
| 			tokenCount: &activity.TokenCount{ | ||||
| 				CountByNamespaceID: make(map[string]uint64), | ||||
| 			}, | ||||
| 			clientSequenceNumber: 0, | ||||
| 		}, | ||||
| 		standbyFragmentsReceived:       make([]*activity.LogFragment, 0), | ||||
| 		standbyLocalFragmentsReceived:  make([]*activity.LogFragment, 0), | ||||
| 		standbyGlobalFragmentsReceived: make([]*activity.LogFragment, 0), | ||||
| @@ -434,6 +456,8 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for | ||||
| 	a.globalFragmentLock.Lock() | ||||
| 	secondaryGlobalClients := a.secondaryGlobalClientFragments | ||||
| 	a.secondaryGlobalClientFragments = make([]*activity.LogFragment, 0) | ||||
| 	standbyGlobalClients := a.standbyGlobalFragmentsReceived | ||||
| 	a.standbyGlobalFragmentsReceived = make([]*activity.LogFragment, 0) | ||||
| 	globalClients := a.currentGlobalFragment | ||||
| 	a.currentGlobalFragment = nil | ||||
| 	a.globalFragmentLock.Unlock() | ||||
| @@ -450,6 +474,9 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for | ||||
| 		for _, globalReceivedFragment := range secondaryGlobalClients { | ||||
| 			globalReceivedFragmentTotal += len(globalReceivedFragment.Clients) | ||||
| 		} | ||||
| 		for _, globalReceivedFragment := range standbyGlobalClients { | ||||
| 			globalReceivedFragmentTotal += len(globalReceivedFragment.Clients) | ||||
| 		} | ||||
| 		a.metrics.IncrCounterWithLabels([]string{"core", "activity", "global_received_fragment_size"}, | ||||
| 			float32(globalReceivedFragmentTotal), | ||||
| 			[]metricsutil.Label{ | ||||
| @@ -465,18 +492,17 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	// Measure the current regular fragment | ||||
| 	if currentFragment != nil { | ||||
| 		a.metrics.IncrCounterWithLabels([]string{"core", "activity", "fragment_size"}, | ||||
| 			float32(len(currentFragment.Clients)), | ||||
| 			[]metricsutil.Label{ | ||||
| 				{"type", "entity"}, | ||||
| 			}) | ||||
| 		a.metrics.IncrCounterWithLabels([]string{"core", "activity", "fragment_size"}, | ||||
| 			float32(len(currentFragment.NonEntityTokens)), | ||||
| 			[]metricsutil.Label{ | ||||
| 				{"type", "direct_token"}, | ||||
| 			}) | ||||
| 	if ret := a.createCurrentSegmentFromFragments(ctx, append(standbys, currentFragment), &a.currentSegment, force, ""); ret != nil { | ||||
| 		return ret | ||||
| 	} | ||||
|  | ||||
| 	// If we are the primary, store global clients | ||||
| 	// Create fragments from global clients and store the segment | ||||
| 	if !a.core.IsPerfSecondary() { | ||||
| 		globalFragments := append(append(secondaryGlobalClients, globalClients), standbyGlobalClients...) | ||||
| 		if ret := a.createCurrentSegmentFromFragments(ctx, globalFragments, &a.currentGlobalSegment, force, activityGlobalPathPrefix); ret != nil { | ||||
| 			return ret | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// Swap out the pending local fragments | ||||
| @@ -503,10 +529,14 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for | ||||
| 			}) | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (a *ActivityLog) createCurrentSegmentFromFragments(ctx context.Context, fragments []*activity.LogFragment, currentSegment *segmentInfo, force bool, storagePathPrefix string) error { | ||||
| 	// Collect new entities and new tokens. | ||||
| 	saveChanges := false | ||||
| 	newEntities := make(map[string]*activity.EntityRecord) | ||||
| 	for _, f := range append(standbys, currentFragment) { | ||||
| 	for _, f := range fragments { | ||||
| 		if f == nil { | ||||
| 			continue | ||||
| 		} | ||||
| @@ -530,7 +560,7 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for | ||||
| 			// a.partialMonthClientTracker.nonEntityCountByNamespaceID. This preserves backward | ||||
| 			// compatibility for the precomputedQueryWorkers and the segment storing | ||||
| 			// logic. | ||||
| 			a.currentSegment.tokenCount.CountByNamespaceID[ns] += val | ||||
| 			currentSegment.tokenCount.CountByNamespaceID[ns] += val | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| @@ -539,14 +569,14 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for | ||||
| 	} | ||||
|  | ||||
| 	// Will all new entities fit?  If not, roll over to a new segment. | ||||
| 	available := ActivitySegmentClientCapacity - len(a.currentSegment.currentClients.Clients) | ||||
| 	available := ActivitySegmentClientCapacity - len(currentSegment.currentClients.Clients) | ||||
| 	remaining := available - len(newEntities) | ||||
| 	excess := 0 | ||||
| 	if remaining < 0 { | ||||
| 		excess = -remaining | ||||
| 	} | ||||
|  | ||||
| 	segmentClients := a.currentSegment.currentClients.Clients | ||||
| 	segmentClients := currentSegment.currentClients.Clients | ||||
| 	excessClients := make([]*activity.EntityRecord, 0, excess) | ||||
| 	for _, record := range newEntities { | ||||
| 		if available > 0 { | ||||
| @@ -556,8 +586,8 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for | ||||
| 			excessClients = append(excessClients, record) | ||||
| 		} | ||||
| 	} | ||||
| 	a.currentSegment.currentClients.Clients = segmentClients | ||||
| 	err := a.saveCurrentSegmentInternal(ctx, force) | ||||
| 	currentSegment.currentClients.Clients = segmentClients | ||||
| 	err := a.saveCurrentSegmentInternal(ctx, force, *currentSegment, storagePathPrefix) | ||||
| 	if err != nil { | ||||
| 		// The current fragment(s) have already been placed into the in-memory | ||||
| 		// segment, but we may lose any excess (in excessClients). | ||||
| @@ -567,7 +597,7 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for | ||||
| 	} | ||||
|  | ||||
| 	if available <= 0 { | ||||
| 		if a.currentSegment.clientSequenceNumber >= activityLogMaxSegmentPerMonth { | ||||
| 		if currentSegment.clientSequenceNumber >= activityLogMaxSegmentPerMonth { | ||||
| 			// Cannot send as Warn because it will repeat too often, | ||||
| 			// and disabling/renabling would be complicated. | ||||
| 			a.logger.Trace("too many segments in current month", "dropped", len(excessClients)) | ||||
| @@ -575,13 +605,13 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for | ||||
| 		} | ||||
|  | ||||
| 		// Rotate to next segment | ||||
| 		a.currentSegment.clientSequenceNumber += 1 | ||||
| 		currentSegment.clientSequenceNumber += 1 | ||||
| 		if len(excessClients) > ActivitySegmentClientCapacity { | ||||
| 			a.logger.Warn("too many new active clients, dropping tail", "clients", len(excessClients)) | ||||
| 			excessClients = excessClients[:ActivitySegmentClientCapacity] | ||||
| 		} | ||||
| 		a.currentSegment.currentClients.Clients = excessClients | ||||
| 		err := a.saveCurrentSegmentInternal(ctx, force) | ||||
| 		currentSegment.currentClients.Clients = excessClients | ||||
| 		err := a.saveCurrentSegmentInternal(ctx, force, *currentSegment, storagePathPrefix) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| @@ -590,12 +620,12 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for | ||||
| } | ||||
|  | ||||
| // :force: forces a save of tokens/entities even if the in-memory log is empty | ||||
| func (a *ActivityLog) saveCurrentSegmentInternal(ctx context.Context, force bool) error { | ||||
| 	_, err := a.saveSegmentEntitiesInternal(ctx, a.currentSegment, force) | ||||
| func (a *ActivityLog) saveCurrentSegmentInternal(ctx context.Context, force bool, currentSegment segmentInfo, storagePathPrefix string) error { | ||||
| 	_, err := a.saveSegmentEntitiesInternal(ctx, currentSegment, force, storagePathPrefix) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	_, err = a.saveSegmentTokensInternal(ctx, a.currentSegment, force) | ||||
| 	_, err = a.saveSegmentTokensInternal(ctx, currentSegment, force) | ||||
| 	return err | ||||
| } | ||||
|  | ||||
| @@ -614,15 +644,15 @@ func (a *ActivityLog) saveSegmentTokensInternal(ctx context.Context, currentSegm | ||||
| 	switch { | ||||
| 	case err != nil: | ||||
| 		a.logger.Error(fmt.Sprintf("unable to retrieve oldest version timestamp: %s", err.Error())) | ||||
| 	case len(a.currentSegment.tokenCount.CountByNamespaceID) > 0 && | ||||
| 	case len(currentSegment.tokenCount.CountByNamespaceID) > 0 && | ||||
| 		(oldestUpgradeTime.Add(time.Duration(trackedTWESegmentPeriod * time.Hour)).Before(time.Now())): | ||||
| 		a.logger.Error(fmt.Sprintf("storing nonzero token count over a month after vault was upgraded to %s", oldestVersion)) | ||||
| 	default: | ||||
| 		if len(a.currentSegment.tokenCount.CountByNamespaceID) > 0 { | ||||
| 		if len(currentSegment.tokenCount.CountByNamespaceID) > 0 { | ||||
| 			a.logger.Info("storing nonzero token count") | ||||
| 		} | ||||
| 	} | ||||
| 	tokenCount, err := proto.Marshal(a.currentSegment.tokenCount) | ||||
| 	tokenCount, err := proto.Marshal(currentSegment.tokenCount) | ||||
| 	if err != nil { | ||||
| 		return "", err | ||||
| 	} | ||||
| @@ -639,10 +669,10 @@ func (a *ActivityLog) saveSegmentTokensInternal(ctx context.Context, currentSegm | ||||
| 	return tokenPath, nil | ||||
| } | ||||
|  | ||||
| func (a *ActivityLog) saveSegmentEntitiesInternal(ctx context.Context, currentSegment segmentInfo, force bool) (string, error) { | ||||
| 	entityPath := fmt.Sprintf("%s%d/%d", activityEntityBasePath, currentSegment.startTimestamp, currentSegment.clientSequenceNumber) | ||||
| func (a *ActivityLog) saveSegmentEntitiesInternal(ctx context.Context, currentSegment segmentInfo, force bool, storagePathPrefix string) (string, error) { | ||||
| 	entityPath := fmt.Sprintf("%s%s%d/%d", storagePathPrefix, activityEntityBasePath, currentSegment.startTimestamp, currentSegment.clientSequenceNumber) | ||||
|  | ||||
| 	for _, client := range a.currentSegment.currentClients.Clients { | ||||
| 	for _, client := range currentSegment.currentClients.Clients { | ||||
| 		// Explicitly catch and throw clear error message if client ID creation and storage | ||||
| 		// results in a []byte that doesn't assert into a valid string. | ||||
| 		if !utf8.ValidString(client.ClientID) { | ||||
| @@ -686,7 +716,7 @@ func parseSegmentNumberFromPath(path string) (int, bool) { | ||||
| // sorted last to first | ||||
| func (a *ActivityLog) availableLogs(ctx context.Context, upTo time.Time) ([]time.Time, error) { | ||||
| 	paths := make([]string, 0) | ||||
| 	for _, basePath := range []string{activityEntityBasePath, activityTokenLocalBasePath} { | ||||
| 	for _, basePath := range []string{activityEntityBasePath, activityGlobalPathPrefix + activityEntityBasePath, activityTokenLocalBasePath} { | ||||
| 		p, err := a.view.List(ctx, basePath) | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| @@ -735,10 +765,10 @@ func (a *ActivityLog) getMostRecentActivityLogSegment(ctx context.Context, now t | ||||
| } | ||||
|  | ||||
| // getLastEntitySegmentNumber returns the (non-negative) last segment number for the :startTime:, if it exists | ||||
| func (a *ActivityLog) getLastEntitySegmentNumber(ctx context.Context, startTime time.Time) (uint64, bool, error) { | ||||
| func (a *ActivityLog) getLastEntitySegmentNumber(ctx context.Context, startTime time.Time) (uint64, uint64, bool, error) { | ||||
| 	p, err := a.view.List(ctx, activityEntityBasePath+fmt.Sprint(startTime.Unix())+"/") | ||||
| 	if err != nil { | ||||
| 		return 0, false, err | ||||
| 		return 0, 0, false, err | ||||
| 	} | ||||
|  | ||||
| 	highestNum := -1 | ||||
| @@ -750,12 +780,34 @@ func (a *ActivityLog) getLastEntitySegmentNumber(ctx context.Context, startTime | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	segmentPresent := true | ||||
| 	segmentHighestNum := uint64(highestNum) | ||||
| 	if highestNum < 0 { | ||||
| 		// numbers less than 0 are invalid. if a negative number is the highest value, there isn't a segment | ||||
| 		return 0, false, nil | ||||
| 		segmentHighestNum = 0 | ||||
| 		segmentPresent = false | ||||
| 	} | ||||
|  | ||||
| 	return uint64(highestNum), true, nil | ||||
| 	globalPaths, err := a.view.List(ctx, activityGlobalPathPrefix+activityEntityBasePath+fmt.Sprint(startTime.Unix())+"/") | ||||
| 	if err != nil { | ||||
| 		return segmentHighestNum, 0, segmentPresent, err | ||||
| 	} | ||||
|  | ||||
| 	globalHighestNum := -1 | ||||
| 	for _, path := range globalPaths { | ||||
| 		if num, ok := parseSegmentNumberFromPath(path); ok { | ||||
| 			if num > globalHighestNum { | ||||
| 				globalHighestNum = num | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if globalHighestNum < 0 { | ||||
| 		// numbers less than 0 are invalid. if a negative number is the highest value, there isn't a segment | ||||
| 		return segmentHighestNum, 0, segmentPresent, nil | ||||
| 	} | ||||
|  | ||||
| 	return segmentHighestNum, uint64(globalHighestNum), segmentPresent, nil | ||||
| } | ||||
|  | ||||
| // WalkEntitySegments loads each of the entity segments for a particular start time | ||||
| @@ -838,29 +890,47 @@ func (a *ActivityLog) loadPriorEntitySegment(ctx context.Context, startTime time | ||||
| 	} | ||||
|  | ||||
| 	a.l.RLock() | ||||
| 	defer a.l.RUnlock() | ||||
| 	a.fragmentLock.Lock() | ||||
| 	a.globalFragmentLock.Lock() | ||||
| 	// Handle the (unlikely) case where the end of the month has been reached while background loading. | ||||
| 	// Or the feature has been disabled. | ||||
| 	if a.enabled && startTime.Unix() == a.currentSegment.startTimestamp { | ||||
| 		for _, ent := range out.Clients { | ||||
| 			a.partialMonthClientTracker[ent.ClientID] = ent | ||||
| 			if local, _ := a.isClientLocal(ent); !local { | ||||
| 				a.globalPartialMonthClientTracker[ent.ClientID] = ent | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 	a.fragmentLock.Unlock() | ||||
|  | ||||
| 	globalPath := activityGlobalPathPrefix + activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(sequenceNum, 10) | ||||
| 	data, err = a.view.Get(ctx, globalPath) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	if data == nil { | ||||
| 		return nil | ||||
| 	} | ||||
| 	out = &activity.EntityActivityLog{} | ||||
| 	err = proto.Unmarshal(data.Value, out) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	a.globalFragmentLock.Lock() | ||||
| 	// Handle the (unlikely) case where the end of the month has been reached while background loading. | ||||
| 	// Or the feature has been disabled. | ||||
| 	if a.enabled && startTime.Unix() == a.currentGlobalSegment.startTimestamp { | ||||
| 		for _, ent := range out.Clients { | ||||
| 			a.globalPartialMonthClientTracker[ent.ClientID] = ent | ||||
| 		} | ||||
| 	} | ||||
| 	a.globalFragmentLock.Unlock() | ||||
| 	a.l.RUnlock() | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // loadCurrentClientSegment loads the most recent segment (for "this month") | ||||
| // into memory (to append new entries), and to the partialMonthClientTracker to | ||||
| // avoid duplication call with fragmentLock and l held. | ||||
| func (a *ActivityLog) loadCurrentClientSegment(ctx context.Context, startTime time.Time, sequenceNum uint64) error { | ||||
| // avoid duplication call with fragmentLock, globalFragmentLock and l held. | ||||
| func (a *ActivityLog) loadCurrentClientSegment(ctx context.Context, startTime time.Time, sequenceNum uint64, globalSegmentSequenceNumber uint64) error { | ||||
| 	path := activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(sequenceNum, 10) | ||||
| 	data, err := a.view.Get(ctx, path) | ||||
| 	if err != nil { | ||||
| @@ -892,9 +962,40 @@ func (a *ActivityLog) loadCurrentClientSegment(ctx context.Context, startTime ti | ||||
|  | ||||
| 	for _, client := range out.Clients { | ||||
| 		a.partialMonthClientTracker[client.ClientID] = client | ||||
| 		if local, _ := a.isClientLocal(client); !local { | ||||
| 			a.globalPartialMonthClientTracker[client.ClientID] = client | ||||
| 	} | ||||
|  | ||||
| 	path = activityGlobalPathPrefix + activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(globalSegmentSequenceNumber, 10) | ||||
| 	data, err = a.view.Get(ctx, path) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	if data == nil { | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	out = &activity.EntityActivityLog{} | ||||
| 	err = proto.Unmarshal(data.Value, out) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	if !a.core.perfStandby { | ||||
| 		a.currentGlobalSegment = segmentInfo{ | ||||
| 			startTimestamp: startTime.Unix(), | ||||
| 			currentClients: &activity.EntityActivityLog{ | ||||
| 				Clients: out.Clients, | ||||
| 			}, | ||||
| 			tokenCount: &activity.TokenCount{ | ||||
| 				CountByNamespaceID: make(map[string]uint64), | ||||
| 			}, | ||||
| 			clientSequenceNumber: sequenceNum, | ||||
| 		} | ||||
| 	} else { | ||||
| 		// populate this for edge case checking (if end of month passes while background loading on standby) | ||||
| 		a.currentGlobalSegment.startTimestamp = startTime.Unix() | ||||
| 	} | ||||
| 	for _, client := range out.Clients { | ||||
| 		a.globalPartialMonthClientTracker[client.ClientID] = client | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| @@ -975,33 +1076,36 @@ func (a *ActivityLog) entityBackgroundLoader(ctx context.Context, wg *sync.WaitG | ||||
| } | ||||
|  | ||||
| // Initialize a new current segment, based on the current time. | ||||
| // Call with fragmentLock and l held. | ||||
| // Call with fragmentLock, globalFragmentLock and l held. | ||||
| func (a *ActivityLog) startNewCurrentLogLocked(now time.Time) { | ||||
| 	a.logger.Trace("initializing new log") | ||||
| 	a.resetCurrentLog() | ||||
| 	a.currentSegment.startTimestamp = now.Unix() | ||||
| 	a.currentGlobalSegment.startTimestamp = now.Unix() | ||||
| } | ||||
|  | ||||
| // Should be called with fragmentLock and l held. | ||||
| // Should be called with fragmentLock, globalFragmentLock and l held. | ||||
| func (a *ActivityLog) newMonthCurrentLogLocked(currentTime time.Time) { | ||||
| 	a.logger.Trace("continuing log to new month") | ||||
| 	a.resetCurrentLog() | ||||
| 	monthStart := timeutil.StartOfMonth(currentTime.UTC()) | ||||
| 	a.currentSegment.startTimestamp = monthStart.Unix() | ||||
| 	a.currentGlobalSegment.startTimestamp = monthStart.Unix() | ||||
| } | ||||
|  | ||||
| // Initialize a new current segment, based on the given time | ||||
| // should be called with fragmentLock and l held. | ||||
| // should be called with fragmentLock, globalFragmentLock and l held. | ||||
| func (a *ActivityLog) newSegmentAtGivenTime(t time.Time) { | ||||
| 	timestamp := t.Unix() | ||||
|  | ||||
| 	a.logger.Trace("starting a segment", "timestamp", timestamp) | ||||
| 	a.resetCurrentLog() | ||||
| 	a.currentSegment.startTimestamp = timestamp | ||||
| 	a.currentGlobalSegment.startTimestamp = timestamp | ||||
| } | ||||
|  | ||||
| // Reset all the current segment state. | ||||
| // Should be called with fragmentLock and l held. | ||||
| // Should be called with fragmentLock, globalFragmentLock and l held. | ||||
| func (a *ActivityLog) resetCurrentLog() { | ||||
| 	a.currentSegment.startTimestamp = 0 | ||||
| 	a.currentSegment.currentClients = &activity.EntityActivityLog{ | ||||
| @@ -1015,19 +1119,25 @@ func (a *ActivityLog) resetCurrentLog() { | ||||
| 	} | ||||
|  | ||||
| 	a.currentSegment.clientSequenceNumber = 0 | ||||
|  | ||||
| 	a.fragment = nil | ||||
| 	a.partialMonthClientTracker = make(map[string]*activity.EntityRecord) | ||||
| 	a.standbyFragmentsReceived = make([]*activity.LogFragment, 0) | ||||
|  | ||||
| 	a.currentGlobalSegment.startTimestamp = 0 | ||||
| 	a.currentGlobalSegment.currentClients = &activity.EntityActivityLog{ | ||||
| 		Clients: make([]*activity.EntityRecord, 0), | ||||
| 	} | ||||
| 	a.currentGlobalSegment.clientSequenceNumber = 0 | ||||
| 	a.currentGlobalFragment = nil | ||||
| 	a.globalPartialMonthClientTracker = make(map[string]*activity.EntityRecord) | ||||
|  | ||||
| 	a.standbyFragmentsReceived = make([]*activity.LogFragment, 0) | ||||
| 	a.secondaryGlobalClientFragments = make([]*activity.LogFragment, 0) | ||||
| 	a.standbyGlobalFragmentsReceived = make([]*activity.LogFragment, 0) | ||||
| } | ||||
|  | ||||
| func (a *ActivityLog) deleteLogWorker(ctx context.Context, startTimestamp int64, whenDone chan struct{}) { | ||||
| 	entityPath := fmt.Sprintf("%v%v/", activityEntityBasePath, startTimestamp) | ||||
| 	tokenPath := fmt.Sprintf("%v%v/", activityTokenLocalBasePath, startTimestamp) | ||||
| 	globalEntityPath := fmt.Sprintf("%s%v%v/", activityGlobalPathPrefix, activityEntityBasePath, startTimestamp) | ||||
|  | ||||
| 	entitySegments, err := a.view.List(ctx, entityPath) | ||||
| 	if err != nil { | ||||
| @@ -1053,6 +1163,18 @@ func (a *ActivityLog) deleteLogWorker(ctx context.Context, startTimestamp int64, | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	globalEntitySegments, err := a.view.List(ctx, globalEntityPath) | ||||
| 	if err != nil { | ||||
| 		a.logger.Error("could not list global entity paths", "error", err) | ||||
| 		return | ||||
| 	} | ||||
| 	for _, p := range globalEntitySegments { | ||||
| 		err = a.view.Delete(ctx, globalEntityPath+p) | ||||
| 		if err != nil { | ||||
| 			a.logger.Error("could not delete global entity log", "error", err) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// Allow whoever started this as a goroutine to wait for it to finish. | ||||
| 	close(whenDone) | ||||
| } | ||||
| @@ -1148,7 +1270,7 @@ func (a *ActivityLog) refreshFromStoredLog(ctx context.Context, wg *sync.WaitGro | ||||
| 	} | ||||
|  | ||||
| 	// load entity logs from storage into memory | ||||
| 	lastSegment, segmentsExist, err := a.getLastEntitySegmentNumber(ctx, mostRecent) | ||||
| 	lastSegment, globalLastSegment, segmentsExist, err := a.getLastEntitySegmentNumber(ctx, mostRecent) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| @@ -1157,7 +1279,7 @@ func (a *ActivityLog) refreshFromStoredLog(ctx context.Context, wg *sync.WaitGro | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	err = a.loadCurrentClientSegment(ctx, mostRecent, lastSegment) | ||||
| 	err = a.loadCurrentClientSegment(ctx, mostRecent, lastSegment, globalLastSegment) | ||||
| 	if err != nil || lastSegment == 0 { | ||||
| 		return err | ||||
| 	} | ||||
| @@ -1220,7 +1342,7 @@ func (a *ActivityLog) SetConfig(ctx context.Context, config activityConfig) { | ||||
| 		a.logger.Info("activity log enable changed", "original", originalEnabled, "current", a.enabled) | ||||
| 	} | ||||
|  | ||||
| 	if !a.enabled && a.currentSegment.startTimestamp != 0 { | ||||
| 	if !a.enabled && a.currentSegment.startTimestamp != 0 && a.currentGlobalSegment.startTimestamp != 0 { | ||||
| 		a.logger.Trace("deleting current segment") | ||||
| 		a.deleteDone = make(chan struct{}) | ||||
| 		// this is called from a request under stateLock, so use activeContext | ||||
| @@ -1229,7 +1351,7 @@ func (a *ActivityLog) SetConfig(ctx context.Context, config activityConfig) { | ||||
| 	} | ||||
|  | ||||
| 	forceSave := false | ||||
| 	if a.enabled && a.currentSegment.startTimestamp == 0 { | ||||
| 	if a.enabled && a.currentSegment.startTimestamp == 0 && a.currentGlobalSegment.startTimestamp == 0 { | ||||
| 		a.startNewCurrentLogLocked(a.clock.Now().UTC()) | ||||
| 		// Force a save so we can distinguish between | ||||
| 		// | ||||
| @@ -1246,7 +1368,8 @@ func (a *ActivityLog) SetConfig(ctx context.Context, config activityConfig) { | ||||
|  | ||||
| 	if forceSave { | ||||
| 		// l is still held here | ||||
| 		a.saveCurrentSegmentInternal(ctx, true) | ||||
| 		a.saveCurrentSegmentInternal(ctx, true, a.currentSegment, "") | ||||
| 		a.saveCurrentSegmentInternal(ctx, true, a.currentGlobalSegment, activityGlobalPathPrefix) | ||||
| 	} | ||||
|  | ||||
| 	a.defaultReportMonths = config.DefaultReportMonths | ||||
| @@ -1682,7 +1805,13 @@ func (a *ActivityLog) perfStandbyFragmentWorker(ctx context.Context) { | ||||
| // activeFragmentWorker handles scheduling the write of the next | ||||
| // segment.  It runs on active nodes only. | ||||
| func (a *ActivityLog) activeFragmentWorker(ctx context.Context) { | ||||
| 	ticker := a.clock.NewTicker(activitySegmentInterval) | ||||
| 	writeInterval := activitySegmentInterval | ||||
| 	// This changes the interval to a duration that was set for testing purposes | ||||
| 	if a.configOverrides.StorageWriteTestingInterval.Microseconds() > 0 { | ||||
| 		writeInterval = a.configOverrides.StorageWriteTestingInterval | ||||
| 	} | ||||
|  | ||||
| 	ticker := a.clock.NewTicker(writeInterval) | ||||
|  | ||||
| 	endOfMonth := a.clock.NewTimer(a.StartOfNextMonth().Sub(a.clock.Now())) | ||||
| 	if a.configOverrides.DisableTimers { | ||||
|   | ||||
| @@ -9,6 +9,7 @@ import ( | ||||
| 	"encoding/json" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"io" | ||||
| 	"net/http" | ||||
| 	"reflect" | ||||
| 	"sort" | ||||
| @@ -578,6 +579,7 @@ func TestActivityLog_SaveEntitiesToStorage(t *testing.T) { | ||||
| 		now.Add(2 * time.Second).Unix(), | ||||
| 	} | ||||
| 	path := fmt.Sprintf("%sentity/%d/0", ActivityLogPrefix, a.GetStartTimestamp()) | ||||
| 	globalPath := fmt.Sprintf("%sentity/%d/0", ActivityGlobalLogPrefix, a.GetStartTimestamp()) | ||||
|  | ||||
| 	a.AddEntityToFragment(ids[0], "root", times[0]) | ||||
| 	a.AddEntityToFragment(ids[1], "root2", times[1]) | ||||
| @@ -614,6 +616,14 @@ func TestActivityLog_SaveEntitiesToStorage(t *testing.T) { | ||||
| 		t.Fatalf("could not unmarshal protobuf: %v", err) | ||||
| 	} | ||||
| 	expectedEntityIDs(t, out, ids) | ||||
|  | ||||
| 	protoSegment = readSegmentFromStorage(t, core, globalPath) | ||||
| 	out = &activity.EntityActivityLog{} | ||||
| 	err = proto.Unmarshal(protoSegment.Value, out) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("could not unmarshal protobuf: %v", err) | ||||
| 	} | ||||
| 	expectedEntityIDs(t, out, ids) | ||||
| } | ||||
|  | ||||
| // TestActivityLog_StoreAndReadHyperloglog inserts into a hyperloglog, stores it and then reads it back. The test | ||||
| @@ -1211,45 +1221,55 @@ func TestActivityLog_getLastEntitySegmentNumber(t *testing.T) { | ||||
| 	core, _, _ := TestCoreUnsealed(t) | ||||
| 	a := core.activityLog | ||||
| 	paths := [...]string{"entity/992/0", "entity/1000/-1", "entity/1001/foo", "entity/1111/0", "entity/1111/1"} | ||||
| 	globalPaths := [...]string{"entity/992/0", "entity/1000/-1", "entity/1001/foo", "entity/1111/1"} | ||||
| 	for _, path := range paths { | ||||
| 		WriteToStorage(t, core, ActivityLogPrefix+path, []byte("test")) | ||||
| 	} | ||||
| 	for _, path := range globalPaths { | ||||
| 		WriteToStorage(t, core, ActivityGlobalLogPrefix+path, []byte("test")) | ||||
| 	} | ||||
|  | ||||
| 	testCases := []struct { | ||||
| 		input        int64 | ||||
| 		expectedVal  uint64 | ||||
| 		expectExists bool | ||||
| 		input             int64 | ||||
| 		expectedVal       uint64 | ||||
| 		expectedGlobalVal uint64 | ||||
| 		expectExists      bool | ||||
| 	}{ | ||||
| 		{ | ||||
| 			input:        992, | ||||
| 			expectedVal:  0, | ||||
| 			expectExists: true, | ||||
| 			input:             992, | ||||
| 			expectedVal:       0, | ||||
| 			expectedGlobalVal: 0, | ||||
| 			expectExists:      true, | ||||
| 		}, | ||||
| 		{ | ||||
| 			input:        1000, | ||||
| 			expectedVal:  0, | ||||
| 			expectExists: false, | ||||
| 			input:             1000, | ||||
| 			expectedVal:       0, | ||||
| 			expectedGlobalVal: 0, | ||||
| 			expectExists:      false, | ||||
| 		}, | ||||
| 		{ | ||||
| 			input:        1001, | ||||
| 			expectedVal:  0, | ||||
| 			expectExists: false, | ||||
| 			input:             1001, | ||||
| 			expectedVal:       0, | ||||
| 			expectedGlobalVal: 0, | ||||
| 			expectExists:      false, | ||||
| 		}, | ||||
| 		{ | ||||
| 			input:        1111, | ||||
| 			expectedVal:  1, | ||||
| 			expectExists: true, | ||||
| 			input:             1111, | ||||
| 			expectedVal:       1, | ||||
| 			expectedGlobalVal: 1, | ||||
| 			expectExists:      true, | ||||
| 		}, | ||||
| 		{ | ||||
| 			input:        2222, | ||||
| 			expectedVal:  0, | ||||
| 			expectExists: false, | ||||
| 			input:             2222, | ||||
| 			expectedVal:       0, | ||||
| 			expectedGlobalVal: 0, | ||||
| 			expectExists:      false, | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	ctx := context.Background() | ||||
| 	for _, tc := range testCases { | ||||
| 		result, exists, err := a.getLastEntitySegmentNumber(ctx, time.Unix(tc.input, 0)) | ||||
| 		result, globalSegmentNumber, exists, err := a.getLastEntitySegmentNumber(ctx, time.Unix(tc.input, 0)) | ||||
| 		if err != nil { | ||||
| 			t.Fatalf("unexpected error for input %d: %v", tc.input, err) | ||||
| 		} | ||||
| @@ -1259,6 +1279,10 @@ func TestActivityLog_getLastEntitySegmentNumber(t *testing.T) { | ||||
| 		if result != tc.expectedVal { | ||||
| 			t.Errorf("expected: %d got: %d for input: %d", tc.expectedVal, result, tc.input) | ||||
| 		} | ||||
| 		if globalSegmentNumber != tc.expectedGlobalVal { | ||||
| 			t.Errorf("expected: %d got: %d for input: %d", tc.expectedGlobalVal, globalSegmentNumber, tc.input) | ||||
| 		} | ||||
|  | ||||
| 	} | ||||
| } | ||||
|  | ||||
| @@ -1471,16 +1495,19 @@ func TestActivityLog_loadCurrentClientSegment(t *testing.T) { | ||||
| 			t.Fatalf(err.Error()) | ||||
| 		} | ||||
| 		WriteToStorage(t, core, ActivityLogPrefix+tc.path, data) | ||||
| 		WriteToStorage(t, core, ActivityGlobalLogPrefix+tc.path, data) | ||||
| 	} | ||||
|  | ||||
| 	ctx := context.Background() | ||||
| 	for _, tc := range testCases { | ||||
| 		a.l.Lock() | ||||
| 		a.fragmentLock.Lock() | ||||
| 		a.globalFragmentLock.Lock() | ||||
| 		// loadCurrentClientSegment requires us to grab the fragment lock and the | ||||
| 		// activityLog lock, as per the comment in the loadCurrentClientSegment | ||||
| 		// function | ||||
| 		err := a.loadCurrentClientSegment(ctx, time.Unix(tc.time, 0), tc.seqNum) | ||||
| 		err := a.loadCurrentClientSegment(ctx, time.Unix(tc.time, 0), tc.seqNum, tc.seqNum) | ||||
| 		a.globalFragmentLock.Unlock() | ||||
| 		a.fragmentLock.Unlock() | ||||
| 		a.l.Unlock() | ||||
|  | ||||
| @@ -1507,6 +1534,11 @@ func TestActivityLog_loadCurrentClientSegment(t *testing.T) { | ||||
| 			t.Errorf("bad data loaded. expected: %v, got: %v for path %q", tc.entities.Clients, currentEntities, tc.path) | ||||
| 		} | ||||
|  | ||||
| 		currentGlobalEntities := a.GetCurrentGlobalEntities() | ||||
| 		if !entityRecordsEqual(t, currentGlobalEntities.Clients, tc.entities.Clients) { | ||||
| 			t.Errorf("bad data loaded. expected: %v, got: %v for path %q", tc.entities.Clients, currentGlobalEntities, tc.path) | ||||
| 		} | ||||
|  | ||||
| 		activeClients := core.GetActiveClientsList() | ||||
| 		if err := ActiveEntitiesEqual(activeClients, tc.entities.Clients); err != nil { | ||||
| 			t.Errorf("bad data loaded into active entities. expected only set of EntityID from %v in %v for path %q: %v", tc.entities.Clients, activeClients, tc.path, err) | ||||
| @@ -1584,6 +1616,7 @@ func TestActivityLog_loadPriorEntitySegment(t *testing.T) { | ||||
| 			t.Fatalf(err.Error()) | ||||
| 		} | ||||
| 		WriteToStorage(t, core, ActivityLogPrefix+tc.path, data) | ||||
| 		WriteToStorage(t, core, ActivityGlobalLogPrefix+tc.path, data) | ||||
| 	} | ||||
|  | ||||
| 	ctx := context.Background() | ||||
| @@ -1594,7 +1627,9 @@ func TestActivityLog_loadPriorEntitySegment(t *testing.T) { | ||||
| 			a.localFragmentLock.Lock() | ||||
| 			a.partialMonthClientTracker = make(map[string]*activity.EntityRecord) | ||||
| 			a.partialMonthLocalClientTracker = make(map[string]*activity.EntityRecord) | ||||
| 			a.globalPartialMonthClientTracker = make(map[string]*activity.EntityRecord) | ||||
| 			a.currentSegment.startTimestamp = tc.time | ||||
| 			a.currentGlobalSegment.startTimestamp = tc.time | ||||
| 			a.fragmentLock.Unlock() | ||||
| 			a.localFragmentLock.Unlock() | ||||
| 			a.l.Unlock() | ||||
| @@ -1768,8 +1803,10 @@ func setupActivityRecordsInStorage(t *testing.T, base time.Time, includeEntities | ||||
| 			} | ||||
| 			if i == 0 { | ||||
| 				WriteToStorage(t, core, ActivityLogPrefix+"entity/"+fmt.Sprint(monthsAgo.Unix())+"/0", entityData) | ||||
| 				WriteToStorage(t, core, ActivityGlobalLogPrefix+"entity/"+fmt.Sprint(monthsAgo.Unix())+"/0", entityData) | ||||
| 			} else { | ||||
| 				WriteToStorage(t, core, ActivityLogPrefix+"entity/"+fmt.Sprint(base.Unix())+"/"+strconv.Itoa(i-1), entityData) | ||||
| 				WriteToStorage(t, core, ActivityGlobalLogPrefix+"entity/"+fmt.Sprint(base.Unix())+"/"+strconv.Itoa(i-1), entityData) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| @@ -1819,7 +1856,7 @@ func TestActivityLog_refreshFromStoredLog(t *testing.T) { | ||||
| 		Clients: expectedClientRecords[len(expectedClientRecords)-1:], | ||||
| 	} | ||||
|  | ||||
| 	currentEntities := a.GetCurrentEntities() | ||||
| 	currentEntities := a.GetCurrentGlobalEntities() | ||||
| 	if !entityRecordsEqual(t, currentEntities.Clients, expectedCurrent.Clients) { | ||||
| 		// we only expect the newest entity segment to be loaded (for the current month) | ||||
| 		t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrent, currentEntities) | ||||
| @@ -2312,6 +2349,13 @@ func TestActivityLog_EndOfMonth(t *testing.T) { | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 	path = fmt.Sprintf("%ventity/%v/0", ActivityGlobalLogPrefix, segment0) | ||||
| 	protoSegment = readSegmentFromStorage(t, core, path) | ||||
| 	out = &activity.EntityActivityLog{} | ||||
| 	err = proto.Unmarshal(protoSegment.Value, out) | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
|  | ||||
| 	segment1 := a.GetStartTimestamp() | ||||
| 	expectedTimestamp := timeutil.StartOfMonth(month1).Unix() | ||||
| @@ -2374,6 +2418,16 @@ func TestActivityLog_EndOfMonth(t *testing.T) { | ||||
| 			t.Fatalf("could not unmarshal protobuf: %v", err) | ||||
| 		} | ||||
| 		expectedEntityIDs(t, out, tc.ExpectedEntityIDs) | ||||
|  | ||||
| 		// Check for global entities at global storage path | ||||
| 		path = fmt.Sprintf("%ventity/%v/0", ActivityGlobalLogPrefix, tc.SegmentTimestamp) | ||||
| 		protoSegment = readSegmentFromStorage(t, core, path) | ||||
| 		out = &activity.EntityActivityLog{} | ||||
| 		err = proto.Unmarshal(protoSegment.Value, out) | ||||
| 		if err != nil { | ||||
| 			t.Fatalf("could not unmarshal protobuf: %v", err) | ||||
| 		} | ||||
| 		expectedEntityIDs(t, out, tc.ExpectedEntityIDs) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| @@ -5366,3 +5420,212 @@ func TestActivityLog_Export_CSV_Header(t *testing.T) { | ||||
|  | ||||
| 	require.Empty(t, deep.Equal(expectedColumnIndex, encoder.columnIndex)) | ||||
| } | ||||
|  | ||||
| // TestCreateSegment_StoreSegment verifies that | ||||
| // the activity log will correctly create segments from | ||||
| // the fragments and store the right number of clients at | ||||
| // the proper path. This test should be modified to include local clients. | ||||
| func TestCreateSegment_StoreSegment(t *testing.T) { | ||||
| 	cluster := NewTestCluster(t, nil, nil) | ||||
| 	core := cluster.Cores[0].Core | ||||
| 	a := core.activityLog | ||||
| 	a.SetEnable(true) | ||||
|  | ||||
| 	ctx := context.Background() | ||||
| 	timeStamp := time.Now() | ||||
|  | ||||
| 	clientRecords := make([]*activity.EntityRecord, ActivitySegmentClientCapacity*2+1) | ||||
| 	for i := range clientRecords { | ||||
| 		clientRecords[i] = &activity.EntityRecord{ | ||||
| 			ClientID:  fmt.Sprintf("111122222-3333-4444-5555-%012v", i), | ||||
| 			Timestamp: timeStamp.Unix(), | ||||
| 			NonEntity: false, | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	startTime := a.GetStartTimestamp() | ||||
| 	parsedTime := time.Unix(startTime, 0) | ||||
|  | ||||
| 	testCases := []struct { | ||||
| 		testName              string | ||||
| 		numClients            int | ||||
| 		pathPrefix            string | ||||
| 		maxClientsPerFragment int | ||||
| 		global                bool | ||||
| 		forceStore            bool | ||||
| 	}{ | ||||
| 		{ | ||||
| 			testName:              "[global] max client size, drop clients", | ||||
| 			numClients:            ActivitySegmentClientCapacity*2 + 1, | ||||
| 			pathPrefix:            activityGlobalPathPrefix, | ||||
| 			maxClientsPerFragment: ActivitySegmentClientCapacity, | ||||
| 			global:                true, | ||||
| 		}, | ||||
| 		{ | ||||
| 			testName:              "[global, no-force] max client size, drop clients", | ||||
| 			numClients:            ActivitySegmentClientCapacity*2 + 1, | ||||
| 			pathPrefix:            activityGlobalPathPrefix, | ||||
| 			maxClientsPerFragment: ActivitySegmentClientCapacity, | ||||
| 			global:                true, | ||||
| 			forceStore:            true, | ||||
| 		}, | ||||
| 		{ | ||||
| 			testName:              "[global] max segment size", | ||||
| 			numClients:            ActivitySegmentClientCapacity, | ||||
| 			pathPrefix:            activityGlobalPathPrefix, | ||||
| 			maxClientsPerFragment: ActivitySegmentClientCapacity, | ||||
| 			global:                true, | ||||
| 		}, | ||||
| 		{ | ||||
| 			testName:              "[global, no-force] max segment size", | ||||
| 			numClients:            ActivitySegmentClientCapacity, | ||||
| 			pathPrefix:            activityGlobalPathPrefix, | ||||
| 			maxClientsPerFragment: ActivitySegmentClientCapacity, | ||||
| 			global:                true, | ||||
| 			forceStore:            true, | ||||
| 		}, | ||||
| 		{ | ||||
| 			testName:              "[global] max segment size, multiple fragments", | ||||
| 			numClients:            ActivitySegmentClientCapacity, | ||||
| 			pathPrefix:            activityGlobalPathPrefix, | ||||
| 			maxClientsPerFragment: ActivitySegmentClientCapacity - 1, | ||||
| 			global:                true, | ||||
| 		}, | ||||
| 		{ | ||||
| 			testName:              "[global, no-force] max segment size, multiple fragments", | ||||
| 			numClients:            ActivitySegmentClientCapacity, | ||||
| 			pathPrefix:            activityGlobalPathPrefix, | ||||
| 			maxClientsPerFragment: ActivitySegmentClientCapacity - 1, | ||||
| 			global:                true, | ||||
| 			forceStore:            true, | ||||
| 		}, | ||||
| 		{ | ||||
| 			testName:              "[global] roll over", | ||||
| 			numClients:            ActivitySegmentClientCapacity + 2, | ||||
| 			pathPrefix:            activityGlobalPathPrefix, | ||||
| 			maxClientsPerFragment: ActivitySegmentClientCapacity, | ||||
| 			global:                true, | ||||
| 		}, | ||||
| 		{ | ||||
| 			testName:              "[global, no-force] roll over", | ||||
| 			numClients:            ActivitySegmentClientCapacity + 2, | ||||
| 			pathPrefix:            activityGlobalPathPrefix, | ||||
| 			maxClientsPerFragment: ActivitySegmentClientCapacity, | ||||
| 			global:                true, | ||||
| 			forceStore:            true, | ||||
| 		}, | ||||
| 		{ | ||||
| 			testName:              "[global] max segment size, rollover multiple fragments", | ||||
| 			numClients:            ActivitySegmentClientCapacity * 2, | ||||
| 			pathPrefix:            activityGlobalPathPrefix, | ||||
| 			maxClientsPerFragment: ActivitySegmentClientCapacity - 1, | ||||
| 			global:                true, | ||||
| 		}, | ||||
| 		{ | ||||
| 			testName:              "[global, no-force] max segment size, rollover multiple fragments", | ||||
| 			numClients:            ActivitySegmentClientCapacity * 2, | ||||
| 			pathPrefix:            activityGlobalPathPrefix, | ||||
| 			maxClientsPerFragment: ActivitySegmentClientCapacity - 1, | ||||
| 			global:                true, | ||||
| 			forceStore:            true, | ||||
| 		}, | ||||
|  | ||||
| 		{ | ||||
| 			testName:              "[non-global] max segment size", | ||||
| 			numClients:            ActivitySegmentClientCapacity, | ||||
| 			maxClientsPerFragment: ActivitySegmentClientCapacity, | ||||
| 			global:                false, | ||||
| 		}, | ||||
| 		{ | ||||
| 			testName:              "[non-global] max segment size, multiple fragments", | ||||
| 			numClients:            ActivitySegmentClientCapacity, | ||||
| 			maxClientsPerFragment: ActivitySegmentClientCapacity - 1, | ||||
| 			global:                false, | ||||
| 		}, | ||||
| 		{ | ||||
| 			testName:              "[non-global] roll over", | ||||
| 			numClients:            ActivitySegmentClientCapacity + 2, | ||||
| 			maxClientsPerFragment: ActivitySegmentClientCapacity, | ||||
| 			global:                false, | ||||
| 		}, | ||||
| 		{ | ||||
| 			testName:              "[non-global] max segment size, rollover multiple fragments", | ||||
| 			numClients:            ActivitySegmentClientCapacity * 2, | ||||
| 			maxClientsPerFragment: ActivitySegmentClientCapacity - 1, | ||||
| 			global:                false, | ||||
| 		}, | ||||
| 		{ | ||||
| 			testName:              "[non-global] max client size, drop clients", | ||||
| 			numClients:            ActivitySegmentClientCapacity*2 + 1, | ||||
| 			maxClientsPerFragment: ActivitySegmentClientCapacity, | ||||
| 			global:                false, | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	for _, test := range testCases { | ||||
| 		t.Run(test.testName, func(t *testing.T) { | ||||
| 			// Add clients to fragments | ||||
| 			fragments := make([]*activity.LogFragment, 0) | ||||
| 			remainder := test.numClients | ||||
| 			var i int | ||||
| 			for i = 0; i+test.maxClientsPerFragment < test.numClients; i = i + test.maxClientsPerFragment { | ||||
| 				clients := clientRecords[i : i+test.maxClientsPerFragment] | ||||
| 				remainder -= test.maxClientsPerFragment | ||||
| 				fragments = append(fragments, &activity.LogFragment{Clients: clients}) | ||||
| 			} | ||||
| 			if remainder > 0 { | ||||
| 				clients := clientRecords[i : i+remainder] | ||||
| 				fragments = append(fragments, &activity.LogFragment{Clients: clients}) | ||||
|  | ||||
| 			} | ||||
|  | ||||
| 			segment := &a.currentGlobalSegment | ||||
| 			if !test.global { | ||||
| 				segment = &a.currentSegment | ||||
| 			} | ||||
|  | ||||
| 			// Create segments and write to storage | ||||
| 			require.NoError(t, core.StoreCurrentSegment(ctx, fragments, segment, test.forceStore, test.pathPrefix)) | ||||
|  | ||||
| 			reader, err := a.NewSegmentFileReader(ctx, parsedTime) | ||||
| 			require.NoError(t, err) | ||||
| 			var clientTotal int | ||||
| 			if test.global { | ||||
| 				for { | ||||
| 					entity, err := reader.ReadGlobalEntity(ctx) | ||||
| 					if errors.Is(err, io.EOF) { | ||||
| 						break | ||||
| 					} | ||||
| 					require.NoError(t, err) | ||||
| 					clientTotal += len(entity.GetClients()) | ||||
| 				} | ||||
| 			} else { | ||||
| 				for { | ||||
| 					entity, err := reader.ReadEntity(ctx) | ||||
| 					if errors.Is(err, io.EOF) { | ||||
| 						break | ||||
| 					} | ||||
| 					require.NoError(t, err) | ||||
| 					clientTotal += len(entity.GetClients()) | ||||
| 				} | ||||
| 			} | ||||
|  | ||||
| 			// The current behavior is that there were greater than 2 * ActivitySegmentClientCapacity seen, then we | ||||
| 			// drop of the remainder of those clients seen during that time. Let's verify that this is the case | ||||
| 			expectedTotal := test.numClients | ||||
| 			if test.numClients > 2*ActivitySegmentClientCapacity { | ||||
| 				expectedTotal = 2 * ActivitySegmentClientCapacity | ||||
| 			} | ||||
|  | ||||
| 			require.Equal(t, expectedTotal, clientTotal) | ||||
|  | ||||
| 			// Delete any logs written in this test | ||||
| 			core.DeleteLogsAtPath(ctx, t, test.pathPrefix+activityEntityBasePath, startTime) | ||||
| 			// Reset client sequence number and current client slice back to original values | ||||
| 			segment.clientSequenceNumber = 0 | ||||
| 			segment.currentClients = &activity.EntityActivityLog{ | ||||
| 				Clients: make([]*activity.EntityRecord, 0), | ||||
| 			} | ||||
| 		}) | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -100,6 +100,15 @@ func (a *ActivityLog) GetCurrentEntities() *activity.EntityActivityLog { | ||||
| 	return a.currentSegment.currentClients | ||||
| } | ||||
|  | ||||
| // GetCurrentGlobalEntities returns the current global entity activity log | ||||
| func (a *ActivityLog) GetCurrentGlobalEntities() *activity.EntityActivityLog { | ||||
| 	a.l.RLock() | ||||
| 	defer a.l.RUnlock() | ||||
| 	a.globalFragmentLock.RLock() | ||||
| 	defer a.globalFragmentLock.RUnlock() | ||||
| 	return a.currentGlobalSegment.currentClients | ||||
| } | ||||
|  | ||||
| // WriteToStorage is used to put entity data in storage | ||||
| // `path` should be the complete path (not relative to the view) | ||||
| func WriteToStorage(t *testing.T, c *Core, path string, data []byte) { | ||||
| @@ -218,7 +227,12 @@ func ActiveEntitiesEqual(active []*activity.EntityRecord, test []*activity.Entit | ||||
| func (a *ActivityLog) GetStartTimestamp() int64 { | ||||
| 	a.l.RLock() | ||||
| 	defer a.l.RUnlock() | ||||
| 	return a.currentSegment.startTimestamp | ||||
| 	// TODO: We will substitute a.currentSegment with a.currentLocalSegment when we remove | ||||
| 	// a.currentSegment from the code | ||||
| 	if a.currentGlobalSegment.startTimestamp != a.currentSegment.startTimestamp { | ||||
| 		return -1 | ||||
| 	} | ||||
| 	return a.currentGlobalSegment.startTimestamp | ||||
| } | ||||
|  | ||||
| // SetStartTimestamp sets the start timestamp on an activity log | ||||
| @@ -226,6 +240,7 @@ func (a *ActivityLog) SetStartTimestamp(timestamp int64) { | ||||
| 	a.l.Lock() | ||||
| 	defer a.l.Unlock() | ||||
| 	a.currentSegment.startTimestamp = timestamp | ||||
| 	a.currentGlobalSegment.startTimestamp = timestamp | ||||
| } | ||||
|  | ||||
| // GetStoredTokenCountByNamespaceID returns the count of tokens by namespace ID | ||||
| @@ -287,3 +302,27 @@ func (c *Core) GetActiveFragment() *activity.LogFragment { | ||||
| 	defer c.activityLog.fragmentLock.RUnlock() | ||||
| 	return c.activityLog.fragment | ||||
| } | ||||
|  | ||||
| // StoreCurrentSegment is a test only method to create and store | ||||
| // segments from fragments. This allows createCurrentSegmentFromFragments to remain | ||||
| // private | ||||
| func (c *Core) StoreCurrentSegment(ctx context.Context, fragments []*activity.LogFragment, currentSegment *segmentInfo, force bool, storagePathPrefix string) error { | ||||
| 	return c.activityLog.createCurrentSegmentFromFragments(ctx, fragments, currentSegment, force, storagePathPrefix) | ||||
| } | ||||
|  | ||||
| // DeleteLogsAtPath is test helper function deletes all logs at the given path | ||||
| func (c *Core) DeleteLogsAtPath(ctx context.Context, t *testing.T, storagePath string, startTime int64) { | ||||
| 	basePath := storagePath + fmt.Sprint(startTime) + "/" | ||||
| 	a := c.activityLog | ||||
| 	segments, err := a.view.List(ctx, basePath) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("could not list path %v", err) | ||||
| 		return | ||||
| 	} | ||||
| 	for _, p := range segments { | ||||
| 		err = a.view.Delete(ctx, basePath+p) | ||||
| 		if err != nil { | ||||
| 			t.Fatalf("could not delete path %v", err) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -424,14 +424,16 @@ type singleTypeSegmentReader struct { | ||||
| 	a                *ActivityLog | ||||
| } | ||||
| type segmentReader struct { | ||||
| 	tokens   *singleTypeSegmentReader | ||||
| 	entities *singleTypeSegmentReader | ||||
| 	tokens         *singleTypeSegmentReader | ||||
| 	entities       *singleTypeSegmentReader | ||||
| 	globalEntities *singleTypeSegmentReader | ||||
| } | ||||
|  | ||||
| // SegmentReader is an interface that provides methods to read tokens and entities in order | ||||
| type SegmentReader interface { | ||||
| 	ReadToken(ctx context.Context) (*activity.TokenCount, error) | ||||
| 	ReadEntity(ctx context.Context) (*activity.EntityActivityLog, error) | ||||
| 	ReadGlobalEntity(ctx context.Context) (*activity.EntityActivityLog, error) | ||||
| } | ||||
|  | ||||
| func (a *ActivityLog) NewSegmentFileReader(ctx context.Context, startTime time.Time) (SegmentReader, error) { | ||||
| @@ -439,11 +441,15 @@ func (a *ActivityLog) NewSegmentFileReader(ctx context.Context, startTime time.T | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	globalEntities, err := a.newSingleTypeSegmentReader(ctx, startTime, activityGlobalPathPrefix+activityEntityBasePath) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	tokens, err := a.newSingleTypeSegmentReader(ctx, startTime, activityTokenLocalBasePath) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return &segmentReader{entities: entities, tokens: tokens}, nil | ||||
| 	return &segmentReader{entities: entities, globalEntities: globalEntities, tokens: tokens}, nil | ||||
| } | ||||
|  | ||||
| func (a *ActivityLog) newSingleTypeSegmentReader(ctx context.Context, startTime time.Time, prefix string) (*singleTypeSegmentReader, error) { | ||||
| @@ -509,6 +515,17 @@ func (e *segmentReader) ReadEntity(ctx context.Context) (*activity.EntityActivit | ||||
| 	return out, nil | ||||
| } | ||||
|  | ||||
| // ReadGlobalEntity reads a global entity from the global segment | ||||
| // If there is none available, then the error will be io.EOF | ||||
| func (e *segmentReader) ReadGlobalEntity(ctx context.Context) (*activity.EntityActivityLog, error) { | ||||
| 	out := &activity.EntityActivityLog{} | ||||
| 	err := e.globalEntities.nextValue(ctx, out) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return out, nil | ||||
| } | ||||
|  | ||||
| // namespaceRecordToCountsResponse converts the record to the ResponseCounts | ||||
| // type. The function sums entity, non-entity, and secret sync counts to get the | ||||
| // total client count. | ||||
|   | ||||
| @@ -990,6 +990,14 @@ func Test_ActivityLog_ComputeCurrentMonth_NamespaceMounts(t *testing.T) { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // writeGlobalEntitySegment writes a single global segment file with the given time and index for an entity | ||||
| func writeGlobalEntitySegment(t *testing.T, core *Core, ts time.Time, index int, item *activity.EntityActivityLog) { | ||||
| 	t.Helper() | ||||
| 	protoItem, err := proto.Marshal(item) | ||||
| 	require.NoError(t, err) | ||||
| 	WriteToStorage(t, core, makeSegmentPath(t, activityGlobalPathPrefix+activityEntityBasePath, ts, index), protoItem) | ||||
| } | ||||
|  | ||||
| // writeEntitySegment writes a single segment file with the given time and index for an entity | ||||
| func writeEntitySegment(t *testing.T, core *Core, ts time.Time, index int, item *activity.EntityActivityLog) { | ||||
| 	t.Helper() | ||||
| @@ -1022,6 +1030,7 @@ func TestSegmentFileReader_BadData(t *testing.T) { | ||||
| 	// write bad data that won't be able to be unmarshaled at index 0 | ||||
| 	WriteToStorage(t, core, makeSegmentPath(t, activityTokenLocalBasePath, now, 0), []byte("fake data")) | ||||
| 	WriteToStorage(t, core, makeSegmentPath(t, activityEntityBasePath, now, 0), []byte("fake data")) | ||||
| 	WriteToStorage(t, core, makeSegmentPath(t, activityGlobalPathPrefix+activityEntityBasePath, now, 0), []byte("fake data")) | ||||
|  | ||||
| 	// write entity at index 1 | ||||
| 	entity := &activity.EntityActivityLog{Clients: []*activity.EntityRecord{ | ||||
| @@ -1031,6 +1040,9 @@ func TestSegmentFileReader_BadData(t *testing.T) { | ||||
| 	}} | ||||
| 	writeEntitySegment(t, core, now, 1, entity) | ||||
|  | ||||
| 	// write global data at index 1 | ||||
| 	writeGlobalEntitySegment(t, core, now, 1, entity) | ||||
|  | ||||
| 	// write token at index 1 | ||||
| 	token := &activity.TokenCount{CountByNamespaceID: map[string]uint64{ | ||||
| 		"ns": 1, | ||||
| @@ -1047,6 +1059,14 @@ func TestSegmentFileReader_BadData(t *testing.T) { | ||||
| 	require.True(t, proto.Equal(gotEntity, entity)) | ||||
| 	require.Nil(t, err) | ||||
|  | ||||
| 	// first the bad global entity is read, which returns an error | ||||
| 	_, err = reader.ReadGlobalEntity(context.Background()) | ||||
| 	require.Error(t, err) | ||||
| 	// then, the reader can read the good entity at index 1 | ||||
| 	gotEntity, err = reader.ReadGlobalEntity(context.Background()) | ||||
| 	require.True(t, proto.Equal(gotEntity, entity)) | ||||
| 	require.Nil(t, err) | ||||
|  | ||||
| 	// the bad token causes an error | ||||
| 	_, err = reader.ReadToken(context.Background()) | ||||
| 	require.Error(t, err) | ||||
| @@ -1065,6 +1085,7 @@ func TestSegmentFileReader_MissingData(t *testing.T) { | ||||
| 	for i := 0; i < 3; i++ { | ||||
| 		WriteToStorage(t, core, makeSegmentPath(t, activityTokenLocalBasePath, now, i), []byte("fake data")) | ||||
| 		WriteToStorage(t, core, makeSegmentPath(t, activityEntityBasePath, now, i), []byte("fake data")) | ||||
| 		WriteToStorage(t, core, makeSegmentPath(t, activityGlobalPathPrefix+activityEntityBasePath, now, i), []byte("fake data")) | ||||
|  | ||||
| 	} | ||||
| 	// write entity at index 3 | ||||
| @@ -1074,6 +1095,8 @@ func TestSegmentFileReader_MissingData(t *testing.T) { | ||||
| 		}, | ||||
| 	}} | ||||
| 	writeEntitySegment(t, core, now, 3, entity) | ||||
| 	// write global entity at index 3 | ||||
| 	writeGlobalEntitySegment(t, core, now, 3, entity) | ||||
| 	// write token at index 3 | ||||
| 	token := &activity.TokenCount{CountByNamespaceID: map[string]uint64{ | ||||
| 		"ns": 1, | ||||
| @@ -1086,6 +1109,7 @@ func TestSegmentFileReader_MissingData(t *testing.T) { | ||||
| 	for i := 0; i < 3; i++ { | ||||
| 		require.NoError(t, core.barrier.Delete(context.Background(), makeSegmentPath(t, activityTokenLocalBasePath, now, i))) | ||||
| 		require.NoError(t, core.barrier.Delete(context.Background(), makeSegmentPath(t, activityEntityBasePath, now, i))) | ||||
| 		require.NoError(t, core.barrier.Delete(context.Background(), makeSegmentPath(t, activityGlobalPathPrefix+activityEntityBasePath, now, i))) | ||||
| 	} | ||||
|  | ||||
| 	// we expect the reader to only return the data at index 3, and then be done | ||||
| @@ -1100,6 +1124,12 @@ func TestSegmentFileReader_MissingData(t *testing.T) { | ||||
| 	require.True(t, proto.Equal(gotToken, token)) | ||||
| 	_, err = reader.ReadToken(context.Background()) | ||||
| 	require.Equal(t, err, io.EOF) | ||||
|  | ||||
| 	gotEntity, err = reader.ReadGlobalEntity(context.Background()) | ||||
| 	require.NoError(t, err) | ||||
| 	require.True(t, proto.Equal(gotEntity, entity)) | ||||
| 	_, err = reader.ReadGlobalEntity(context.Background()) | ||||
| 	require.Equal(t, err, io.EOF) | ||||
| } | ||||
|  | ||||
| // TestSegmentFileReader_NoData verifies that the reader return io.EOF when there is no data | ||||
|   | ||||
| @@ -54,7 +54,7 @@ func TestACMERegeneration_RegenerateWithCurrentMonth(t *testing.T) { | ||||
| 	}) | ||||
| 	require.NoError(t, err) | ||||
| 	now := time.Now().UTC() | ||||
| 	_, err = clientcountutil.NewActivityLogData(client). | ||||
| 	_, _, err = clientcountutil.NewActivityLogData(client). | ||||
| 		NewPreviousMonthData(3). | ||||
| 		// 3 months ago, 15 non-entity clients and 10 ACME clients | ||||
| 		NewClientsSeen(15, clientcountutil.WithClientType("non-entity-token")). | ||||
| @@ -116,7 +116,7 @@ func TestACMERegeneration_RegenerateMuchOlder(t *testing.T) { | ||||
| 	client := cluster.Cores[0].Client | ||||
|  | ||||
| 	now := time.Now().UTC() | ||||
| 	_, err := clientcountutil.NewActivityLogData(client). | ||||
| 	_, _, err := clientcountutil.NewActivityLogData(client). | ||||
| 		NewPreviousMonthData(5). | ||||
| 		// 5 months ago, 15 non-entity clients and 10 ACME clients | ||||
| 		NewClientsSeen(15, clientcountutil.WithClientType("non-entity-token")). | ||||
| @@ -159,7 +159,7 @@ func TestACMERegeneration_RegeneratePreviousMonths(t *testing.T) { | ||||
| 	client := cluster.Cores[0].Client | ||||
|  | ||||
| 	now := time.Now().UTC() | ||||
| 	_, err := clientcountutil.NewActivityLogData(client). | ||||
| 	_, _, err := clientcountutil.NewActivityLogData(client). | ||||
| 		NewPreviousMonthData(3). | ||||
| 		// 3 months ago, 15 non-entity clients and 10 ACME clients | ||||
| 		NewClientsSeen(15, clientcountutil.WithClientType("non-entity-token")). | ||||
|   | ||||
| @@ -29,7 +29,7 @@ func Test_ActivityLog_Disable(t *testing.T) { | ||||
| 		"enabled": "enable", | ||||
| 	}) | ||||
| 	require.NoError(t, err) | ||||
| 	_, err = clientcountutil.NewActivityLogData(client). | ||||
| 	_, _, err = clientcountutil.NewActivityLogData(client). | ||||
| 		NewPreviousMonthData(1). | ||||
| 		NewClientsSeen(5). | ||||
| 		NewCurrentMonthData(). | ||||
|   | ||||
| @@ -86,7 +86,7 @@ func Test_ActivityLog_LoseLeadership(t *testing.T) { | ||||
| 	}) | ||||
| 	require.NoError(t, err) | ||||
|  | ||||
| 	_, err = clientcountutil.NewActivityLogData(client). | ||||
| 	_, _, err = clientcountutil.NewActivityLogData(client). | ||||
| 		NewCurrentMonthData(). | ||||
| 		NewClientsSeen(10). | ||||
| 		Write(context.Background(), generation.WriteOptions_WRITE_ENTITIES) | ||||
| @@ -121,7 +121,7 @@ func Test_ActivityLog_ClientsOverlapping(t *testing.T) { | ||||
| 		"enabled": "enable", | ||||
| 	}) | ||||
| 	require.NoError(t, err) | ||||
| 	_, err = clientcountutil.NewActivityLogData(client). | ||||
| 	_, _, err = clientcountutil.NewActivityLogData(client). | ||||
| 		NewPreviousMonthData(1). | ||||
| 		NewClientsSeen(7). | ||||
| 		NewCurrentMonthData(). | ||||
| @@ -169,7 +169,7 @@ func Test_ActivityLog_ClientsNewCurrentMonth(t *testing.T) { | ||||
| 		"enabled": "enable", | ||||
| 	}) | ||||
| 	require.NoError(t, err) | ||||
| 	_, err = clientcountutil.NewActivityLogData(client). | ||||
| 	_, _, err = clientcountutil.NewActivityLogData(client). | ||||
| 		NewPreviousMonthData(1). | ||||
| 		NewClientsSeen(5). | ||||
| 		NewCurrentMonthData(). | ||||
| @@ -203,7 +203,7 @@ func Test_ActivityLog_EmptyDataMonths(t *testing.T) { | ||||
| 		"enabled": "enable", | ||||
| 	}) | ||||
| 	require.NoError(t, err) | ||||
| 	_, err = clientcountutil.NewActivityLogData(client). | ||||
| 	_, _, err = clientcountutil.NewActivityLogData(client). | ||||
| 		NewCurrentMonthData(). | ||||
| 		NewClientsSeen(10). | ||||
| 		Write(context.Background(), generation.WriteOptions_WRITE_PRECOMPUTED_QUERIES, generation.WriteOptions_WRITE_ENTITIES) | ||||
| @@ -243,7 +243,7 @@ func Test_ActivityLog_FutureEndDate(t *testing.T) { | ||||
| 		"enabled": "enable", | ||||
| 	}) | ||||
| 	require.NoError(t, err) | ||||
| 	_, err = clientcountutil.NewActivityLogData(client). | ||||
| 	_, _, err = clientcountutil.NewActivityLogData(client). | ||||
| 		NewPreviousMonthData(1). | ||||
| 		NewClientsSeen(10). | ||||
| 		NewCurrentMonthData(). | ||||
| @@ -316,7 +316,7 @@ func Test_ActivityLog_ClientTypeResponse(t *testing.T) { | ||||
| 			_, err := client.Logical().Write("sys/internal/counters/config", map[string]interface{}{ | ||||
| 				"enabled": "enable", | ||||
| 			}) | ||||
| 			_, err = clientcountutil.NewActivityLogData(client). | ||||
| 			_, _, err = clientcountutil.NewActivityLogData(client). | ||||
| 				NewCurrentMonthData(). | ||||
| 				NewClientsSeen(10, clientcountutil.WithClientType(tc.clientType)). | ||||
| 				Write(context.Background(), generation.WriteOptions_WRITE_ENTITIES) | ||||
| @@ -369,7 +369,7 @@ func Test_ActivityLogCurrentMonth_Response(t *testing.T) { | ||||
| 			_, err := client.Logical().Write("sys/internal/counters/config", map[string]interface{}{ | ||||
| 				"enabled": "enable", | ||||
| 			}) | ||||
| 			_, err = clientcountutil.NewActivityLogData(client). | ||||
| 			_, _, err = clientcountutil.NewActivityLogData(client). | ||||
| 				NewCurrentMonthData(). | ||||
| 				NewClientsSeen(10, clientcountutil.WithClientType(tc.clientType)). | ||||
| 				Write(context.Background(), generation.WriteOptions_WRITE_ENTITIES) | ||||
| @@ -420,7 +420,7 @@ func Test_ActivityLog_Deduplication(t *testing.T) { | ||||
| 			_, err := client.Logical().Write("sys/internal/counters/config", map[string]interface{}{ | ||||
| 				"enabled": "enable", | ||||
| 			}) | ||||
| 			_, err = clientcountutil.NewActivityLogData(client). | ||||
| 			_, _, err = clientcountutil.NewActivityLogData(client). | ||||
| 				NewPreviousMonthData(3). | ||||
| 				NewClientsSeen(10, clientcountutil.WithClientType(tc.clientType)). | ||||
| 				NewPreviousMonthData(2). | ||||
| @@ -462,7 +462,7 @@ func Test_ActivityLog_MountDeduplication(t *testing.T) { | ||||
| 	require.NoError(t, err) | ||||
| 	now := time.Now().UTC() | ||||
|  | ||||
| 	_, err = clientcountutil.NewActivityLogData(client). | ||||
| 	_, _, err = clientcountutil.NewActivityLogData(client). | ||||
| 		NewPreviousMonthData(1). | ||||
| 		NewClientSeen(clientcountutil.WithClientMount("sys")). | ||||
| 		NewClientSeen(clientcountutil.WithClientMount("secret")). | ||||
| @@ -669,7 +669,7 @@ func Test_ActivityLog_Export_Sudo(t *testing.T) { | ||||
|  | ||||
| 	rootToken := client.Token() | ||||
|  | ||||
| 	_, err = clientcountutil.NewActivityLogData(client). | ||||
| 	_, _, err = clientcountutil.NewActivityLogData(client). | ||||
| 		NewCurrentMonthData(). | ||||
| 		NewClientsSeen(10). | ||||
| 		Write(context.Background(), generation.WriteOptions_WRITE_ENTITIES) | ||||
| @@ -845,7 +845,7 @@ func TestHandleQuery_MultipleMounts(t *testing.T) { | ||||
| 			} | ||||
|  | ||||
| 			// Write all the client count data | ||||
| 			_, err = activityLogGenerator.Write(context.Background(), generation.WriteOptions_WRITE_PRECOMPUTED_QUERIES, generation.WriteOptions_WRITE_ENTITIES) | ||||
| 			_, _, err = activityLogGenerator.Write(context.Background(), generation.WriteOptions_WRITE_PRECOMPUTED_QUERIES, generation.WriteOptions_WRITE_ENTITIES) | ||||
| 			require.NoError(t, err) | ||||
|  | ||||
| 			endOfCurrentMonth := timeutil.EndOfMonth(time.Now().UTC()) | ||||
|   | ||||
| @@ -85,14 +85,15 @@ func (b *SystemBackend) handleActivityWriteData(ctx context.Context, request *lo | ||||
| 	for _, opt := range input.Write { | ||||
| 		opts[opt] = struct{}{} | ||||
| 	} | ||||
| 	paths, err := generated.write(ctx, opts, b.Core.activityLog, now) | ||||
| 	paths, globalPaths, err := generated.write(ctx, opts, b.Core.activityLog, now) | ||||
| 	if err != nil { | ||||
| 		b.logger.Debug("failed to write activity log data", "error", err.Error()) | ||||
| 		return logical.ErrorResponse("failed to write data"), err | ||||
| 	} | ||||
| 	return &logical.Response{ | ||||
| 		Data: map[string]interface{}{ | ||||
| 			"paths": paths, | ||||
| 			"paths":        paths, | ||||
| 			"global_paths": globalPaths, | ||||
| 		}, | ||||
| 	}, nil | ||||
| } | ||||
| @@ -101,9 +102,14 @@ func (b *SystemBackend) handleActivityWriteData(ctx context.Context, request *lo | ||||
| type singleMonthActivityClients struct { | ||||
| 	// clients are indexed by ID | ||||
| 	clients []*activity.EntityRecord | ||||
| 	// globalClients are indexed by ID | ||||
| 	globalClients []*activity.EntityRecord | ||||
| 	// predefinedSegments map from the segment number to the client's index in | ||||
| 	// the clients slice | ||||
| 	predefinedSegments map[int][]int | ||||
| 	// predefinedGlobalSegments map from the segment number to the client's index in | ||||
| 	// the clients slice | ||||
| 	predefinedGlobalSegments map[int][]int | ||||
| 	// generationParameters holds the generation request | ||||
| 	generationParameters *generation.Data | ||||
| } | ||||
| @@ -114,11 +120,19 @@ type multipleMonthsActivityClients struct { | ||||
| 	months []*singleMonthActivityClients | ||||
| } | ||||
|  | ||||
| func (s *singleMonthActivityClients) addEntityRecord(record *activity.EntityRecord, segmentIndex *int) { | ||||
| func (s *singleMonthActivityClients) addEntityRecord(core *Core, record *activity.EntityRecord, segmentIndex *int) { | ||||
| 	s.clients = append(s.clients, record) | ||||
| 	local, _ := core.activityLog.isClientLocal(record) | ||||
| 	if !local { | ||||
| 		s.globalClients = append(s.globalClients, record) | ||||
| 	} | ||||
| 	if segmentIndex != nil { | ||||
| 		index := len(s.clients) - 1 | ||||
| 		s.predefinedSegments[*segmentIndex] = append(s.predefinedSegments[*segmentIndex], index) | ||||
| 		if !local { | ||||
| 			globalIndex := len(s.globalClients) - 1 | ||||
| 			s.predefinedGlobalSegments[*segmentIndex] = append(s.predefinedGlobalSegments[*segmentIndex], globalIndex) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| @@ -126,7 +140,7 @@ func (s *singleMonthActivityClients) addEntityRecord(record *activity.EntityReco | ||||
| // keys are the segment index, and the value are the clients that were seen in | ||||
| // that index. If the value is an empty slice, then it's an empty index. If the | ||||
| // value is nil, then it's a skipped index | ||||
| func (s *singleMonthActivityClients) populateSegments() (map[int][]*activity.EntityRecord, error) { | ||||
| func (s *singleMonthActivityClients) populateSegments(predefinedSegments map[int][]int, clients []*activity.EntityRecord) (map[int][]*activity.EntityRecord, error) { | ||||
| 	segments := make(map[int][]*activity.EntityRecord) | ||||
| 	ignoreIndexes := make(map[int]struct{}) | ||||
| 	skipIndexes := s.generationParameters.SkipSegmentIndexes | ||||
| @@ -142,11 +156,11 @@ func (s *singleMonthActivityClients) populateSegments() (map[int][]*activity.Ent | ||||
| 	} | ||||
|  | ||||
| 	// if we have predefined segments, then we can construct the map using those | ||||
| 	if len(s.predefinedSegments) > 0 { | ||||
| 		for segment, clientIndexes := range s.predefinedSegments { | ||||
| 	if len(predefinedSegments) > 0 { | ||||
| 		for segment, clientIndexes := range predefinedSegments { | ||||
| 			clientsInSegment := make([]*activity.EntityRecord, 0, len(clientIndexes)) | ||||
| 			for _, idx := range clientIndexes { | ||||
| 				clientsInSegment = append(clientsInSegment, s.clients[idx]) | ||||
| 				clientsInSegment = append(clientsInSegment, clients[idx]) | ||||
| 			} | ||||
| 			segments[segment] = clientsInSegment | ||||
| 		} | ||||
| @@ -155,8 +169,8 @@ func (s *singleMonthActivityClients) populateSegments() (map[int][]*activity.Ent | ||||
|  | ||||
| 	// determine how many segments are necessary to store the clients for this month | ||||
| 	// using the default storage limits | ||||
| 	numNecessarySegments := len(s.clients) / ActivitySegmentClientCapacity | ||||
| 	if len(s.clients)%ActivitySegmentClientCapacity != 0 { | ||||
| 	numNecessarySegments := len(clients) / ActivitySegmentClientCapacity | ||||
| 	if len(clients)%ActivitySegmentClientCapacity != 0 { | ||||
| 		numNecessarySegments++ | ||||
| 	} | ||||
| 	totalSegmentCount := numNecessarySegments | ||||
| @@ -173,8 +187,8 @@ func (s *singleMonthActivityClients) populateSegments() (map[int][]*activity.Ent | ||||
| 	} | ||||
|  | ||||
| 	// determine how many clients should be in each segment | ||||
| 	segmentSizes := len(s.clients) / usableSegmentCount | ||||
| 	if len(s.clients)%usableSegmentCount != 0 { | ||||
| 	segmentSizes := len(clients) / usableSegmentCount | ||||
| 	if len(clients)%usableSegmentCount != 0 { | ||||
| 		segmentSizes++ | ||||
| 	} | ||||
|  | ||||
| @@ -184,14 +198,14 @@ func (s *singleMonthActivityClients) populateSegments() (map[int][]*activity.Ent | ||||
|  | ||||
| 	clientIndex := 0 | ||||
| 	for i := 0; i < totalSegmentCount; i++ { | ||||
| 		if clientIndex >= len(s.clients) { | ||||
| 		if clientIndex >= len(clients) { | ||||
| 			break | ||||
| 		} | ||||
| 		if _, ok := ignoreIndexes[i]; ok { | ||||
| 			continue | ||||
| 		} | ||||
| 		for len(segments[i]) < segmentSizes && clientIndex < len(s.clients) { | ||||
| 			segments[i] = append(segments[i], s.clients[clientIndex]) | ||||
| 		for len(segments[i]) < segmentSizes && clientIndex < len(clients) { | ||||
| 			segments[i] = append(segments[i], clients[clientIndex]) | ||||
| 			clientIndex++ | ||||
| 		} | ||||
| 	} | ||||
| @@ -200,7 +214,7 @@ func (s *singleMonthActivityClients) populateSegments() (map[int][]*activity.Ent | ||||
|  | ||||
| // addNewClients generates clients according to the given parameters, and adds them to the month | ||||
| // the client will always have the mountAccessor as its mount accessor | ||||
| func (s *singleMonthActivityClients) addNewClients(c *generation.Client, mountAccessor string, segmentIndex *int, monthsAgo int32, now time.Time) error { | ||||
| func (s *singleMonthActivityClients) addNewClients(c *generation.Client, mountAccessor string, segmentIndex *int, monthsAgo int32, now time.Time, core *Core) error { | ||||
| 	count := 1 | ||||
| 	if c.Count > 1 { | ||||
| 		count = int(c.Count) | ||||
| @@ -224,7 +238,8 @@ func (s *singleMonthActivityClients) addNewClients(c *generation.Client, mountAc | ||||
| 				return err | ||||
| 			} | ||||
| 		} | ||||
| 		s.addEntityRecord(record, segmentIndex) | ||||
|  | ||||
| 		s.addEntityRecord(core, record, segmentIndex) | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
| @@ -293,7 +308,7 @@ func (m *multipleMonthsActivityClients) processMonth(ctx context.Context, core * | ||||
| 				} | ||||
| 			} | ||||
|  | ||||
| 			err = m.addClientToMonth(month.GetMonthsAgo(), clients, mountAccessor, segmentIndex, now) | ||||
| 			err = m.addClientToMonth(month.GetMonthsAgo(), clients, mountAccessor, segmentIndex, now, core) | ||||
| 			if err != nil { | ||||
| 				return err | ||||
| 			} | ||||
| @@ -319,14 +334,14 @@ func (m *multipleMonthsActivityClients) processMonth(ctx context.Context, core * | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (m *multipleMonthsActivityClients) addClientToMonth(monthsAgo int32, c *generation.Client, mountAccessor string, segmentIndex *int, now time.Time) error { | ||||
| func (m *multipleMonthsActivityClients) addClientToMonth(monthsAgo int32, c *generation.Client, mountAccessor string, segmentIndex *int, now time.Time, core *Core) error { | ||||
| 	if c.Repeated || c.RepeatedFromMonth > 0 { | ||||
| 		return m.addRepeatedClients(monthsAgo, c, mountAccessor, segmentIndex) | ||||
| 		return m.addRepeatedClients(monthsAgo, c, mountAccessor, segmentIndex, core) | ||||
| 	} | ||||
| 	return m.months[monthsAgo].addNewClients(c, mountAccessor, segmentIndex, monthsAgo, now) | ||||
| 	return m.months[monthsAgo].addNewClients(c, mountAccessor, segmentIndex, monthsAgo, now, core) | ||||
| } | ||||
|  | ||||
| func (m *multipleMonthsActivityClients) addRepeatedClients(monthsAgo int32, c *generation.Client, mountAccessor string, segmentIndex *int) error { | ||||
| func (m *multipleMonthsActivityClients) addRepeatedClients(monthsAgo int32, c *generation.Client, mountAccessor string, segmentIndex *int, core *Core) error { | ||||
| 	addingTo := m.months[monthsAgo] | ||||
| 	repeatedFromMonth := monthsAgo + 1 | ||||
| 	if c.RepeatedFromMonth > 0 { | ||||
| @@ -339,7 +354,7 @@ func (m *multipleMonthsActivityClients) addRepeatedClients(monthsAgo int32, c *g | ||||
| 	} | ||||
| 	for _, client := range repeatedFrom.clients { | ||||
| 		if c.ClientType == client.ClientType && mountAccessor == client.MountAccessor && c.Namespace == client.NamespaceID { | ||||
| 			addingTo.addEntityRecord(client, segmentIndex) | ||||
| 			addingTo.addEntityRecord(core, client, segmentIndex) | ||||
| 			numClients-- | ||||
| 			if numClients == 0 { | ||||
| 				break | ||||
| @@ -369,8 +384,9 @@ func (m *multipleMonthsActivityClients) timestampForMonth(i int, now time.Time) | ||||
| 	return now | ||||
| } | ||||
|  | ||||
| func (m *multipleMonthsActivityClients) write(ctx context.Context, opts map[generation.WriteOptions]struct{}, activityLog *ActivityLog, now time.Time) ([]string, error) { | ||||
| func (m *multipleMonthsActivityClients) write(ctx context.Context, opts map[generation.WriteOptions]struct{}, activityLog *ActivityLog, now time.Time) ([]string, []string, error) { | ||||
| 	paths := []string{} | ||||
| 	globalPaths := []string{} | ||||
|  | ||||
| 	_, writePQ := opts[generation.WriteOptions_WRITE_PRECOMPUTED_QUERIES] | ||||
| 	_, writeDistinctClients := opts[generation.WriteOptions_WRITE_DISTINCT_CLIENTS] | ||||
| @@ -383,9 +399,9 @@ func (m *multipleMonthsActivityClients) write(ctx context.Context, opts map[gene | ||||
| 			continue | ||||
| 		} | ||||
| 		timestamp := m.timestampForMonth(i, now) | ||||
| 		segments, err := month.populateSegments() | ||||
| 		segments, err := month.populateSegments(month.predefinedSegments, month.clients) | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 			return nil, nil, err | ||||
| 		} | ||||
| 		for segmentIndex, segment := range segments { | ||||
| 			if segment == nil { | ||||
| @@ -397,12 +413,34 @@ func (m *multipleMonthsActivityClients) write(ctx context.Context, opts map[gene | ||||
| 				currentClients:       &activity.EntityActivityLog{Clients: segment}, | ||||
| 				clientSequenceNumber: uint64(segmentIndex), | ||||
| 				tokenCount:           &activity.TokenCount{}, | ||||
| 			}, true) | ||||
| 			}, true, "") | ||||
| 			if err != nil { | ||||
| 				return nil, err | ||||
| 				return nil, nil, err | ||||
| 			} | ||||
| 			paths = append(paths, entityPath) | ||||
| 		} | ||||
| 		if len(month.globalClients) > 0 { | ||||
| 			globalSegments, err := month.populateSegments(month.predefinedGlobalSegments, month.globalClients) | ||||
| 			if err != nil { | ||||
| 				return nil, nil, err | ||||
| 			} | ||||
| 			for segmentIndex, segment := range globalSegments { | ||||
| 				if segment == nil { | ||||
| 					// skip the index | ||||
| 					continue | ||||
| 				} | ||||
| 				entityPath, err := activityLog.saveSegmentEntitiesInternal(ctx, segmentInfo{ | ||||
| 					startTimestamp:       timestamp.Unix(), | ||||
| 					currentClients:       &activity.EntityActivityLog{Clients: segment}, | ||||
| 					clientSequenceNumber: uint64(segmentIndex), | ||||
| 					tokenCount:           &activity.TokenCount{}, | ||||
| 				}, true, activityGlobalPathPrefix) | ||||
| 				if err != nil { | ||||
| 					return nil, nil, err | ||||
| 				} | ||||
| 				globalPaths = append(globalPaths, entityPath) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 	if writePQ || writeDistinctClients { | ||||
| 		// start with the oldest month of data, and create precomputed queries | ||||
| @@ -423,16 +461,16 @@ func (m *multipleMonthsActivityClients) write(ctx context.Context, opts map[gene | ||||
| 	if writeIntentLog { | ||||
| 		err := activityLog.writeIntentLog(ctx, m.latestTimestamp(now, false).Unix(), m.latestTimestamp(now, true).UTC()) | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 			return nil, nil, err | ||||
| 		} | ||||
| 	} | ||||
| 	wg := sync.WaitGroup{} | ||||
| 	err := activityLog.refreshFromStoredLog(ctx, &wg, now) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 		return nil, nil, err | ||||
| 	} | ||||
| 	wg.Wait() | ||||
| 	return paths, nil | ||||
| 	return paths, globalPaths, nil | ||||
| } | ||||
|  | ||||
| func (m *multipleMonthsActivityClients) latestTimestamp(now time.Time, includeCurrentMonth bool) time.Time { | ||||
| @@ -460,7 +498,8 @@ func newMultipleMonthsActivityClients(numberOfMonths int) *multipleMonthsActivit | ||||
| 	} | ||||
| 	for i := 0; i < numberOfMonths; i++ { | ||||
| 		m.months[i] = &singleMonthActivityClients{ | ||||
| 			predefinedSegments: make(map[int][]int), | ||||
| 			predefinedSegments:       make(map[int][]int), | ||||
| 			predefinedGlobalSegments: make(map[int][]int), | ||||
| 		} | ||||
| 	} | ||||
| 	return m | ||||
| @@ -484,6 +523,17 @@ type sliceSegmentReader struct { | ||||
| 	i       int | ||||
| } | ||||
|  | ||||
| // ReadGlobalEntity here is a dummy implementation. | ||||
| // Segment reader is never used when writing using the ClientCountUtil library | ||||
| func (p *sliceSegmentReader) ReadGlobalEntity(ctx context.Context) (*activity.EntityActivityLog, error) { | ||||
| 	if p.i == len(p.records) { | ||||
| 		return nil, io.EOF | ||||
| 	} | ||||
| 	record := p.records[p.i] | ||||
| 	p.i++ | ||||
| 	return &activity.EntityActivityLog{Clients: record}, nil | ||||
| } | ||||
|  | ||||
| func (p *sliceSegmentReader) ReadToken(ctx context.Context) (*activity.TokenCount, error) { | ||||
| 	return nil, io.EOF | ||||
| } | ||||
|   | ||||
| @@ -167,10 +167,12 @@ func Test_singleMonthActivityClients_addNewClients(t *testing.T) { | ||||
| 	} | ||||
| 	for _, tt := range tests { | ||||
| 		t.Run(tt.name, func(t *testing.T) { | ||||
| 			core, _, _ := TestCoreUnsealed(t) | ||||
| 			m := &singleMonthActivityClients{ | ||||
| 				predefinedSegments: make(map[int][]int), | ||||
| 				predefinedSegments:       make(map[int][]int), | ||||
| 				predefinedGlobalSegments: make(map[int][]int), | ||||
| 			} | ||||
| 			err := m.addNewClients(tt.clients, tt.mount, tt.segmentIndex, 0, time.Now().UTC()) | ||||
| 			err := m.addNewClients(tt.clients, tt.mount, tt.segmentIndex, 0, time.Now().UTC(), core) | ||||
| 			require.NoError(t, err) | ||||
| 			numNew := tt.clients.Count | ||||
| 			if numNew == 0 { | ||||
| @@ -339,41 +341,42 @@ func Test_multipleMonthsActivityClients_processMonth_segmented(t *testing.T) { | ||||
| // from 1 month ago and 2 months ago, and verifies that the correct clients are | ||||
| // added based on namespace, mount, and non-entity attributes | ||||
| func Test_multipleMonthsActivityClients_addRepeatedClients(t *testing.T) { | ||||
| 	core, _, _ := TestCoreUnsealed(t) | ||||
| 	now := time.Now().UTC() | ||||
|  | ||||
| 	m := newMultipleMonthsActivityClients(3) | ||||
| 	defaultMount := "default" | ||||
|  | ||||
| 	require.NoError(t, m.addClientToMonth(2, &generation.Client{Count: 2}, "identity", nil, now)) | ||||
| 	require.NoError(t, m.addClientToMonth(2, &generation.Client{Count: 2, Namespace: "other_ns"}, defaultMount, nil, now)) | ||||
| 	require.NoError(t, m.addClientToMonth(1, &generation.Client{Count: 2}, defaultMount, nil, now)) | ||||
| 	require.NoError(t, m.addClientToMonth(1, &generation.Client{Count: 2, ClientType: "non-entity"}, defaultMount, nil, now)) | ||||
| 	require.NoError(t, m.addClientToMonth(2, &generation.Client{Count: 2}, "identity", nil, now, core)) | ||||
| 	require.NoError(t, m.addClientToMonth(2, &generation.Client{Count: 2, Namespace: "other_ns"}, defaultMount, nil, now, core)) | ||||
| 	require.NoError(t, m.addClientToMonth(1, &generation.Client{Count: 2}, defaultMount, nil, now, core)) | ||||
| 	require.NoError(t, m.addClientToMonth(1, &generation.Client{Count: 2, ClientType: "non-entity"}, defaultMount, nil, now, core)) | ||||
|  | ||||
| 	month2Clients := m.months[2].clients | ||||
| 	month1Clients := m.months[1].clients | ||||
|  | ||||
| 	thisMonth := m.months[0] | ||||
| 	// this will match the first client in month 1 | ||||
| 	require.NoError(t, m.addRepeatedClients(0, &generation.Client{Count: 1, Repeated: true}, defaultMount, nil)) | ||||
| 	require.NoError(t, m.addRepeatedClients(0, &generation.Client{Count: 1, Repeated: true}, defaultMount, nil, core)) | ||||
| 	require.Contains(t, month1Clients, thisMonth.clients[0]) | ||||
|  | ||||
| 	// this will match the 3rd client in month 1 | ||||
| 	require.NoError(t, m.addRepeatedClients(0, &generation.Client{Count: 1, Repeated: true, ClientType: "non-entity"}, defaultMount, nil)) | ||||
| 	require.NoError(t, m.addRepeatedClients(0, &generation.Client{Count: 1, Repeated: true, ClientType: "non-entity"}, defaultMount, nil, core)) | ||||
| 	require.Equal(t, month1Clients[2], thisMonth.clients[1]) | ||||
|  | ||||
| 	// this will match the first two clients in month 1 | ||||
| 	require.NoError(t, m.addRepeatedClients(0, &generation.Client{Count: 2, Repeated: true}, defaultMount, nil)) | ||||
| 	require.NoError(t, m.addRepeatedClients(0, &generation.Client{Count: 2, Repeated: true}, defaultMount, nil, core)) | ||||
| 	require.Equal(t, month1Clients[0:2], thisMonth.clients[2:4]) | ||||
|  | ||||
| 	// this will match the first client in month 2 | ||||
| 	require.NoError(t, m.addRepeatedClients(0, &generation.Client{Count: 1, RepeatedFromMonth: 2}, "identity", nil)) | ||||
| 	require.NoError(t, m.addRepeatedClients(0, &generation.Client{Count: 1, RepeatedFromMonth: 2}, "identity", nil, core)) | ||||
| 	require.Equal(t, month2Clients[0], thisMonth.clients[4]) | ||||
|  | ||||
| 	// this will match the 3rd client in month 2 | ||||
| 	require.NoError(t, m.addRepeatedClients(0, &generation.Client{Count: 1, RepeatedFromMonth: 2, Namespace: "other_ns"}, defaultMount, nil)) | ||||
| 	require.NoError(t, m.addRepeatedClients(0, &generation.Client{Count: 1, RepeatedFromMonth: 2, Namespace: "other_ns"}, defaultMount, nil, core)) | ||||
| 	require.Equal(t, month2Clients[2], thisMonth.clients[5]) | ||||
|  | ||||
| 	require.Error(t, m.addRepeatedClients(0, &generation.Client{Count: 1, RepeatedFromMonth: 2, Namespace: "other_ns"}, "other_mount", nil)) | ||||
| 	require.Error(t, m.addRepeatedClients(0, &generation.Client{Count: 1, RepeatedFromMonth: 2, Namespace: "other_ns"}, "other_mount", nil, core)) | ||||
| } | ||||
|  | ||||
| // Test_singleMonthActivityClients_populateSegments calls populateSegments for a | ||||
| @@ -456,7 +459,7 @@ func Test_singleMonthActivityClients_populateSegments(t *testing.T) { | ||||
| 	for _, tc := range cases { | ||||
| 		t.Run(tc.name, func(t *testing.T) { | ||||
| 			s := singleMonthActivityClients{predefinedSegments: tc.segments, clients: clients, generationParameters: &generation.Data{EmptySegmentIndexes: tc.emptyIndexes, SkipSegmentIndexes: tc.skipIndexes, NumSegments: int32(tc.numSegments)}} | ||||
| 			gotSegments, err := s.populateSegments() | ||||
| 			gotSegments, err := s.populateSegments(s.predefinedSegments, s.clients) | ||||
| 			require.NoError(t, err) | ||||
| 			require.Equal(t, tc.wantSegments, gotSegments) | ||||
| 		}) | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 divyaac
					divyaac