Revert "Client count generation simplification (#26692)" (#26736)

This reverts commit b84af55a20.
This commit is contained in:
miagilepner
2024-05-01 18:16:23 +02:00
committed by GitHub
parent c4839ad05c
commit 9e39a5f2a4
2 changed files with 37 additions and 29 deletions

View File

@@ -333,27 +333,35 @@ func (m *multipleMonthsActivityClients) addRepeatedClients(monthsAgo int32, c *g
return nil return nil
} }
func (m *multipleMonthsActivityClients) timestampForMonth(i int, now time.Time) time.Time {
if i > 0 {
return timeutil.StartOfMonth(timeutil.MonthsPreviousTo(i, now))
}
return now
}
func (m *multipleMonthsActivityClients) write(ctx context.Context, opts map[generation.WriteOptions]struct{}, activityLog *ActivityLog) ([]string, error) { func (m *multipleMonthsActivityClients) write(ctx context.Context, opts map[generation.WriteOptions]struct{}, activityLog *ActivityLog) ([]string, error) {
now := time.Now().UTC() now := time.Now().UTC()
paths := []string{} paths := []string{}
_, writePQ := opts[generation.WriteOptions_WRITE_PRECOMPUTED_QUERIES] _, writePQ := opts[generation.WriteOptions_WRITE_PRECOMPUTED_QUERIES]
_, writeDistinctClients := opts[generation.WriteOptions_WRITE_DISTINCT_CLIENTS] _, writeDistinctClients := opts[generation.WriteOptions_WRITE_DISTINCT_CLIENTS]
_, writeEntities := opts[generation.WriteOptions_WRITE_ENTITIES]
_, writeIntentLog := opts[generation.WriteOptions_WRITE_INTENT_LOGS] _, writeIntentLog := opts[generation.WriteOptions_WRITE_INTENT_LOGS]
pqOpts := pqOptions{}
if writePQ || writeDistinctClients {
pqOpts.byNamespace = make(map[string]*processByNamespace)
pqOpts.byMonth = make(map[int64]*processMonth)
pqOpts.activePeriodEnd = m.latestTimestamp(now, true)
pqOpts.endTime = timeutil.EndOfMonth(m.latestTimestamp(pqOpts.activePeriodEnd, false))
pqOpts.activePeriodStart = m.earliestTimestamp(now)
}
var earliestTimestamp, latestTimestamp time.Time var earliestTimestamp, latestTimestamp time.Time
for i, month := range m.months { for i, month := range m.months {
if month.generationParameters == nil { if month.generationParameters == nil {
continue continue
} }
timestamp := m.timestampForMonth(i, now) var timestamp time.Time
if i > 0 {
timestamp = timeutil.StartOfMonth(timeutil.MonthsPreviousTo(i, now))
} else {
timestamp = now
}
if earliestTimestamp.IsZero() || timestamp.Before(earliestTimestamp) { if earliestTimestamp.IsZero() || timestamp.Before(earliestTimestamp) {
earliestTimestamp = timestamp earliestTimestamp = timestamp
} }
@@ -365,31 +373,32 @@ func (m *multipleMonthsActivityClients) write(ctx context.Context, opts map[gene
return nil, err return nil, err
} }
for segmentIndex, segment := range segments { for segmentIndex, segment := range segments {
if segment == nil { if writeEntities || writeIntentLog {
// skip the index if segment == nil {
continue // skip the index
continue
}
entityPath, err := activityLog.saveSegmentEntitiesInternal(ctx, segmentInfo{
startTimestamp: timestamp.Unix(),
currentClients: &activity.EntityActivityLog{Clients: segment},
clientSequenceNumber: uint64(segmentIndex),
tokenCount: &activity.TokenCount{},
}, true)
if err != nil {
return nil, err
}
paths = append(paths, entityPath)
} }
entityPath, err := activityLog.saveSegmentEntitiesInternal(ctx, segmentInfo{ }
startTimestamp: timestamp.Unix(),
currentClients: &activity.EntityActivityLog{Clients: segment}, if (writePQ || writeDistinctClients) && i > 0 {
clientSequenceNumber: uint64(segmentIndex), reader := newProtoSegmentReader(segments)
tokenCount: &activity.TokenCount{}, err = activityLog.segmentToPrecomputedQuery(ctx, timestamp, reader, pqOpts)
}, true)
if err != nil { if err != nil {
return nil, err return nil, err
} }
paths = append(paths, entityPath)
}
}
if writePQ || writeDistinctClients {
// start with the oldest month of data, and create precomputed queries
// up to that month
for i := len(m.months) - 1; i > 0; i-- {
activityLog.precomputedQueryWorker(ctx, &ActivityIntentLog{
PreviousMonth: m.timestampForMonth(i, now).Unix(),
NextMonth: 0,
})
} }
} }
if writeIntentLog { if writeIntentLog {
err := activityLog.writeIntentLog(ctx, earliestTimestamp.UTC().Unix(), latestTimestamp.UTC()) err := activityLog.writeIntentLog(ctx, earliestTimestamp.UTC().Unix(), latestTimestamp.UTC())

View File

@@ -76,7 +76,6 @@ func TestSystemBackend_handleActivityWriteData(t *testing.T) {
name: "correctly formatted data succeeds", name: "correctly formatted data succeeds",
operation: logical.UpdateOperation, operation: logical.UpdateOperation,
input: map[string]interface{}{"input": `{"write":["WRITE_PRECOMPUTED_QUERIES"],"data":[{"current_month":true,"all":{"clients":[{"count":5}]}}]}`}, input: map[string]interface{}{"input": `{"write":["WRITE_PRECOMPUTED_QUERIES"],"data":[{"current_month":true,"all":{"clients":[{"count":5}]}}]}`},
wantPaths: 1,
}, },
{ {
name: "entities with multiple segments", name: "entities with multiple segments",