mirror of
https://github.com/optim-enterprises-bv/vault.git
synced 2025-11-02 03:27:54 +00:00
Client count generation simplification, take 2 (#26781)
* fix * actually works * use now for the intent log, and run pq concurrently
This commit is contained in:
@@ -85,6 +85,7 @@ func (b *SystemBackend) handleActivityWriteData(ctx context.Context, request *lo
|
|||||||
}
|
}
|
||||||
paths, err := generated.write(ctx, opts, b.Core.activityLog)
|
paths, err := generated.write(ctx, opts, b.Core.activityLog)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
b.logger.Debug("failed to write activity log data", "error", err.Error())
|
||||||
return logical.ErrorResponse("failed to write data"), err
|
return logical.ErrorResponse("failed to write data"), err
|
||||||
}
|
}
|
||||||
return &logical.Response{
|
return &logical.Response{
|
||||||
@@ -333,75 +334,77 @@ func (m *multipleMonthsActivityClients) addRepeatedClients(monthsAgo int32, c *g
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *multipleMonthsActivityClients) addMissingCurrentMonth() {
|
||||||
|
missing := m.months[0].generationParameters == nil &&
|
||||||
|
len(m.months) > 1 &&
|
||||||
|
m.months[1].generationParameters != nil
|
||||||
|
if !missing {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
m.months[0].generationParameters = &generation.Data{EmptySegmentIndexes: []int32{0}, NumSegments: 2}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *multipleMonthsActivityClients) timestampForMonth(i int, now time.Time) time.Time {
|
||||||
|
if i > 0 {
|
||||||
|
return timeutil.StartOfMonth(timeutil.MonthsPreviousTo(i, now))
|
||||||
|
}
|
||||||
|
return now
|
||||||
|
}
|
||||||
|
|
||||||
func (m *multipleMonthsActivityClients) write(ctx context.Context, opts map[generation.WriteOptions]struct{}, activityLog *ActivityLog) ([]string, error) {
|
func (m *multipleMonthsActivityClients) write(ctx context.Context, opts map[generation.WriteOptions]struct{}, activityLog *ActivityLog) ([]string, error) {
|
||||||
now := time.Now().UTC()
|
now := time.Now().UTC()
|
||||||
paths := []string{}
|
paths := []string{}
|
||||||
|
|
||||||
_, writePQ := opts[generation.WriteOptions_WRITE_PRECOMPUTED_QUERIES]
|
_, writePQ := opts[generation.WriteOptions_WRITE_PRECOMPUTED_QUERIES]
|
||||||
_, writeDistinctClients := opts[generation.WriteOptions_WRITE_DISTINCT_CLIENTS]
|
_, writeDistinctClients := opts[generation.WriteOptions_WRITE_DISTINCT_CLIENTS]
|
||||||
_, writeEntities := opts[generation.WriteOptions_WRITE_ENTITIES]
|
|
||||||
_, writeIntentLog := opts[generation.WriteOptions_WRITE_INTENT_LOGS]
|
_, writeIntentLog := opts[generation.WriteOptions_WRITE_INTENT_LOGS]
|
||||||
|
|
||||||
pqOpts := pqOptions{}
|
m.addMissingCurrentMonth()
|
||||||
if writePQ || writeDistinctClients {
|
|
||||||
pqOpts.byNamespace = make(map[string]*processByNamespace)
|
|
||||||
pqOpts.byMonth = make(map[int64]*processMonth)
|
|
||||||
pqOpts.activePeriodEnd = m.latestTimestamp(now, true)
|
|
||||||
pqOpts.endTime = timeutil.EndOfMonth(m.latestTimestamp(pqOpts.activePeriodEnd, false))
|
|
||||||
pqOpts.activePeriodStart = m.earliestTimestamp(now)
|
|
||||||
}
|
|
||||||
|
|
||||||
var earliestTimestamp, latestTimestamp time.Time
|
|
||||||
for i, month := range m.months {
|
for i, month := range m.months {
|
||||||
if month.generationParameters == nil {
|
if month.generationParameters == nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
var timestamp time.Time
|
timestamp := m.timestampForMonth(i, now)
|
||||||
if i > 0 {
|
|
||||||
timestamp = timeutil.StartOfMonth(timeutil.MonthsPreviousTo(i, now))
|
|
||||||
} else {
|
|
||||||
timestamp = now
|
|
||||||
}
|
|
||||||
if earliestTimestamp.IsZero() || timestamp.Before(earliestTimestamp) {
|
|
||||||
earliestTimestamp = timestamp
|
|
||||||
}
|
|
||||||
if timestamp.After(latestTimestamp) {
|
|
||||||
latestTimestamp = timestamp
|
|
||||||
}
|
|
||||||
segments, err := month.populateSegments()
|
segments, err := month.populateSegments()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
for segmentIndex, segment := range segments {
|
for segmentIndex, segment := range segments {
|
||||||
if writeEntities || writeIntentLog {
|
if segment == nil {
|
||||||
if segment == nil {
|
// skip the index
|
||||||
// skip the index
|
continue
|
||||||
continue
|
|
||||||
}
|
|
||||||
entityPath, err := activityLog.saveSegmentEntitiesInternal(ctx, segmentInfo{
|
|
||||||
startTimestamp: timestamp.Unix(),
|
|
||||||
currentClients: &activity.EntityActivityLog{Clients: segment},
|
|
||||||
clientSequenceNumber: uint64(segmentIndex),
|
|
||||||
tokenCount: &activity.TokenCount{},
|
|
||||||
}, true)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
paths = append(paths, entityPath)
|
|
||||||
}
|
}
|
||||||
}
|
entityPath, err := activityLog.saveSegmentEntitiesInternal(ctx, segmentInfo{
|
||||||
|
startTimestamp: timestamp.Unix(),
|
||||||
if (writePQ || writeDistinctClients) && i > 0 {
|
currentClients: &activity.EntityActivityLog{Clients: segment},
|
||||||
reader := newProtoSegmentReader(segments)
|
clientSequenceNumber: uint64(segmentIndex),
|
||||||
err = activityLog.segmentToPrecomputedQuery(ctx, timestamp, reader, pqOpts)
|
tokenCount: &activity.TokenCount{},
|
||||||
|
}, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
paths = append(paths, entityPath)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
if writePQ || writeDistinctClients {
|
||||||
|
// start with the oldest month of data, and create precomputed queries
|
||||||
|
// up to that month
|
||||||
|
pqWg := sync.WaitGroup{}
|
||||||
|
for i := len(m.months) - 1; i > 0; i-- {
|
||||||
|
pqWg.Add(1)
|
||||||
|
go func(i int) {
|
||||||
|
defer pqWg.Done()
|
||||||
|
activityLog.precomputedQueryWorker(ctx, &ActivityIntentLog{
|
||||||
|
PreviousMonth: m.timestampForMonth(i, now).Unix(),
|
||||||
|
NextMonth: now.Unix(),
|
||||||
|
})
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
pqWg.Wait()
|
||||||
}
|
}
|
||||||
if writeIntentLog {
|
if writeIntentLog {
|
||||||
err := activityLog.writeIntentLog(ctx, earliestTimestamp.UTC().Unix(), latestTimestamp.UTC())
|
err := activityLog.writeIntentLog(ctx, m.latestTimestamp(now, false).Unix(), m.latestTimestamp(now, true).UTC())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -76,6 +76,7 @@ func TestSystemBackend_handleActivityWriteData(t *testing.T) {
|
|||||||
name: "correctly formatted data succeeds",
|
name: "correctly formatted data succeeds",
|
||||||
operation: logical.UpdateOperation,
|
operation: logical.UpdateOperation,
|
||||||
input: map[string]interface{}{"input": `{"write":["WRITE_PRECOMPUTED_QUERIES"],"data":[{"current_month":true,"all":{"clients":[{"count":5}]}}]}`},
|
input: map[string]interface{}{"input": `{"write":["WRITE_PRECOMPUTED_QUERIES"],"data":[{"current_month":true,"all":{"clients":[{"count":5}]}}]}`},
|
||||||
|
wantPaths: 1,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "entities with multiple segments",
|
name: "entities with multiple segments",
|
||||||
@@ -643,7 +644,7 @@ func Test_handleActivityWriteData(t *testing.T) {
|
|||||||
next := time.Unix(intent.NextMonth, 0)
|
next := time.Unix(intent.NextMonth, 0)
|
||||||
|
|
||||||
require.Equal(t, timeutil.StartOfMonth(now), next.UTC())
|
require.Equal(t, timeutil.StartOfMonth(now), next.UTC())
|
||||||
require.Equal(t, timeutil.StartOfMonth(timeutil.MonthsPreviousTo(3, now)), prev.UTC())
|
require.Equal(t, timeutil.StartOfMonth(timeutil.MonthsPreviousTo(1, now)), prev.UTC())
|
||||||
|
|
||||||
times, err := core.activityLog.availableLogs(context.Background(), time.Now())
|
times, err := core.activityLog.availableLogs(context.Background(), time.Now())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|||||||
Reference in New Issue
Block a user