Revert "Add tokens to local path (#28722)" (#29294)

This reverts commit 435ecc63ba.
This commit is contained in:
divyaac
2025-01-06 10:36:23 -08:00
committed by GitHub
parent d2e902629d
commit ac42bf718b
4 changed files with 33 additions and 46 deletions

View File

@@ -36,22 +36,20 @@ import (
const ( const (
// activitySubPath is the directory under the system view where // activitySubPath is the directory under the system view where
// the log will be stored. // the log will be stored.
activitySubPath = "counters/activity/" activitySubPath = "counters/activity/"
activityEntityBasePath = "log/entity/" activityEntityBasePath = "log/entity/"
activityTokenBasePath = "log/directtokens/" activityTokenBasePath = "log/directtokens/"
activityTokenLocalBasePath = "local/" + activityTokenBasePath activityQueryBasePath = "queries/"
activityQueryBasePath = "queries/" activityConfigKey = "config"
activityConfigKey = "config" activityIntentLogKey = "endofmonth"
activityIntentLogKey = "endofmonth"
activityACMERegenerationKey = "acme-regeneration" activityACMERegenerationKey = "acme-regeneration"
// sketch for each month that stores hash of client ids // sketch for each month that stores hash of client ids
distinctClientsBasePath = "log/distinctclients/" distinctClientsBasePath = "log/distinctclients/"
// for testing purposes (public as needed) // for testing purposes (public as needed)
ActivityLogPrefix = "sys/counters/activity/log/" ActivityLogPrefix = "sys/counters/activity/log/"
ActivityLogLocalPrefix = "sys/counters/activity/local/log/" ActivityPrefix = "sys/counters/activity/"
ActivityPrefix = "sys/counters/activity/"
// Time to wait on perf standby before sending fragment // Time to wait on perf standby before sending fragment
activityFragmentStandbyTime = 10 * time.Minute activityFragmentStandbyTime = 10 * time.Minute
@@ -506,7 +504,7 @@ func (a *ActivityLog) saveSegmentTokensInternal(ctx context.Context, currentSegm
return "", nil return "", nil
} }
// RFC (VLT-120) defines this as 1-indexed, but it should be 0-indexed // RFC (VLT-120) defines this as 1-indexed, but it should be 0-indexed
tokenPath := fmt.Sprintf("%s%d/0", activityTokenLocalBasePath, currentSegment.startTimestamp) tokenPath := fmt.Sprintf("%s%d/0", activityTokenBasePath, currentSegment.startTimestamp)
// We must still allow for the tokenCount of the current segment to // We must still allow for the tokenCount of the current segment to
// be written to storage, since if we remove this code we will incur // be written to storage, since if we remove this code we will incur
// data loss for one segment's worth of TWEs. // data loss for one segment's worth of TWEs.
@@ -588,7 +586,7 @@ func parseSegmentNumberFromPath(path string) (int, bool) {
// sorted last to first // sorted last to first
func (a *ActivityLog) availableLogs(ctx context.Context, upTo time.Time) ([]time.Time, error) { func (a *ActivityLog) availableLogs(ctx context.Context, upTo time.Time) ([]time.Time, error) {
paths := make([]string, 0) paths := make([]string, 0)
for _, basePath := range []string{activityEntityBasePath, activityTokenLocalBasePath} { for _, basePath := range []string{activityEntityBasePath, activityTokenBasePath} {
p, err := a.view.List(ctx, basePath) p, err := a.view.List(ctx, basePath)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -696,7 +694,7 @@ func (a *ActivityLog) WalkTokenSegments(ctx context.Context,
startTime time.Time, startTime time.Time,
walkFn func(*activity.TokenCount), walkFn func(*activity.TokenCount),
) error { ) error {
basePath := activityTokenLocalBasePath + fmt.Sprint(startTime.Unix()) + "/" basePath := activityTokenBasePath + fmt.Sprint(startTime.Unix()) + "/"
pathList, err := a.view.List(ctx, basePath) pathList, err := a.view.List(ctx, basePath)
if err != nil { if err != nil {
return err return err
@@ -797,7 +795,7 @@ func (a *ActivityLog) loadCurrentClientSegment(ctx context.Context, startTime ti
// tokenCountExists checks if there's a token log for :startTime: // tokenCountExists checks if there's a token log for :startTime:
// this function should be called with the lock held // this function should be called with the lock held
func (a *ActivityLog) tokenCountExists(ctx context.Context, startTime time.Time) (bool, error) { func (a *ActivityLog) tokenCountExists(ctx context.Context, startTime time.Time) (bool, error) {
p, err := a.view.List(ctx, activityTokenLocalBasePath+fmt.Sprint(startTime.Unix())+"/") p, err := a.view.List(ctx, activityTokenBasePath+fmt.Sprint(startTime.Unix())+"/")
if err != nil { if err != nil {
return false, err return false, err
} }
@@ -822,7 +820,7 @@ func (a *ActivityLog) loadTokenCount(ctx context.Context, startTime time.Time) e
return nil return nil
} }
path := activityTokenLocalBasePath + fmt.Sprint(startTime.Unix()) + "/0" path := activityTokenBasePath + fmt.Sprint(startTime.Unix()) + "/0"
data, err := a.view.Get(ctx, path) data, err := a.view.Get(ctx, path)
if err != nil { if err != nil {
return err return err
@@ -918,7 +916,7 @@ func (a *ActivityLog) resetCurrentLog() {
func (a *ActivityLog) deleteLogWorker(ctx context.Context, startTimestamp int64, whenDone chan struct{}) { func (a *ActivityLog) deleteLogWorker(ctx context.Context, startTimestamp int64, whenDone chan struct{}) {
entityPath := fmt.Sprintf("%v%v/", activityEntityBasePath, startTimestamp) entityPath := fmt.Sprintf("%v%v/", activityEntityBasePath, startTimestamp)
tokenPath := fmt.Sprintf("%v%v/", activityTokenLocalBasePath, startTimestamp) tokenPath := fmt.Sprintf("%v%v/", activityTokenBasePath, startTimestamp)
entitySegments, err := a.view.List(ctx, entityPath) entitySegments, err := a.view.List(ctx, entityPath)
if err != nil { if err != nil {

View File

@@ -297,7 +297,7 @@ func TestActivityLog_SaveTokensToStorage(t *testing.T) {
a.SetStartTimestamp(time.Now().Unix()) // set a nonzero segment a.SetStartTimestamp(time.Now().Unix()) // set a nonzero segment
nsIDs := [...]string{"ns1_id", "ns2_id", "ns3_id"} nsIDs := [...]string{"ns1_id", "ns2_id", "ns3_id"}
path := fmt.Sprintf("%sdirecttokens/%d/0", ActivityLogLocalPrefix, a.GetStartTimestamp()) path := fmt.Sprintf("%sdirecttokens/%d/0", ActivityLogPrefix, a.GetStartTimestamp())
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
a.AddTokenToFragment(nsIDs[0]) a.AddTokenToFragment(nsIDs[0])
@@ -380,7 +380,7 @@ func TestActivityLog_SaveTokensToStorageDoesNotUpdateTokenCount(t *testing.T) {
a.SetStandbyEnable(ctx, true) a.SetStandbyEnable(ctx, true)
a.SetStartTimestamp(time.Now().Unix()) // set a nonzero segment a.SetStartTimestamp(time.Now().Unix()) // set a nonzero segment
tokenPath := fmt.Sprintf("%sdirecttokens/%d/0", ActivityLogLocalPrefix, a.GetStartTimestamp()) tokenPath := fmt.Sprintf("%sdirecttokens/%d/0", ActivityLogPrefix, a.GetStartTimestamp())
clientPath := fmt.Sprintf("sys/counters/activity/log/entity/%d/0", a.GetStartTimestamp()) clientPath := fmt.Sprintf("sys/counters/activity/log/entity/%d/0", a.GetStartTimestamp())
// Create some entries without entityIDs // Create some entries without entityIDs
tokenEntryOne := logical.TokenEntry{NamespaceID: namespace.RootNamespaceID, Policies: []string{"hi"}} tokenEntryOne := logical.TokenEntry{NamespaceID: namespace.RootNamespaceID, Policies: []string{"hi"}}
@@ -637,18 +637,13 @@ func TestActivityLog_availableLogs(t *testing.T) {
// set up a few files in storage // set up a few files in storage
core, _, _ := TestCoreUnsealed(t) core, _, _ := TestCoreUnsealed(t)
a := core.activityLog a := core.activityLog
paths := [...]string{"entity/1111/1", "entity/992/3"} paths := [...]string{"entity/1111/1", "directtokens/1111/1", "directtokens/1000000/1", "entity/992/3", "directtokens/992/1"}
tokenPaths := [...]string{"directtokens/1111/1", "directtokens/1000000/1", "directtokens/992/1"}
expectedTimes := [...]time.Time{time.Unix(1000000, 0), time.Unix(1111, 0), time.Unix(992, 0)} expectedTimes := [...]time.Time{time.Unix(1000000, 0), time.Unix(1111, 0), time.Unix(992, 0)}
for _, path := range paths { for _, path := range paths {
WriteToStorage(t, core, ActivityLogPrefix+path, []byte("test")) WriteToStorage(t, core, ActivityLogPrefix+path, []byte("test"))
} }
for _, path := range tokenPaths {
WriteToStorage(t, core, ActivityLogLocalPrefix+path, []byte("test"))
}
// verify above files are there, and dates in correct order // verify above files are there, and dates in correct order
times, err := a.availableLogs(context.Background(), time.Now()) times, err := a.availableLogs(context.Background(), time.Now())
if err != nil { if err != nil {
@@ -783,7 +778,7 @@ func TestActivityLog_MultipleFragmentsAndSegments(t *testing.T) {
path0 := fmt.Sprintf("sys/counters/activity/log/entity/%d/0", startTimestamp) path0 := fmt.Sprintf("sys/counters/activity/log/entity/%d/0", startTimestamp)
path1 := fmt.Sprintf("sys/counters/activity/log/entity/%d/1", startTimestamp) path1 := fmt.Sprintf("sys/counters/activity/log/entity/%d/1", startTimestamp)
path2 := fmt.Sprintf("sys/counters/activity/log/entity/%d/2", startTimestamp) path2 := fmt.Sprintf("sys/counters/activity/log/entity/%d/2", startTimestamp)
tokenPath := fmt.Sprintf("sys/counters/activity/local/log/directtokens/%d/0", startTimestamp) tokenPath := fmt.Sprintf("sys/counters/activity/log/directtokens/%d/0", startTimestamp)
genID := func(i int) string { genID := func(i int) string {
return fmt.Sprintf("11111111-1111-1111-1111-%012d", i) return fmt.Sprintf("11111111-1111-1111-1111-%012d", i)
@@ -1145,7 +1140,7 @@ func TestActivityLog_tokenCountExists(t *testing.T) {
a := core.activityLog a := core.activityLog
paths := [...]string{"directtokens/992/0", "directtokens/1001/foo", "directtokens/1111/0", "directtokens/2222/1"} paths := [...]string{"directtokens/992/0", "directtokens/1001/foo", "directtokens/1111/0", "directtokens/2222/1"}
for _, path := range paths { for _, path := range paths {
WriteToStorage(t, core, ActivityLogLocalPrefix+path, []byte("test")) WriteToStorage(t, core, ActivityLogPrefix+path, []byte("test"))
} }
testCases := []struct { testCases := []struct {
@@ -1512,7 +1507,7 @@ func TestActivityLog_loadTokenCount(t *testing.T) {
ctx := context.Background() ctx := context.Background()
for _, tc := range testCases { for _, tc := range testCases {
WriteToStorage(t, core, ActivityLogLocalPrefix+tc.path, data) WriteToStorage(t, core, ActivityLogPrefix+tc.path, data)
} }
for _, tc := range testCases { for _, tc := range testCases {
@@ -1656,7 +1651,7 @@ func setupActivityRecordsInStorage(t *testing.T, base time.Time, includeEntities
t.Fatalf(err.Error()) t.Fatalf(err.Error())
} }
WriteToStorage(t, core, ActivityLogLocalPrefix+"directtokens/"+fmt.Sprint(base.Unix())+"/0", tokenData) WriteToStorage(t, core, ActivityLogPrefix+"directtokens/"+fmt.Sprint(base.Unix())+"/0", tokenData)
} }
return a, entityRecords, tokenRecords return a, entityRecords, tokenRecords
@@ -1983,17 +1978,11 @@ func TestActivityLog_DeleteWorker(t *testing.T) {
"entity/1111/2", "entity/1111/2",
"entity/1111/3", "entity/1111/3",
"entity/1112/1", "entity/1112/1",
}
for _, path := range paths {
WriteToStorage(t, core, ActivityLogPrefix+path, []byte("test"))
}
tokenPaths := []string{
"directtokens/1111/1", "directtokens/1111/1",
"directtokens/1112/1", "directtokens/1112/1",
} }
for _, path := range tokenPaths { for _, path := range paths {
WriteToStorage(t, core, ActivityLogLocalPrefix+path, []byte("test")) WriteToStorage(t, core, ActivityLogPrefix+path, []byte("test"))
} }
doneCh := make(chan struct{}) doneCh := make(chan struct{})
@@ -2009,13 +1998,13 @@ func TestActivityLog_DeleteWorker(t *testing.T) {
// Check segments still present // Check segments still present
readSegmentFromStorage(t, core, ActivityLogPrefix+"entity/1112/1") readSegmentFromStorage(t, core, ActivityLogPrefix+"entity/1112/1")
readSegmentFromStorage(t, core, ActivityLogLocalPrefix+"directtokens/1112/1") readSegmentFromStorage(t, core, ActivityLogPrefix+"directtokens/1112/1")
// Check other segments not present // Check other segments not present
expectMissingSegment(t, core, ActivityLogPrefix+"entity/1111/1") expectMissingSegment(t, core, ActivityLogPrefix+"entity/1111/1")
expectMissingSegment(t, core, ActivityLogPrefix+"entity/1111/2") expectMissingSegment(t, core, ActivityLogPrefix+"entity/1111/2")
expectMissingSegment(t, core, ActivityLogPrefix+"entity/1111/3") expectMissingSegment(t, core, ActivityLogPrefix+"entity/1111/3")
expectMissingSegment(t, core, ActivityLogLocalPrefix+"directtokens/1111/1") expectMissingSegment(t, core, ActivityLogPrefix+"directtokens/1111/1")
} }
// checkAPIWarnings ensures there is a warning if switching from enabled -> disabled, // checkAPIWarnings ensures there is a warning if switching from enabled -> disabled,
@@ -2134,7 +2123,7 @@ func TestActivityLog_EnableDisable(t *testing.T) {
path = fmt.Sprintf("%ventity/%v/0", ActivityLogPrefix, seg2) path = fmt.Sprintf("%ventity/%v/0", ActivityLogPrefix, seg2)
readSegmentFromStorage(t, core, path) readSegmentFromStorage(t, core, path)
path = fmt.Sprintf("%vdirecttokens/%v/0", ActivityLogLocalPrefix, seg2) path = fmt.Sprintf("%vdirecttokens/%v/0", ActivityLogPrefix, seg2)
} }
readSegmentFromStorage(t, core, path) readSegmentFromStorage(t, core, path)
} }
@@ -2382,7 +2371,7 @@ func TestActivityLog_CalculatePrecomputedQueriesWithMixedTWEs(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
tokenPath := fmt.Sprintf("%vdirecttokens/%v/%v", ActivityLogLocalPrefix, segment.StartTime, segment.Segment) tokenPath := fmt.Sprintf("%vdirecttokens/%v/%v", ActivityLogPrefix, segment.StartTime, segment.Segment)
WriteToStorage(t, core, tokenPath, data) WriteToStorage(t, core, tokenPath, data)
} }
@@ -3705,7 +3694,7 @@ func TestActivityLog_Deletion(t *testing.T) {
paths[i] = append(paths[i], entityPath) paths[i] = append(paths[i], entityPath)
WriteToStorage(t, core, entityPath, []byte("test")) WriteToStorage(t, core, entityPath, []byte("test"))
} }
tokenPath := fmt.Sprintf("%vdirecttokens/%v/0", ActivityLogLocalPrefix, start.Unix()) tokenPath := fmt.Sprintf("%vdirecttokens/%v/0", ActivityLogPrefix, start.Unix())
paths[i] = append(paths[i], tokenPath) paths[i] = append(paths[i], tokenPath)
WriteToStorage(t, core, tokenPath, []byte("test")) WriteToStorage(t, core, tokenPath, []byte("test"))

View File

@@ -439,7 +439,7 @@ func (a *ActivityLog) NewSegmentFileReader(ctx context.Context, startTime time.T
if err != nil { if err != nil {
return nil, err return nil, err
} }
tokens, err := a.newSingleTypeSegmentReader(ctx, startTime, activityTokenLocalBasePath) tokens, err := a.newSingleTypeSegmentReader(ctx, startTime, activityTokenBasePath)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -1003,7 +1003,7 @@ func writeTokenSegment(t *testing.T, core *Core, ts time.Time, index int, item *
t.Helper() t.Helper()
protoItem, err := proto.Marshal(item) protoItem, err := proto.Marshal(item)
require.NoError(t, err) require.NoError(t, err)
WriteToStorage(t, core, makeSegmentPath(t, activityTokenLocalBasePath, ts, index), protoItem) WriteToStorage(t, core, makeSegmentPath(t, activityTokenBasePath, ts, index), protoItem)
} }
// makeSegmentPath formats the path for a segment at a particular time and index // makeSegmentPath formats the path for a segment at a particular time and index
@@ -1020,7 +1020,7 @@ func TestSegmentFileReader_BadData(t *testing.T) {
now := time.Now() now := time.Now()
// write bad data that won't be able to be unmarshaled at index 0 // write bad data that won't be able to be unmarshaled at index 0
WriteToStorage(t, core, makeSegmentPath(t, activityTokenLocalBasePath, now, 0), []byte("fake data")) WriteToStorage(t, core, makeSegmentPath(t, activityTokenBasePath, now, 0), []byte("fake data"))
WriteToStorage(t, core, makeSegmentPath(t, activityEntityBasePath, now, 0), []byte("fake data")) WriteToStorage(t, core, makeSegmentPath(t, activityEntityBasePath, now, 0), []byte("fake data"))
// write entity at index 1 // write entity at index 1
@@ -1063,7 +1063,7 @@ func TestSegmentFileReader_MissingData(t *testing.T) {
now := time.Now() now := time.Now()
// write entities and tokens at indexes 0, 1, 2 // write entities and tokens at indexes 0, 1, 2
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
WriteToStorage(t, core, makeSegmentPath(t, activityTokenLocalBasePath, now, i), []byte("fake data")) WriteToStorage(t, core, makeSegmentPath(t, activityTokenBasePath, now, i), []byte("fake data"))
WriteToStorage(t, core, makeSegmentPath(t, activityEntityBasePath, now, i), []byte("fake data")) WriteToStorage(t, core, makeSegmentPath(t, activityEntityBasePath, now, i), []byte("fake data"))
} }
@@ -1084,7 +1084,7 @@ func TestSegmentFileReader_MissingData(t *testing.T) {
// delete the indexes 0, 1, 2 // delete the indexes 0, 1, 2
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
require.NoError(t, core.barrier.Delete(context.Background(), makeSegmentPath(t, activityTokenLocalBasePath, now, i))) require.NoError(t, core.barrier.Delete(context.Background(), makeSegmentPath(t, activityTokenBasePath, now, i)))
require.NoError(t, core.barrier.Delete(context.Background(), makeSegmentPath(t, activityEntityBasePath, now, i))) require.NoError(t, core.barrier.Delete(context.Background(), makeSegmentPath(t, activityEntityBasePath, now, i)))
} }