From 7a10a095a23c2e2ec52a1839a461f43cd57d09a5 Mon Sep 17 00:00:00 2001 From: miagilepner Date: Thu, 2 May 2024 16:27:10 +0200 Subject: [PATCH] Client count generation simplification, take 2 (#26781) * fix * actually works * use now for the intent log, and run pq concurrently --- .../logical_system_activity_write_testonly.go | 91 ++++++++++--------- ...cal_system_activity_write_testonly_test.go | 3 +- 2 files changed, 49 insertions(+), 45 deletions(-) diff --git a/vault/logical_system_activity_write_testonly.go b/vault/logical_system_activity_write_testonly.go index 62f7366887..40671595e7 100644 --- a/vault/logical_system_activity_write_testonly.go +++ b/vault/logical_system_activity_write_testonly.go @@ -85,6 +85,7 @@ func (b *SystemBackend) handleActivityWriteData(ctx context.Context, request *lo } paths, err := generated.write(ctx, opts, b.Core.activityLog) if err != nil { + b.logger.Debug("failed to write activity log data", "error", err.Error()) return logical.ErrorResponse("failed to write data"), err } return &logical.Response{ @@ -333,75 +334,77 @@ func (m *multipleMonthsActivityClients) addRepeatedClients(monthsAgo int32, c *g return nil } +func (m *multipleMonthsActivityClients) addMissingCurrentMonth() { + missing := m.months[0].generationParameters == nil && + len(m.months) > 1 && + m.months[1].generationParameters != nil + if !missing { + return + } + m.months[0].generationParameters = &generation.Data{EmptySegmentIndexes: []int32{0}, NumSegments: 2} +} + +func (m *multipleMonthsActivityClients) timestampForMonth(i int, now time.Time) time.Time { + if i > 0 { + return timeutil.StartOfMonth(timeutil.MonthsPreviousTo(i, now)) + } + return now +} + func (m *multipleMonthsActivityClients) write(ctx context.Context, opts map[generation.WriteOptions]struct{}, activityLog *ActivityLog) ([]string, error) { now := time.Now().UTC() paths := []string{} _, writePQ := opts[generation.WriteOptions_WRITE_PRECOMPUTED_QUERIES] _, writeDistinctClients := opts[generation.WriteOptions_WRITE_DISTINCT_CLIENTS] - _, writeEntities := opts[generation.WriteOptions_WRITE_ENTITIES] _, writeIntentLog := opts[generation.WriteOptions_WRITE_INTENT_LOGS] - pqOpts := pqOptions{} - if writePQ || writeDistinctClients { - pqOpts.byNamespace = make(map[string]*processByNamespace) - pqOpts.byMonth = make(map[int64]*processMonth) - pqOpts.activePeriodEnd = m.latestTimestamp(now, true) - pqOpts.endTime = timeutil.EndOfMonth(m.latestTimestamp(pqOpts.activePeriodEnd, false)) - pqOpts.activePeriodStart = m.earliestTimestamp(now) - } + m.addMissingCurrentMonth() - var earliestTimestamp, latestTimestamp time.Time for i, month := range m.months { if month.generationParameters == nil { continue } - var timestamp time.Time - if i > 0 { - timestamp = timeutil.StartOfMonth(timeutil.MonthsPreviousTo(i, now)) - } else { - timestamp = now - } - if earliestTimestamp.IsZero() || timestamp.Before(earliestTimestamp) { - earliestTimestamp = timestamp - } - if timestamp.After(latestTimestamp) { - latestTimestamp = timestamp - } + timestamp := m.timestampForMonth(i, now) segments, err := month.populateSegments() if err != nil { return nil, err } for segmentIndex, segment := range segments { - if writeEntities || writeIntentLog { - if segment == nil { - // skip the index - continue - } - entityPath, err := activityLog.saveSegmentEntitiesInternal(ctx, segmentInfo{ - startTimestamp: timestamp.Unix(), - currentClients: &activity.EntityActivityLog{Clients: segment}, - clientSequenceNumber: uint64(segmentIndex), - tokenCount: &activity.TokenCount{}, - }, true) - if err != nil { - return nil, err - } - paths = append(paths, entityPath) + if segment == nil { + // skip the index + continue } - } - - if (writePQ || writeDistinctClients) && i > 0 { - reader := newProtoSegmentReader(segments) - err = activityLog.segmentToPrecomputedQuery(ctx, timestamp, reader, pqOpts) + entityPath, err := activityLog.saveSegmentEntitiesInternal(ctx, segmentInfo{ + startTimestamp: timestamp.Unix(), + currentClients: &activity.EntityActivityLog{Clients: segment}, + clientSequenceNumber: uint64(segmentIndex), + tokenCount: &activity.TokenCount{}, + }, true) if err != nil { return nil, err } + paths = append(paths, entityPath) } - + } + if writePQ || writeDistinctClients { + // start with the oldest month of data, and create precomputed queries + // up to that month + pqWg := sync.WaitGroup{} + for i := len(m.months) - 1; i > 0; i-- { + pqWg.Add(1) + go func(i int) { + defer pqWg.Done() + activityLog.precomputedQueryWorker(ctx, &ActivityIntentLog{ + PreviousMonth: m.timestampForMonth(i, now).Unix(), + NextMonth: now.Unix(), + }) + }(i) + } + pqWg.Wait() } if writeIntentLog { - err := activityLog.writeIntentLog(ctx, earliestTimestamp.UTC().Unix(), latestTimestamp.UTC()) + err := activityLog.writeIntentLog(ctx, m.latestTimestamp(now, false).Unix(), m.latestTimestamp(now, true).UTC()) if err != nil { return nil, err } diff --git a/vault/logical_system_activity_write_testonly_test.go b/vault/logical_system_activity_write_testonly_test.go index f3f2577354..e7291ed0bd 100644 --- a/vault/logical_system_activity_write_testonly_test.go +++ b/vault/logical_system_activity_write_testonly_test.go @@ -76,6 +76,7 @@ func TestSystemBackend_handleActivityWriteData(t *testing.T) { name: "correctly formatted data succeeds", operation: logical.UpdateOperation, input: map[string]interface{}{"input": `{"write":["WRITE_PRECOMPUTED_QUERIES"],"data":[{"current_month":true,"all":{"clients":[{"count":5}]}}]}`}, + wantPaths: 1, }, { name: "entities with multiple segments", @@ -643,7 +644,7 @@ func Test_handleActivityWriteData(t *testing.T) { next := time.Unix(intent.NextMonth, 0) require.Equal(t, timeutil.StartOfMonth(now), next.UTC()) - require.Equal(t, timeutil.StartOfMonth(timeutil.MonthsPreviousTo(3, now)), prev.UTC()) + require.Equal(t, timeutil.StartOfMonth(timeutil.MonthsPreviousTo(1, now)), prev.UTC()) times, err := core.activityLog.availableLogs(context.Background(), time.Now()) require.NoError(t, err)