mirror of
https://github.com/optim-enterprises-bv/vault.git
synced 2025-10-30 02:02:43 +00:00
VAULT-24582: ACME client count regeneration (#26366)
* do regeneration * fix test * fix regeneration * call precomputed query worker directly * cleanup * whoops, = instead of := * update regeneration intent log test * started testing * testing and fixes * add test for existing intent log, and update log levels * remove unrealistic test
This commit is contained in:
@@ -41,6 +41,7 @@ const (
|
||||
activityConfigKey = "config"
|
||||
activityIntentLogKey = "endofmonth"
|
||||
|
||||
activityACMERegenerationKey = "acme-regeneration"
|
||||
// sketch for each month that stores hash of client ids
|
||||
distinctClientsBasePath = "log/distinctclients/"
|
||||
|
||||
@@ -1108,15 +1109,15 @@ func (a *ActivityLog) queriesAvailable(ctx context.Context) (bool, error) {
|
||||
}
|
||||
|
||||
// setupActivityLog hooks up the singleton ActivityLog into Core.
|
||||
func (c *Core) setupActivityLog(ctx context.Context, wg *sync.WaitGroup) error {
|
||||
func (c *Core) setupActivityLog(ctx context.Context, wg *sync.WaitGroup, reload bool) error {
|
||||
c.activityLogLock.Lock()
|
||||
defer c.activityLogLock.Unlock()
|
||||
return c.setupActivityLogLocked(ctx, wg)
|
||||
return c.setupActivityLogLocked(ctx, wg, reload)
|
||||
}
|
||||
|
||||
// setupActivityLogLocked hooks up the singleton ActivityLog into Core.
|
||||
// this function should be called with activityLogLock.
|
||||
func (c *Core) setupActivityLogLocked(ctx context.Context, wg *sync.WaitGroup) error {
|
||||
func (c *Core) setupActivityLogLocked(ctx context.Context, wg *sync.WaitGroup, reload bool) error {
|
||||
logger := c.baseLogger.Named("activity")
|
||||
c.AddLogger(logger)
|
||||
|
||||
@@ -1156,11 +1157,22 @@ func (c *Core) setupActivityLogLocked(ctx context.Context, wg *sync.WaitGroup) e
|
||||
go manager.activeFragmentWorker(ctx)
|
||||
}
|
||||
|
||||
// Check for any intent log, in the background
|
||||
doRegeneration := !reload && !manager.hasRegeneratedACME(ctx)
|
||||
manager.computationWorkerDone = make(chan struct{})
|
||||
// handle leftover intent logs and regenerating precomputed queries
|
||||
// for ACME
|
||||
go func() {
|
||||
manager.precomputedQueryWorker(ctx, nil)
|
||||
close(manager.computationWorkerDone)
|
||||
defer close(manager.computationWorkerDone)
|
||||
if doRegeneration {
|
||||
err := manager.regeneratePrecomputedQueries(ctx)
|
||||
if err != nil {
|
||||
manager.logger.Error("unable to regenerate ACME data", "error", err)
|
||||
}
|
||||
} else {
|
||||
// run the precomputed query worker normally
|
||||
// errors are logged within the function
|
||||
manager.precomputedQueryWorker(ctx, nil)
|
||||
}
|
||||
}()
|
||||
|
||||
// Catch up on garbage collection
|
||||
@@ -1175,6 +1187,64 @@ func (c *Core) setupActivityLogLocked(ctx context.Context, wg *sync.WaitGroup) e
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *ActivityLog) hasRegeneratedACME(ctx context.Context) bool {
|
||||
regenerated, err := a.view.Get(ctx, activityACMERegenerationKey)
|
||||
if err != nil {
|
||||
a.logger.Error("unable to access ACME regeneration key")
|
||||
return false
|
||||
}
|
||||
return regenerated != nil
|
||||
}
|
||||
|
||||
func (a *ActivityLog) writeRegeneratedACME(ctx context.Context) error {
|
||||
regeneratedEntry, err := logical.StorageEntryJSON(activityACMERegenerationKey, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return a.view.Put(ctx, regeneratedEntry)
|
||||
}
|
||||
|
||||
func (a *ActivityLog) regeneratePrecomputedQueries(ctx context.Context) error {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
a.l.RLock()
|
||||
doneCh := a.doneCh
|
||||
a.l.RUnlock()
|
||||
go func() {
|
||||
select {
|
||||
case <-doneCh:
|
||||
cancel()
|
||||
case <-ctx.Done():
|
||||
break
|
||||
}
|
||||
}()
|
||||
|
||||
intentLogEntry, err := a.view.Get(ctx, activityIntentLogKey)
|
||||
if err != nil {
|
||||
a.logger.Error("could not load existing intent log", "error", err)
|
||||
}
|
||||
var intentLog *ActivityIntentLog
|
||||
if intentLogEntry == nil {
|
||||
regenerationIntentLog, err := a.createRegenerationIntentLog(ctx, a.clock.Now().UTC())
|
||||
if errors.Is(err, previousMonthNotFoundErr) {
|
||||
// if there are no segments earlier than the current month, consider
|
||||
// this a success
|
||||
return a.writeRegeneratedACME(ctx)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
intentLog = regenerationIntentLog
|
||||
a.logger.Debug("regenerating precomputed queries", "previous month", time.Unix(intentLog.PreviousMonth, 0).UTC(), "next month", time.Unix(intentLog.NextMonth, 0).UTC())
|
||||
}
|
||||
err = a.precomputedQueryWorker(ctx, intentLog)
|
||||
if err != nil && !errors.Is(err, previousMonthNotFoundErr) {
|
||||
return err
|
||||
}
|
||||
return a.writeRegeneratedACME(ctx)
|
||||
}
|
||||
|
||||
func (a *ActivityLog) createRegenerationIntentLog(ctx context.Context, now time.Time) (*ActivityIntentLog, error) {
|
||||
intentLog := &ActivityIntentLog{}
|
||||
segments, err := a.availableLogs(ctx, now)
|
||||
@@ -1190,12 +1260,12 @@ func (a *ActivityLog) createRegenerationIntentLog(ctx context.Context, now time.
|
||||
intentLog.PreviousMonth = segment.Unix()
|
||||
if i > 0 {
|
||||
intentLog.NextMonth = segments[i-1].Unix()
|
||||
break
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
if intentLog.NextMonth == 0 || intentLog.PreviousMonth == 0 {
|
||||
return nil, fmt.Errorf("insufficient data to create a regeneration intent log")
|
||||
if intentLog.PreviousMonth == 0 {
|
||||
return nil, previousMonthNotFoundErr
|
||||
}
|
||||
|
||||
return intentLog, nil
|
||||
@@ -2431,6 +2501,8 @@ func (a *ActivityLog) reportPrecomputedQueryMetrics(ctx context.Context, segment
|
||||
}
|
||||
}
|
||||
|
||||
var previousMonthNotFoundErr = errors.New("previous month not found")
|
||||
|
||||
// goroutine to process the request in the intent log, creating precomputed queries.
|
||||
// We expect the return value won't be checked, so log errors as they occur
|
||||
// (but for unit testing having the error return should help.)
|
||||
@@ -2509,8 +2581,8 @@ func (a *ActivityLog) precomputedQueryWorker(ctx context.Context, intent *Activi
|
||||
}
|
||||
|
||||
lastMonth := intent.PreviousMonth
|
||||
lastMonthTime := time.Unix(lastMonth, 0)
|
||||
a.logger.Info("computing queries", "month", lastMonthTime.UTC())
|
||||
lastMonthTime := time.Unix(lastMonth, 0).UTC()
|
||||
a.logger.Info("computing queries", "month", lastMonthTime)
|
||||
|
||||
times, err := a.availableLogs(ctx, lastMonthTime)
|
||||
if err != nil {
|
||||
@@ -2520,12 +2592,12 @@ func (a *ActivityLog) precomputedQueryWorker(ctx context.Context, intent *Activi
|
||||
if len(times) == 0 {
|
||||
a.logger.Warn("no months in storage")
|
||||
cleanupIntentLog()
|
||||
return errors.New("previous month not found")
|
||||
return previousMonthNotFoundErr
|
||||
}
|
||||
if times[0].Unix() != lastMonth {
|
||||
a.logger.Warn("last month not in storage", "latest", times[0].Unix())
|
||||
cleanupIntentLog()
|
||||
return errors.New("previous month not found")
|
||||
return previousMonthNotFoundErr
|
||||
}
|
||||
|
||||
byNamespace := make(map[string]*processByNamespace)
|
||||
@@ -2549,7 +2621,7 @@ func (a *ActivityLog) precomputedQueryWorker(ctx context.Context, intent *Activi
|
||||
for _, startTime := range times {
|
||||
// Do not work back further than the current retention window,
|
||||
// which will just get deleted anyway.
|
||||
if startTime.Before(retentionWindow) {
|
||||
if startTime.Before(retentionWindow) && strictEnforcement {
|
||||
break
|
||||
}
|
||||
reader, err := a.NewSegmentFileReader(ctx, startTime)
|
||||
|
||||
@@ -690,7 +690,7 @@ func TestActivityLog_createRegenerationIntentLog(t *testing.T) {
|
||||
time.Date(2024, 1, 4, 10, 54, 12, 0, time.UTC),
|
||||
time.Date(2024, 1, 3, 10, 54, 12, 0, time.UTC),
|
||||
},
|
||||
&ActivityIntentLog{NextMonth: 1704365652, PreviousMonth: 1704279252},
|
||||
&ActivityIntentLog{NextMonth: 0, PreviousMonth: 1704365652},
|
||||
false,
|
||||
},
|
||||
{
|
||||
@@ -1565,7 +1565,7 @@ func TestActivityLog_StopAndRestart(t *testing.T) {
|
||||
// Simulate seal/unseal cycle
|
||||
core.stopActivityLog()
|
||||
var wg sync.WaitGroup
|
||||
core.setupActivityLog(ctx, &wg)
|
||||
core.setupActivityLog(ctx, &wg, false)
|
||||
wg.Wait()
|
||||
|
||||
a = core.activityLog
|
||||
|
||||
@@ -2440,7 +2440,7 @@ func (s standardUnsealStrategy) unseal(ctx context.Context, logger log.Logger, c
|
||||
if !c.IsDRSecondary() {
|
||||
// not waiting on wg to avoid changing existing behavior
|
||||
var wg sync.WaitGroup
|
||||
if err := c.setupActivityLog(ctx, &wg); err != nil {
|
||||
if err := c.setupActivityLog(ctx, &wg, false); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
199
vault/external_tests/activity_testonly/acme_regeneration_test.go
Normal file
199
vault/external_tests/activity_testonly/acme_regeneration_test.go
Normal file
@@ -0,0 +1,199 @@
|
||||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: BUSL-1.1
|
||||
|
||||
//go:build testonly
|
||||
|
||||
package activity_testonly
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/vault/helper/testhelpers"
|
||||
"github.com/hashicorp/vault/helper/testhelpers/minimal"
|
||||
"github.com/hashicorp/vault/helper/timeutil"
|
||||
"github.com/hashicorp/vault/sdk/helper/clientcountutil"
|
||||
"github.com/hashicorp/vault/sdk/helper/clientcountutil/generation"
|
||||
"github.com/hashicorp/vault/vault"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func forceRegeneration(t *testing.T, cluster *vault.TestCluster) {
|
||||
t.Helper()
|
||||
client := cluster.Cores[0].Client
|
||||
_, err := client.Logical().Delete("sys/raw/sys/counters/activity/acme-regeneration")
|
||||
require.NoError(t, err)
|
||||
testhelpers.EnsureCoresSealed(t, cluster)
|
||||
testhelpers.EnsureCoresUnsealed(t, cluster)
|
||||
testhelpers.WaitForActiveNode(t, cluster)
|
||||
|
||||
testhelpers.RetryUntil(t, 10*time.Second, func() error {
|
||||
r, err := client.Logical().Read("sys/raw/sys/counters/activity/acme-regeneration")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if r == nil {
|
||||
return errors.New("no response")
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// TestACMERegeneration_RegenerateWithCurrentMonth writes segments for previous
|
||||
// months and the current month. The test regenerates the precomputed queries,
|
||||
// and verifies that the counts are correct when querying both with and without
|
||||
// the current month
|
||||
func TestACMERegeneration_RegenerateWithCurrentMonth(t *testing.T) {
|
||||
t.Parallel()
|
||||
cluster := minimal.NewTestSoloCluster(t, &vault.CoreConfig{EnableRaw: true})
|
||||
client := cluster.Cores[0].Client
|
||||
_, err := client.Logical().Write("sys/internal/counters/config", map[string]interface{}{
|
||||
"enabled": "enable",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
now := time.Now().UTC()
|
||||
_, err = clientcountutil.NewActivityLogData(client).
|
||||
NewPreviousMonthData(3).
|
||||
// 3 months ago, 15 non-entity clients and 10 ACME clients
|
||||
NewClientsSeen(15, clientcountutil.WithClientType("non-entity-token")).
|
||||
NewClientsSeen(10, clientcountutil.WithClientType(vault.ACMEActivityType)).
|
||||
NewPreviousMonthData(2).
|
||||
// 2 months ago, 7 new non-entity clients and 5 new ACME clients
|
||||
RepeatedClientsSeen(2, clientcountutil.WithClientType("non-entity-token")).
|
||||
NewClientsSeen(7, clientcountutil.WithClientType("non-entity-token")).
|
||||
RepeatedClientsSeen(5, clientcountutil.WithClientType(vault.ACMEActivityType)).
|
||||
NewClientsSeen(5, clientcountutil.WithClientType(vault.ACMEActivityType)).
|
||||
NewPreviousMonthData(1).
|
||||
// 1 months ago, 4 new non-entity clients and 2 new ACME clients
|
||||
RepeatedClientsSeen(3, clientcountutil.WithClientType("non-entity-token")).
|
||||
NewClientsSeen(4, clientcountutil.WithClientType("non-entity-token")).
|
||||
RepeatedClientsSeen(1, clientcountutil.WithClientType(vault.ACMEActivityType)).
|
||||
NewClientsSeen(2, clientcountutil.WithClientType(vault.ACMEActivityType)).
|
||||
|
||||
// current month, 10 new non-entity clients and 20 new ACME clients
|
||||
NewCurrentMonthData().
|
||||
NewClientsSeen(10, clientcountutil.WithClientType("non-entity-token")).
|
||||
NewClientsSeen(20, clientcountutil.WithClientType(vault.ACMEActivityType)).
|
||||
Write(context.Background(), generation.WriteOptions_WRITE_ENTITIES)
|
||||
|
||||
require.NoError(t, err)
|
||||
|
||||
forceRegeneration(t, cluster)
|
||||
|
||||
// current month isn't included in this query
|
||||
resp, err := client.Logical().ReadWithData("sys/internal/counters/activity", map[string][]string{
|
||||
"start_time": {timeutil.StartOfMonth(timeutil.MonthsPreviousTo(5, now)).Format(time.RFC3339)},
|
||||
"end_time": {timeutil.EndOfMonth(timeutil.MonthsPreviousTo(1, now)).Format(time.RFC3339)},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, vault.ResponseCounts{
|
||||
NonEntityTokens: 26,
|
||||
NonEntityClients: 26,
|
||||
Clients: 43,
|
||||
ACMEClients: 17,
|
||||
}, getTotals(t, resp))
|
||||
|
||||
// explicitly include the current month
|
||||
respWithCurrent, err := client.Logical().ReadWithData("sys/internal/counters/activity", map[string][]string{
|
||||
"start_time": {timeutil.StartOfMonth(timeutil.MonthsPreviousTo(5, now)).Format(time.RFC3339)},
|
||||
"end_time": {timeutil.EndOfMonth(now).Format(time.RFC3339)},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, vault.ResponseCounts{
|
||||
NonEntityTokens: 36,
|
||||
NonEntityClients: 36,
|
||||
Clients: 73,
|
||||
ACMEClients: 37,
|
||||
}, getTotals(t, respWithCurrent))
|
||||
}
|
||||
|
||||
// TestACMERegeneration_RegenerateMuchOlder creates segments 5 months ago, 4
|
||||
// months ago, and 3 months ago. The test regenerates the precomputed queries
|
||||
// and then verifies that this older data is included in the generated results.
|
||||
func TestACMERegeneration_RegenerateMuchOlder(t *testing.T) {
|
||||
t.Parallel()
|
||||
cluster := minimal.NewTestSoloCluster(t, &vault.CoreConfig{EnableRaw: true})
|
||||
client := cluster.Cores[0].Client
|
||||
|
||||
now := time.Now().UTC()
|
||||
_, err := clientcountutil.NewActivityLogData(client).
|
||||
NewPreviousMonthData(5).
|
||||
// 5 months ago, 15 non-entity clients and 10 ACME clients
|
||||
NewClientsSeen(15, clientcountutil.WithClientType("non-entity-token")).
|
||||
NewClientsSeen(10, clientcountutil.WithClientType(vault.ACMEActivityType)).
|
||||
NewPreviousMonthData(4).
|
||||
// 4 months ago, 7 new non-entity clients and 5 new ACME clients
|
||||
RepeatedClientsSeen(2, clientcountutil.WithClientType("non-entity-token")).
|
||||
NewClientsSeen(7, clientcountutil.WithClientType("non-entity-token")).
|
||||
RepeatedClientsSeen(5, clientcountutil.WithClientType(vault.ACMEActivityType)).
|
||||
NewClientsSeen(5, clientcountutil.WithClientType(vault.ACMEActivityType)).
|
||||
NewPreviousMonthData(3).
|
||||
// 3 months ago, 4 new non-entity clients and 2 new ACME clients
|
||||
RepeatedClientsSeen(3, clientcountutil.WithClientType("non-entity-token")).
|
||||
NewClientsSeen(4, clientcountutil.WithClientType("non-entity-token")).
|
||||
RepeatedClientsSeen(1, clientcountutil.WithClientType(vault.ACMEActivityType)).
|
||||
NewClientsSeen(2, clientcountutil.WithClientType(vault.ACMEActivityType)).
|
||||
Write(context.Background(), generation.WriteOptions_WRITE_ENTITIES)
|
||||
|
||||
require.NoError(t, err)
|
||||
|
||||
forceRegeneration(t, cluster)
|
||||
resp, err := client.Logical().ReadWithData("sys/internal/counters/activity", map[string][]string{
|
||||
"start_time": {timeutil.StartOfMonth(timeutil.MonthsPreviousTo(5, now)).Format(time.RFC3339)},
|
||||
"end_time": {timeutil.EndOfMonth(now).Format(time.RFC3339)},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, vault.ResponseCounts{
|
||||
NonEntityTokens: 26,
|
||||
NonEntityClients: 26,
|
||||
Clients: 43,
|
||||
ACMEClients: 17,
|
||||
}, getTotals(t, resp))
|
||||
}
|
||||
|
||||
// TestACMERegeneration_RegeneratePreviousMonths creates segments for the
|
||||
// previous 3 months, and no segments for the current month. The test verifies
|
||||
// that the older data gets regenerated
|
||||
func TestACMERegeneration_RegeneratePreviousMonths(t *testing.T) {
|
||||
t.Parallel()
|
||||
cluster := minimal.NewTestSoloCluster(t, &vault.CoreConfig{EnableRaw: true})
|
||||
client := cluster.Cores[0].Client
|
||||
|
||||
now := time.Now().UTC()
|
||||
_, err := clientcountutil.NewActivityLogData(client).
|
||||
NewPreviousMonthData(3).
|
||||
// 3 months ago, 15 non-entity clients and 10 ACME clients
|
||||
NewClientsSeen(15, clientcountutil.WithClientType("non-entity-token")).
|
||||
NewClientsSeen(10, clientcountutil.WithClientType(vault.ACMEActivityType)).
|
||||
NewPreviousMonthData(2).
|
||||
// 2 months ago, 7 new non-entity clients and 5 new ACME clients
|
||||
RepeatedClientsSeen(2, clientcountutil.WithClientType("non-entity-token")).
|
||||
NewClientsSeen(7, clientcountutil.WithClientType("non-entity-token")).
|
||||
RepeatedClientsSeen(5, clientcountutil.WithClientType(vault.ACMEActivityType)).
|
||||
NewClientsSeen(5, clientcountutil.WithClientType(vault.ACMEActivityType)).
|
||||
NewPreviousMonthData(1).
|
||||
// 1 months ago, 4 new non-entity clients and 2 new ACME clients
|
||||
RepeatedClientsSeen(3, clientcountutil.WithClientType("non-entity-token")).
|
||||
NewClientsSeen(4, clientcountutil.WithClientType("non-entity-token")).
|
||||
RepeatedClientsSeen(1, clientcountutil.WithClientType(vault.ACMEActivityType)).
|
||||
NewClientsSeen(2, clientcountutil.WithClientType(vault.ACMEActivityType)).
|
||||
Write(context.Background(), generation.WriteOptions_WRITE_ENTITIES)
|
||||
|
||||
require.NoError(t, err)
|
||||
|
||||
forceRegeneration(t, cluster)
|
||||
|
||||
resp, err := client.Logical().ReadWithData("sys/internal/counters/activity", map[string][]string{
|
||||
"start_time": {timeutil.StartOfMonth(timeutil.MonthsPreviousTo(5, now)).Format(time.RFC3339)},
|
||||
"end_time": {timeutil.EndOfMonth(now).Format(time.RFC3339)},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, vault.ResponseCounts{
|
||||
NonEntityTokens: 26,
|
||||
NonEntityClients: 26,
|
||||
Clients: 43,
|
||||
ACMEClients: 17,
|
||||
}, getTotals(t, resp))
|
||||
}
|
||||
Reference in New Issue
Block a user