mirror of
				https://github.com/optim-enterprises-bv/vault.git
				synced 2025-11-03 20:17:59 +00:00 
			
		
		
		
	Query and Precompute Non-Contiguous Segments in the Activity Log (#15352)
* query and precompute non-contiguous segments in the activity log * changelog * newline formatting * make fmt * report listener and storage types as found keys * report listener and storage types as found keys * Update vault/activity_log_test.go Co-authored-by: Chris Capurso <1036769+ccapurso@users.noreply.github.com> * review comments * merge conflict * merge conflict * merge conflict * fix unchecked merge conflict Co-authored-by: Chris Capurso <1036769+ccapurso@users.noreply.github.com>
This commit is contained in:
		
							
								
								
									
										3
									
								
								changelog/15352.txt
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										3
									
								
								changelog/15352.txt
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,3 @@
 | 
				
			|||||||
 | 
					```release-note:improvement
 | 
				
			||||||
 | 
					core/activity: allow client counts to be precomputed and queried on non-contiguous chunks of data
 | 
				
			||||||
 | 
					```
 | 
				
			||||||
@@ -476,6 +476,16 @@ func testLoadConfigFile(t *testing.T) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func testUnknownFieldValidationStorageAndListener(t *testing.T) {
 | 
				
			||||||
 | 
						config, err := LoadConfigFile("./test-fixtures/storage-listener-config.json")
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Fatalf("err: %s", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if len(config.UnusedKeys) != 0 {
 | 
				
			||||||
 | 
							t.Fatalf("unused keys for valid config are %+v\n", config.UnusedKeys)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func testUnknownFieldValidation(t *testing.T) {
 | 
					func testUnknownFieldValidation(t *testing.T) {
 | 
				
			||||||
	config, err := LoadConfigFile("./test-fixtures/config.hcl")
 | 
						config, err := LoadConfigFile("./test-fixtures/config.hcl")
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
@@ -525,16 +535,6 @@ func testUnknownFieldValidation(t *testing.T) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func testUnknownFieldValidationStorageAndListener(t *testing.T) {
 | 
					 | 
				
			||||||
	config, err := LoadConfigFile("./test-fixtures/storage-listener-config.json")
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		t.Fatalf("err: %s", err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	if len(config.UnusedKeys) != 0 {
 | 
					 | 
				
			||||||
		t.Fatalf("unused keys for valid config are %+v\n", config.UnusedKeys)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func testLoadConfigFile_json(t *testing.T) {
 | 
					func testLoadConfigFile_json(t *testing.T) {
 | 
				
			||||||
	config, err := LoadConfigFile("./test-fixtures/config.hcl.json")
 | 
						config, err := LoadConfigFile("./test-fixtures/config.hcl.json")
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -134,6 +134,30 @@ func (s *PrecomputedQueryStore) listEndTimes(ctx context.Context, startTime time
 | 
				
			|||||||
	return endTimes, nil
 | 
						return endTimes, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (s *PrecomputedQueryStore) getMaxEndTime(ctx context.Context, startTime time.Time, endTimeBound time.Time) (time.Time, error) {
 | 
				
			||||||
 | 
						rawEndTimes, err := s.view.List(ctx, fmt.Sprintf("%v/", startTime.Unix()))
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return time.Time{}, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						maxEndTime := time.Time{}
 | 
				
			||||||
 | 
						for _, raw := range rawEndTimes {
 | 
				
			||||||
 | 
							val, err := strconv.ParseInt(raw, 10, 64)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								s.logger.Warn("could not parse precomputed query end time", "key", raw)
 | 
				
			||||||
 | 
								continue
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							endTime := time.Unix(val, 0).UTC()
 | 
				
			||||||
 | 
							s.logger.Trace("end time in consideration is", "end time", endTime, "end time bound", endTimeBound)
 | 
				
			||||||
 | 
							if endTime.After(maxEndTime) && !endTime.After(endTimeBound) {
 | 
				
			||||||
 | 
								s.logger.Trace("end time has been updated")
 | 
				
			||||||
 | 
								maxEndTime = endTime
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return maxEndTime, nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (s *PrecomputedQueryStore) QueriesAvailable(ctx context.Context) (bool, error) {
 | 
					func (s *PrecomputedQueryStore) QueriesAvailable(ctx context.Context) (bool, error) {
 | 
				
			||||||
	startTimes, err := s.listStartTimes(ctx)
 | 
						startTimes, err := s.listStartTimes(ctx)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
@@ -181,7 +205,7 @@ func (s *PrecomputedQueryStore) Get(ctx context.Context, startTime, endTime time
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
	s.logger.Trace("retrieved start times from storage", "startTimes", startTimes)
 | 
						s.logger.Trace("retrieved start times from storage", "startTimes", startTimes)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	filteredList := make([]time.Time, 0, len(startTimes))
 | 
						filteredList := make([]time.Time, 0)
 | 
				
			||||||
	for _, t := range startTimes {
 | 
						for _, t := range startTimes {
 | 
				
			||||||
		if timeutil.InRange(t, startTime, endTime) {
 | 
							if timeutil.InRange(t, startTime, endTime) {
 | 
				
			||||||
			filteredList = append(filteredList, t)
 | 
								filteredList = append(filteredList, t)
 | 
				
			||||||
@@ -196,42 +220,45 @@ func (s *PrecomputedQueryStore) Get(ctx context.Context, startTime, endTime time
 | 
				
			|||||||
	sort.Slice(filteredList, func(i, j int) bool {
 | 
						sort.Slice(filteredList, func(i, j int) bool {
 | 
				
			||||||
		return filteredList[i].After(filteredList[j])
 | 
							return filteredList[i].After(filteredList[j])
 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
	contiguous := timeutil.GetMostRecentContiguousMonths(filteredList)
 | 
					 | 
				
			||||||
	actualStartTime := contiguous[len(contiguous)-1]
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	s.logger.Trace("chose start time", "actualStartTime", actualStartTime, "contiguous", contiguous)
 | 
						closestStartTime := time.Time{}
 | 
				
			||||||
 | 
						closestEndTime := time.Time{}
 | 
				
			||||||
	endTimes, err := s.listEndTimes(ctx, actualStartTime)
 | 
						maxTimeDifference := time.Duration(0)
 | 
				
			||||||
 | 
						for i := len(filteredList) - 1; i >= 0; i-- {
 | 
				
			||||||
 | 
							testStartTime := filteredList[i]
 | 
				
			||||||
 | 
							s.logger.Trace("trying test start times", "startTime", testStartTime, "filteredList", filteredList)
 | 
				
			||||||
 | 
							testEndTime, err := s.getMaxEndTime(ctx, testStartTime, endTime)
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			return nil, err
 | 
								return nil, err
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	s.logger.Trace("retrieved end times from storage", "endTimes", endTimes)
 | 
							if testEndTime.IsZero() {
 | 
				
			||||||
 | 
					 | 
				
			||||||
			// Might happen if there's a race with GC
 | 
								// Might happen if there's a race with GC
 | 
				
			||||||
	if len(endTimes) == 0 {
 | 
								s.logger.Warn("missing end times", "start time", testStartTime)
 | 
				
			||||||
		s.logger.Warn("missing end times", "start time", actualStartTime)
 | 
								continue
 | 
				
			||||||
		return nil, nil
 | 
					 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	var actualEndTime time.Time
 | 
							s.logger.Trace("retrieved max end time from storage", "endTime", testEndTime)
 | 
				
			||||||
	for _, t := range endTimes {
 | 
							diff := testEndTime.Sub(testStartTime)
 | 
				
			||||||
		if timeutil.InRange(t, startTime, endTime) {
 | 
							if diff >= maxTimeDifference {
 | 
				
			||||||
			if actualEndTime.IsZero() || t.After(actualEndTime) {
 | 
								closestStartTime = testStartTime
 | 
				
			||||||
				actualEndTime = t
 | 
								closestEndTime = testEndTime
 | 
				
			||||||
 | 
								maxTimeDifference = diff
 | 
				
			||||||
 | 
								s.logger.Trace("updating closest times")
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	}
 | 
						s.logger.Trace("chose start end end times", "startTime", closestStartTime, "endTime")
 | 
				
			||||||
	if actualEndTime.IsZero() {
 | 
					
 | 
				
			||||||
		s.logger.Warn("no end time in range", "start time", actualStartTime)
 | 
						if closestStartTime.IsZero() || closestEndTime.IsZero() {
 | 
				
			||||||
 | 
							s.logger.Warn("no start or end time in range", "start time", closestStartTime, "end time", closestEndTime)
 | 
				
			||||||
		return nil, nil
 | 
							return nil, nil
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	path := fmt.Sprintf("%v/%v", actualStartTime.Unix(), actualEndTime.Unix())
 | 
						path := fmt.Sprintf("%v/%v", closestStartTime.Unix(), closestEndTime.Unix())
 | 
				
			||||||
	entry, err := s.view.Get(ctx, path)
 | 
						entry, err := s.view.Get(ctx, path)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if entry == nil {
 | 
						if entry == nil {
 | 
				
			||||||
		s.logger.Warn("no end time entry found", "start time", actualStartTime, "end time", actualEndTime)
 | 
							s.logger.Warn("no end time entry found", "start time", closestStartTime, "end time", closestEndTime)
 | 
				
			||||||
		return nil, nil
 | 
							return nil, nil
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -483,6 +483,23 @@ func (a *ActivityLog) getMostRecentActivityLogSegment(ctx context.Context) ([]ti
 | 
				
			|||||||
	return timeutil.GetMostRecentContiguousMonths(logTimes), nil
 | 
						return timeutil.GetMostRecentContiguousMonths(logTimes), nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// getMostRecentActivityLogSegment gets the times (in UTC) associated with the most recent
 | 
				
			||||||
 | 
					// contiguous set of activity logs, sorted in decreasing order (latest to earliest)
 | 
				
			||||||
 | 
					func (a *ActivityLog) getMostRecentNonContiguousActivityLogSegments(ctx context.Context) ([]time.Time, error) {
 | 
				
			||||||
 | 
						logTimes, err := a.availableLogs(ctx)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if len(logTimes) <= 12 {
 | 
				
			||||||
 | 
							return logTimes, nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						contiguousMonths := timeutil.GetMostRecentContiguousMonths(logTimes)
 | 
				
			||||||
 | 
						if len(contiguousMonths) >= 12 {
 | 
				
			||||||
 | 
							return contiguousMonths, nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return logTimes[:12], nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// getLastEntitySegmentNumber returns the (non-negative) last segment number for the :startTime:, if it exists
 | 
					// getLastEntitySegmentNumber returns the (non-negative) last segment number for the :startTime:, if it exists
 | 
				
			||||||
func (a *ActivityLog) getLastEntitySegmentNumber(ctx context.Context, startTime time.Time) (uint64, bool, error) {
 | 
					func (a *ActivityLog) getLastEntitySegmentNumber(ctx context.Context, startTime time.Time) (uint64, bool, error) {
 | 
				
			||||||
	p, err := a.view.List(ctx, activityEntityBasePath+fmt.Sprint(startTime.Unix())+"/")
 | 
						p, err := a.view.List(ctx, activityEntityBasePath+fmt.Sprint(startTime.Unix())+"/")
 | 
				
			||||||
@@ -2015,7 +2032,7 @@ func (a *ActivityLog) precomputedQueryWorker(ctx context.Context) error {
 | 
				
			|||||||
	lastMonth := intent.PreviousMonth
 | 
						lastMonth := intent.PreviousMonth
 | 
				
			||||||
	a.logger.Info("computing queries", "month", time.Unix(lastMonth, 0).UTC())
 | 
						a.logger.Info("computing queries", "month", time.Unix(lastMonth, 0).UTC())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	times, err := a.getMostRecentActivityLogSegment(ctx)
 | 
						times, err := a.getMostRecentNonContiguousActivityLogSegments(ctx)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		a.logger.Warn("could not list recent segments", "error", err)
 | 
							a.logger.Warn("could not list recent segments", "error", err)
 | 
				
			||||||
		return err
 | 
							return err
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -2357,11 +2357,11 @@ func TestActivityLog_CalculatePrecomputedQueriesWithMixedTWEs(t *testing.T) {
 | 
				
			|||||||
			"deleted-ccccc",
 | 
								"deleted-ccccc",
 | 
				
			||||||
			5.0,
 | 
								5.0,
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
		// august-september values
 | 
							// january-september values
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
			"identity.nonentity.active.reporting_period",
 | 
								"identity.nonentity.active.reporting_period",
 | 
				
			||||||
			"root",
 | 
								"root",
 | 
				
			||||||
			1220.0,
 | 
								1223.0,
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
			"identity.nonentity.active.reporting_period",
 | 
								"identity.nonentity.active.reporting_period",
 | 
				
			||||||
@@ -2399,7 +2399,7 @@ func TestActivityLog_CalculatePrecomputedQueriesWithMixedTWEs(t *testing.T) {
 | 
				
			|||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		if !found {
 | 
							if !found {
 | 
				
			||||||
			t.Errorf("No guage found for %v %v",
 | 
								t.Errorf("No gauge found for %v %v",
 | 
				
			||||||
				g.Name, g.NamespaceLabel)
 | 
									g.Name, g.NamespaceLabel)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -2776,6 +2776,187 @@ func TestActivityLog_Precompute(t *testing.T) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// TestActivityLog_Precompute_SkipMonth will put two non-contiguous chunks of
 | 
				
			||||||
 | 
					// data in the activity log, and then run precomputedQueryWorker. Finally it
 | 
				
			||||||
 | 
					// will perform a query get over the skip month and expect a query for the entire
 | 
				
			||||||
 | 
					// time segment (non-contiguous)
 | 
				
			||||||
 | 
					func TestActivityLog_Precompute_SkipMonth(t *testing.T) {
 | 
				
			||||||
 | 
						timeutil.SkipAtEndOfMonth(t)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						august := time.Date(2020, 8, 15, 12, 0, 0, 0, time.UTC)
 | 
				
			||||||
 | 
						september := timeutil.StartOfMonth(time.Date(2020, 9, 1, 0, 0, 0, 0, time.UTC))
 | 
				
			||||||
 | 
						october := timeutil.StartOfMonth(time.Date(2020, 10, 1, 0, 0, 0, 0, time.UTC))
 | 
				
			||||||
 | 
						november := timeutil.StartOfMonth(time.Date(2020, 11, 1, 0, 0, 0, 0, time.UTC))
 | 
				
			||||||
 | 
						december := timeutil.StartOfMonth(time.Date(2020, 12, 1, 0, 0, 0, 0, time.UTC))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						core, _, _, _ := TestCoreUnsealedWithMetrics(t)
 | 
				
			||||||
 | 
						a := core.activityLog
 | 
				
			||||||
 | 
						ctx := namespace.RootContext(nil)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						entityRecords := make([]*activity.EntityRecord, 45)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						for i := range entityRecords {
 | 
				
			||||||
 | 
							entityRecords[i] = &activity.EntityRecord{
 | 
				
			||||||
 | 
								ClientID:    fmt.Sprintf("111122222-3333-4444-5555-%012v", i),
 | 
				
			||||||
 | 
								NamespaceID: "root",
 | 
				
			||||||
 | 
								Timestamp:   time.Now().Unix(),
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						toInsert := []struct {
 | 
				
			||||||
 | 
							StartTime int64
 | 
				
			||||||
 | 
							Segment   uint64
 | 
				
			||||||
 | 
							Clients   []*activity.EntityRecord
 | 
				
			||||||
 | 
						}{
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								august.Unix(),
 | 
				
			||||||
 | 
								0,
 | 
				
			||||||
 | 
								entityRecords[:20],
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								september.Unix(),
 | 
				
			||||||
 | 
								0,
 | 
				
			||||||
 | 
								entityRecords[20:30],
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								november.Unix(),
 | 
				
			||||||
 | 
								0,
 | 
				
			||||||
 | 
								entityRecords[30:45],
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Note that precomputedQuery worker doesn't filter
 | 
				
			||||||
 | 
						// for times <= the one it was asked to do. Is that a problem?
 | 
				
			||||||
 | 
						// Here, it means that we can't insert everything *first* and do multiple
 | 
				
			||||||
 | 
						// test cases, we have to write logs incrementally.
 | 
				
			||||||
 | 
						doInsert := func(i int) {
 | 
				
			||||||
 | 
							t.Helper()
 | 
				
			||||||
 | 
							segment := toInsert[i]
 | 
				
			||||||
 | 
							eal := &activity.EntityActivityLog{
 | 
				
			||||||
 | 
								Clients: segment.Clients,
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							data, err := proto.Marshal(eal)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								t.Fatal(err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							path := fmt.Sprintf("%ventity/%v/%v", ActivityLogPrefix, segment.StartTime, segment.Segment)
 | 
				
			||||||
 | 
							WriteToStorage(t, core, path, data)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						expectedCounts := []struct {
 | 
				
			||||||
 | 
							StartTime   time.Time
 | 
				
			||||||
 | 
							EndTime     time.Time
 | 
				
			||||||
 | 
							ByNamespace map[string]int
 | 
				
			||||||
 | 
						}{
 | 
				
			||||||
 | 
							// First test case
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								august,
 | 
				
			||||||
 | 
								timeutil.EndOfMonth(september),
 | 
				
			||||||
 | 
								map[string]int{
 | 
				
			||||||
 | 
									"root": 30,
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							// Second test case
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								august,
 | 
				
			||||||
 | 
								timeutil.EndOfMonth(november),
 | 
				
			||||||
 | 
								map[string]int{
 | 
				
			||||||
 | 
									"root": 45,
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						checkPrecomputedQuery := func(i int) {
 | 
				
			||||||
 | 
							t.Helper()
 | 
				
			||||||
 | 
							pq, err := a.queryStore.Get(ctx, expectedCounts[i].StartTime, expectedCounts[i].EndTime)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								t.Fatal(err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if pq == nil {
 | 
				
			||||||
 | 
								t.Errorf("empty result for %v -- %v", expectedCounts[i].StartTime, expectedCounts[i].EndTime)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if len(pq.Namespaces) != len(expectedCounts[i].ByNamespace) {
 | 
				
			||||||
 | 
								t.Errorf("mismatched number of namespaces, expected %v got %v",
 | 
				
			||||||
 | 
									len(expectedCounts[i].ByNamespace), len(pq.Namespaces))
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							for _, nsRecord := range pq.Namespaces {
 | 
				
			||||||
 | 
								val, ok := expectedCounts[i].ByNamespace[nsRecord.NamespaceID]
 | 
				
			||||||
 | 
								if !ok {
 | 
				
			||||||
 | 
									t.Errorf("unexpected namespace %v", nsRecord.NamespaceID)
 | 
				
			||||||
 | 
									continue
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								if uint64(val) != nsRecord.Entities {
 | 
				
			||||||
 | 
									t.Errorf("wrong number of entities in %v: expected %v, got %v",
 | 
				
			||||||
 | 
										nsRecord.NamespaceID, val, nsRecord.Entities)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if !pq.StartTime.Equal(expectedCounts[i].StartTime) {
 | 
				
			||||||
 | 
								t.Errorf("mismatched start time: expected %v got %v",
 | 
				
			||||||
 | 
									expectedCounts[i].StartTime, pq.StartTime)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if !pq.EndTime.Equal(expectedCounts[i].EndTime) {
 | 
				
			||||||
 | 
								t.Errorf("mismatched end time: expected %v got %v",
 | 
				
			||||||
 | 
									expectedCounts[i].EndTime, pq.EndTime)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						testCases := []struct {
 | 
				
			||||||
 | 
							InsertUpTo   int // index in the toInsert array
 | 
				
			||||||
 | 
							PrevMonth    int64
 | 
				
			||||||
 | 
							NextMonth    int64
 | 
				
			||||||
 | 
							ExpectedUpTo int // index in the expectedCounts array
 | 
				
			||||||
 | 
						}{
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								1,
 | 
				
			||||||
 | 
								september.Unix(),
 | 
				
			||||||
 | 
								october.Unix(),
 | 
				
			||||||
 | 
								0,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								2,
 | 
				
			||||||
 | 
								november.Unix(),
 | 
				
			||||||
 | 
								december.Unix(),
 | 
				
			||||||
 | 
								1,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						inserted := -1
 | 
				
			||||||
 | 
						for _, tc := range testCases {
 | 
				
			||||||
 | 
							t.Logf("tc %+v", tc)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							// Persists across loops
 | 
				
			||||||
 | 
							for inserted < tc.InsertUpTo {
 | 
				
			||||||
 | 
								inserted += 1
 | 
				
			||||||
 | 
								t.Logf("inserting segment %v", inserted)
 | 
				
			||||||
 | 
								doInsert(inserted)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							intent := &ActivityIntentLog{
 | 
				
			||||||
 | 
								PreviousMonth: tc.PrevMonth,
 | 
				
			||||||
 | 
								NextMonth:     tc.NextMonth,
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							data, err := json.Marshal(intent)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								t.Fatal(err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							WriteToStorage(t, core, "sys/counters/activity/endofmonth", data)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							// Pretend we've successfully rolled over to the following month
 | 
				
			||||||
 | 
							a.SetStartTimestamp(tc.NextMonth)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							err = a.precomputedQueryWorker(ctx)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								t.Fatal(err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							expectMissingSegment(t, core, "sys/counters/activity/endofmonth")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							for i := 0; i <= tc.ExpectedUpTo; i++ {
 | 
				
			||||||
 | 
								checkPrecomputedQuery(i)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// TestActivityLog_PrecomputeNonEntityTokensWithID is the same test as
 | 
					// TestActivityLog_PrecomputeNonEntityTokensWithID is the same test as
 | 
				
			||||||
// TestActivityLog_Precompute, except all the clients are tokens without
 | 
					// TestActivityLog_Precompute, except all the clients are tokens without
 | 
				
			||||||
// entities. This ensures the deduplication logic and separation logic between
 | 
					// entities. This ensures the deduplication logic and separation logic between
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user