mirror of
https://github.com/optim-enterprises-bv/vault.git
synced 2025-11-02 19:47:54 +00:00
This reverts commit 504227bd74.
This commit is contained in:
@@ -53,7 +53,7 @@ func TestOperatorUsageCommandRun(t *testing.T) {
|
|||||||
|
|
||||||
now := time.Now().UTC()
|
now := time.Now().UTC()
|
||||||
|
|
||||||
_, _, _, err = clientcountutil.NewActivityLogData(client).
|
_, _, err = clientcountutil.NewActivityLogData(client).
|
||||||
NewPreviousMonthData(1).
|
NewPreviousMonthData(1).
|
||||||
NewClientsSeen(6, clientcountutil.WithClientType("entity")).
|
NewClientsSeen(6, clientcountutil.WithClientType("entity")).
|
||||||
NewClientsSeen(4, clientcountutil.WithClientType("non-entity-token")).
|
NewClientsSeen(4, clientcountutil.WithClientType("non-entity-token")).
|
||||||
|
|||||||
@@ -282,53 +282,42 @@ func (d *ActivityLogDataGenerator) ToProto() *generation.ActivityLogMockInput {
|
|||||||
// Write writes the data to the API with the given write options. The method
|
// Write writes the data to the API with the given write options. The method
|
||||||
// returns the new paths that have been written. Note that the API endpoint will
|
// returns the new paths that have been written. Note that the API endpoint will
|
||||||
// only be present when Vault has been compiled with the "testonly" flag.
|
// only be present when Vault has been compiled with the "testonly" flag.
|
||||||
func (d *ActivityLogDataGenerator) Write(ctx context.Context, writeOptions ...generation.WriteOptions) ([]string, []string, []string, error) {
|
func (d *ActivityLogDataGenerator) Write(ctx context.Context, writeOptions ...generation.WriteOptions) ([]string, []string, error) {
|
||||||
d.data.Write = writeOptions
|
d.data.Write = writeOptions
|
||||||
err := VerifyInput(d.data)
|
err := VerifyInput(d.data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
data, err := d.ToJSON()
|
data, err := d.ToJSON()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
resp, err := d.client.Logical().WriteWithContext(ctx, "sys/internal/counters/activity/write", map[string]interface{}{"input": string(data)})
|
resp, err := d.client.Logical().WriteWithContext(ctx, "sys/internal/counters/activity/write", map[string]interface{}{"input": string(data)})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
if resp.Data == nil {
|
if resp.Data == nil {
|
||||||
return nil, nil, nil, fmt.Errorf("received no data")
|
return nil, nil, fmt.Errorf("received no data")
|
||||||
}
|
}
|
||||||
paths := resp.Data["paths"]
|
paths := resp.Data["paths"]
|
||||||
castedPaths, ok := paths.([]interface{})
|
castedPaths, ok := paths.([]interface{})
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, nil, nil, fmt.Errorf("invalid paths data: %v", paths)
|
return nil, nil, fmt.Errorf("invalid paths data: %v", paths)
|
||||||
}
|
}
|
||||||
returnPaths := make([]string, 0, len(castedPaths))
|
returnPaths := make([]string, 0, len(castedPaths))
|
||||||
for _, path := range castedPaths {
|
for _, path := range castedPaths {
|
||||||
returnPaths = append(returnPaths, path.(string))
|
returnPaths = append(returnPaths, path.(string))
|
||||||
}
|
}
|
||||||
|
|
||||||
localPaths := resp.Data["local_paths"]
|
|
||||||
localCastedPaths, ok := localPaths.([]interface{})
|
|
||||||
if !ok {
|
|
||||||
return nil, nil, nil, fmt.Errorf("invalid local paths data: %v", localPaths)
|
|
||||||
}
|
|
||||||
returnLocalPaths := make([]string, 0, len(localCastedPaths))
|
|
||||||
for _, path := range localCastedPaths {
|
|
||||||
returnLocalPaths = append(returnLocalPaths, path.(string))
|
|
||||||
}
|
|
||||||
|
|
||||||
globalPaths := resp.Data["global_paths"]
|
globalPaths := resp.Data["global_paths"]
|
||||||
globalCastedPaths, ok := globalPaths.([]interface{})
|
globalCastedPaths, ok := globalPaths.([]interface{})
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, nil, nil, fmt.Errorf("invalid global paths data: %v", globalPaths)
|
return nil, nil, fmt.Errorf("invalid global paths data: %v", globalPaths)
|
||||||
}
|
}
|
||||||
returnGlobalPaths := make([]string, 0, len(globalCastedPaths))
|
returnGlobalPaths := make([]string, 0, len(globalCastedPaths))
|
||||||
for _, path := range globalCastedPaths {
|
for _, path := range globalCastedPaths {
|
||||||
returnGlobalPaths = append(returnGlobalPaths, path.(string))
|
returnGlobalPaths = append(returnGlobalPaths, path.(string))
|
||||||
}
|
}
|
||||||
return returnPaths, returnLocalPaths, returnGlobalPaths, nil
|
return returnPaths, returnGlobalPaths, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// VerifyInput checks that the input data is valid
|
// VerifyInput checks that the input data is valid
|
||||||
|
|||||||
@@ -116,7 +116,7 @@ func TestNewCurrentMonthData_AddClients(t *testing.T) {
|
|||||||
// sent to the server is correct.
|
// sent to the server is correct.
|
||||||
func TestWrite(t *testing.T) {
|
func TestWrite(t *testing.T) {
|
||||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
_, err := io.WriteString(w, `{"data":{"paths":["path1","path2"],"global_paths":["path2","path3"], "local_paths":["path3","path4"]}}`)
|
_, err := io.WriteString(w, `{"data":{"paths":["path1","path2"],"global_paths":["path2","path3"]}}`)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
body, err := io.ReadAll(r.Body)
|
body, err := io.ReadAll(r.Body)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
@@ -131,7 +131,7 @@ func TestWrite(t *testing.T) {
|
|||||||
Address: ts.URL,
|
Address: ts.URL,
|
||||||
})
|
})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
paths, localPaths, globalPaths, err := NewActivityLogData(client).
|
paths, globalPaths, err := NewActivityLogData(client).
|
||||||
NewPreviousMonthData(3).
|
NewPreviousMonthData(3).
|
||||||
NewClientSeen().
|
NewClientSeen().
|
||||||
NewPreviousMonthData(2).
|
NewPreviousMonthData(2).
|
||||||
@@ -142,7 +142,6 @@ func TestWrite(t *testing.T) {
|
|||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, []string{"path1", "path2"}, paths)
|
require.Equal(t, []string{"path1", "path2"}, paths)
|
||||||
require.Equal(t, []string{"path2", "path3"}, globalPaths)
|
require.Equal(t, []string{"path2", "path3"}, globalPaths)
|
||||||
require.Equal(t, []string{"path3", "path4"}, localPaths)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func testAddClients(t *testing.T, makeGenerator func() *ActivityLogDataGenerator, getClient func(data *ActivityLogDataGenerator) *generation.Client) {
|
func testAddClients(t *testing.T, makeGenerator func() *ActivityLogDataGenerator, getClient func(data *ActivityLogDataGenerator) *generation.Client) {
|
||||||
|
|||||||
@@ -44,7 +44,6 @@ const (
|
|||||||
activityConfigKey = "config"
|
activityConfigKey = "config"
|
||||||
activityIntentLogKey = "endofmonth"
|
activityIntentLogKey = "endofmonth"
|
||||||
activityGlobalPathPrefix = "global/"
|
activityGlobalPathPrefix = "global/"
|
||||||
activityLocalPathPrefix = "local/"
|
|
||||||
|
|
||||||
activityACMERegenerationKey = "acme-regeneration"
|
activityACMERegenerationKey = "acme-regeneration"
|
||||||
// sketch for each month that stores hash of client ids
|
// sketch for each month that stores hash of client ids
|
||||||
@@ -144,7 +143,7 @@ type ActivityLog struct {
|
|||||||
|
|
||||||
// ActivityLog.l protects the configuration settings, except enable, and any modifications
|
// ActivityLog.l protects the configuration settings, except enable, and any modifications
|
||||||
// to the current segment.
|
// to the current segment.
|
||||||
// Acquire "l" before fragmentLock, globalFragmentLock, and localFragmentLock if all must be held.
|
// Acquire "l" before fragmentLock and globalFragmentLock if both must be held.
|
||||||
l sync.RWMutex
|
l sync.RWMutex
|
||||||
|
|
||||||
// fragmentLock protects enable, partialMonthClientTracker, fragment,
|
// fragmentLock protects enable, partialMonthClientTracker, fragment,
|
||||||
@@ -207,9 +206,6 @@ type ActivityLog struct {
|
|||||||
// track metadata and contents of the most recent global log segment
|
// track metadata and contents of the most recent global log segment
|
||||||
currentGlobalSegment segmentInfo
|
currentGlobalSegment segmentInfo
|
||||||
|
|
||||||
// track metadata and contents of the most recent local log segment
|
|
||||||
currentLocalSegment segmentInfo
|
|
||||||
|
|
||||||
// Fragments received from performance standbys
|
// Fragments received from performance standbys
|
||||||
standbyFragmentsReceived []*activity.LogFragment
|
standbyFragmentsReceived []*activity.LogFragment
|
||||||
|
|
||||||
@@ -401,19 +397,6 @@ func NewActivityLog(core *Core, logger log.Logger, view *BarrierView, metrics me
|
|||||||
},
|
},
|
||||||
clientSequenceNumber: 0,
|
clientSequenceNumber: 0,
|
||||||
},
|
},
|
||||||
currentLocalSegment: segmentInfo{
|
|
||||||
startTimestamp: 0,
|
|
||||||
currentClients: &activity.EntityActivityLog{
|
|
||||||
Clients: make([]*activity.EntityRecord, 0),
|
|
||||||
},
|
|
||||||
// tokenCount is deprecated, but must still exist for the current segment
|
|
||||||
// so the fragment that was using TWEs before the 1.9 changes
|
|
||||||
// can be flushed to the current segment.
|
|
||||||
tokenCount: &activity.TokenCount{
|
|
||||||
CountByNamespaceID: make(map[string]uint64),
|
|
||||||
},
|
|
||||||
clientSequenceNumber: 0,
|
|
||||||
},
|
|
||||||
standbyFragmentsReceived: make([]*activity.LogFragment, 0),
|
standbyFragmentsReceived: make([]*activity.LogFragment, 0),
|
||||||
standbyLocalFragmentsReceived: make([]*activity.LogFragment, 0),
|
standbyLocalFragmentsReceived: make([]*activity.LogFragment, 0),
|
||||||
standbyGlobalFragmentsReceived: make([]*activity.LogFragment, 0),
|
standbyGlobalFragmentsReceived: make([]*activity.LogFragment, 0),
|
||||||
@@ -526,8 +509,10 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for
|
|||||||
a.localFragmentLock.Lock()
|
a.localFragmentLock.Lock()
|
||||||
localFragment := a.localFragment
|
localFragment := a.localFragment
|
||||||
a.localFragment = nil
|
a.localFragment = nil
|
||||||
standbyLocalFragments := a.standbyLocalFragmentsReceived
|
|
||||||
a.standbyLocalFragmentsReceived = make([]*activity.LogFragment, 0)
|
// standbyLocalFragments := a.standbyLocalFragmentsReceived
|
||||||
|
// a.standbyLocalFragmentsReceived = make([]*activity.LogFragment, 0)
|
||||||
|
|
||||||
a.localFragmentLock.Unlock()
|
a.localFragmentLock.Unlock()
|
||||||
|
|
||||||
// Measure the current local fragment
|
// Measure the current local fragment
|
||||||
@@ -544,11 +529,6 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// store local fragments
|
|
||||||
if ret := a.createCurrentSegmentFromFragments(ctx, append(standbyLocalFragments, localFragment), &a.currentLocalSegment, force, activityLocalPathPrefix); ret != nil {
|
|
||||||
return ret
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -736,7 +716,7 @@ func parseSegmentNumberFromPath(path string) (int, bool) {
|
|||||||
// sorted last to first
|
// sorted last to first
|
||||||
func (a *ActivityLog) availableLogs(ctx context.Context, upTo time.Time) ([]time.Time, error) {
|
func (a *ActivityLog) availableLogs(ctx context.Context, upTo time.Time) ([]time.Time, error) {
|
||||||
paths := make([]string, 0)
|
paths := make([]string, 0)
|
||||||
for _, basePath := range []string{activityEntityBasePath, activityLocalPathPrefix + activityEntityBasePath, activityGlobalPathPrefix + activityEntityBasePath, activityTokenLocalBasePath} {
|
for _, basePath := range []string{activityEntityBasePath, activityGlobalPathPrefix + activityEntityBasePath, activityTokenLocalBasePath} {
|
||||||
p, err := a.view.List(ctx, basePath)
|
p, err := a.view.List(ctx, basePath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -785,27 +765,10 @@ func (a *ActivityLog) getMostRecentActivityLogSegment(ctx context.Context, now t
|
|||||||
}
|
}
|
||||||
|
|
||||||
// getLastEntitySegmentNumber returns the (non-negative) last segment number for the :startTime:, if it exists
|
// getLastEntitySegmentNumber returns the (non-negative) last segment number for the :startTime:, if it exists
|
||||||
func (a *ActivityLog) getLastEntitySegmentNumber(ctx context.Context, startTime time.Time) (uint64, uint64, uint64, bool, error) {
|
func (a *ActivityLog) getLastEntitySegmentNumber(ctx context.Context, startTime time.Time) (uint64, uint64, bool, error) {
|
||||||
segmentHighestNum, segmentPresent, err := a.getLastSegmentNumberByEntityPath(ctx, activityEntityBasePath+fmt.Sprint(startTime.Unix())+"/")
|
p, err := a.view.List(ctx, activityEntityBasePath+fmt.Sprint(startTime.Unix())+"/")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, 0, 0, false, err
|
return 0, 0, false, err
|
||||||
}
|
|
||||||
globalHighestNum, globalSegmentPresent, err := a.getLastSegmentNumberByEntityPath(ctx, activityGlobalPathPrefix+activityEntityBasePath+fmt.Sprint(startTime.Unix())+"/")
|
|
||||||
if err != nil {
|
|
||||||
return 0, 0, 0, false, err
|
|
||||||
}
|
|
||||||
localHighestNum, localSegmentPresent, err := a.getLastSegmentNumberByEntityPath(ctx, activityLocalPathPrefix+activityEntityBasePath+fmt.Sprint(startTime.Unix())+"/")
|
|
||||||
if err != nil {
|
|
||||||
return 0, 0, 0, false, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return segmentHighestNum, uint64(localHighestNum), uint64(globalHighestNum), (segmentPresent || localSegmentPresent || globalSegmentPresent), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (a *ActivityLog) getLastSegmentNumberByEntityPath(ctx context.Context, entityPath string) (uint64, bool, error) {
|
|
||||||
p, err := a.view.List(ctx, entityPath)
|
|
||||||
if err != nil {
|
|
||||||
return 0, false, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
highestNum := -1
|
highestNum := -1
|
||||||
@@ -824,7 +787,27 @@ func (a *ActivityLog) getLastSegmentNumberByEntityPath(ctx context.Context, enti
|
|||||||
segmentHighestNum = 0
|
segmentHighestNum = 0
|
||||||
segmentPresent = false
|
segmentPresent = false
|
||||||
}
|
}
|
||||||
return segmentHighestNum, segmentPresent, nil
|
|
||||||
|
globalPaths, err := a.view.List(ctx, activityGlobalPathPrefix+activityEntityBasePath+fmt.Sprint(startTime.Unix())+"/")
|
||||||
|
if err != nil {
|
||||||
|
return segmentHighestNum, 0, segmentPresent, err
|
||||||
|
}
|
||||||
|
|
||||||
|
globalHighestNum := -1
|
||||||
|
for _, path := range globalPaths {
|
||||||
|
if num, ok := parseSegmentNumberFromPath(path); ok {
|
||||||
|
if num > globalHighestNum {
|
||||||
|
globalHighestNum = num
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if globalHighestNum < 0 {
|
||||||
|
// numbers less than 0 are invalid. if a negative number is the highest value, there isn't a segment
|
||||||
|
return segmentHighestNum, 0, segmentPresent, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return segmentHighestNum, uint64(globalHighestNum), segmentPresent, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// WalkEntitySegments loads each of the entity segments for a particular start time
|
// WalkEntitySegments loads each of the entity segments for a particular start time
|
||||||
@@ -918,7 +901,6 @@ func (a *ActivityLog) loadPriorEntitySegment(ctx context.Context, startTime time
|
|||||||
}
|
}
|
||||||
a.fragmentLock.Unlock()
|
a.fragmentLock.Unlock()
|
||||||
|
|
||||||
// load all the active global clients
|
|
||||||
globalPath := activityGlobalPathPrefix + activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(sequenceNum, 10)
|
globalPath := activityGlobalPathPrefix + activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(sequenceNum, 10)
|
||||||
data, err = a.view.Get(ctx, globalPath)
|
data, err = a.view.Get(ctx, globalPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -942,37 +924,13 @@ func (a *ActivityLog) loadPriorEntitySegment(ctx context.Context, startTime time
|
|||||||
}
|
}
|
||||||
a.globalFragmentLock.Unlock()
|
a.globalFragmentLock.Unlock()
|
||||||
|
|
||||||
// load all the active local clients
|
|
||||||
localPath := activityLocalPathPrefix + activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(sequenceNum, 10)
|
|
||||||
data, err = a.view.Get(ctx, localPath)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if data == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
out = &activity.EntityActivityLog{}
|
|
||||||
err = proto.Unmarshal(data.Value, out)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
a.localFragmentLock.Lock()
|
|
||||||
// Handle the (unlikely) case where the end of the month has been reached while background loading.
|
|
||||||
// Or the feature has been disabled.
|
|
||||||
if a.enabled && startTime.Unix() == a.currentLocalSegment.startTimestamp {
|
|
||||||
for _, ent := range out.Clients {
|
|
||||||
a.partialMonthLocalClientTracker[ent.ClientID] = ent
|
|
||||||
}
|
|
||||||
}
|
|
||||||
a.localFragmentLock.Unlock()
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// loadCurrentClientSegment loads the most recent segment (for "this month")
|
// loadCurrentClientSegment loads the most recent segment (for "this month")
|
||||||
// into memory (to append new entries), and to the partialMonthClientTracker to
|
// into memory (to append new entries), and to the partialMonthClientTracker to
|
||||||
// avoid duplication call with fragmentLock, globalFragmentLock, localFragmentLock and l held.
|
// avoid duplication call with fragmentLock, globalFragmentLock and l held.
|
||||||
func (a *ActivityLog) loadCurrentClientSegment(ctx context.Context, startTime time.Time, sequenceNum uint64, localSegmentSequenceNumber uint64, globalSegmentSequenceNumber uint64) error {
|
func (a *ActivityLog) loadCurrentClientSegment(ctx context.Context, startTime time.Time, sequenceNum uint64, globalSegmentSequenceNumber uint64) error {
|
||||||
path := activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(sequenceNum, 10)
|
path := activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(sequenceNum, 10)
|
||||||
data, err := a.view.Get(ctx, path)
|
data, err := a.view.Get(ctx, path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -1006,7 +964,6 @@ func (a *ActivityLog) loadCurrentClientSegment(ctx context.Context, startTime ti
|
|||||||
a.partialMonthClientTracker[client.ClientID] = client
|
a.partialMonthClientTracker[client.ClientID] = client
|
||||||
}
|
}
|
||||||
|
|
||||||
// load current global segment
|
|
||||||
path = activityGlobalPathPrefix + activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(globalSegmentSequenceNumber, 10)
|
path = activityGlobalPathPrefix + activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(globalSegmentSequenceNumber, 10)
|
||||||
data, err = a.view.Get(ctx, path)
|
data, err = a.view.Get(ctx, path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -1041,39 +998,6 @@ func (a *ActivityLog) loadCurrentClientSegment(ctx context.Context, startTime ti
|
|||||||
a.globalPartialMonthClientTracker[client.ClientID] = client
|
a.globalPartialMonthClientTracker[client.ClientID] = client
|
||||||
}
|
}
|
||||||
|
|
||||||
// load current local segment
|
|
||||||
path = activityLocalPathPrefix + activityEntityBasePath + fmt.Sprint(startTime.Unix()) + "/" + strconv.FormatUint(localSegmentSequenceNumber, 10)
|
|
||||||
data, err = a.view.Get(ctx, path)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if data == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
out = &activity.EntityActivityLog{}
|
|
||||||
err = proto.Unmarshal(data.Value, out)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if !a.core.perfStandby {
|
|
||||||
a.currentLocalSegment = segmentInfo{
|
|
||||||
startTimestamp: startTime.Unix(),
|
|
||||||
currentClients: &activity.EntityActivityLog{
|
|
||||||
Clients: out.Clients,
|
|
||||||
},
|
|
||||||
tokenCount: a.currentLocalSegment.tokenCount,
|
|
||||||
clientSequenceNumber: sequenceNum,
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// populate this for edge case checking (if end of month passes while background loading on standby)
|
|
||||||
a.currentLocalSegment.startTimestamp = startTime.Unix()
|
|
||||||
}
|
|
||||||
for _, client := range out.Clients {
|
|
||||||
a.partialMonthLocalClientTracker[client.ClientID] = client
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1129,7 +1053,6 @@ func (a *ActivityLog) loadTokenCount(ctx context.Context, startTime time.Time) e
|
|||||||
// so that TWEs counted before the introduction of a client ID for TWEs are
|
// so that TWEs counted before the introduction of a client ID for TWEs are
|
||||||
// still reported in the partial client counts.
|
// still reported in the partial client counts.
|
||||||
a.currentSegment.tokenCount = out
|
a.currentSegment.tokenCount = out
|
||||||
a.currentLocalSegment.tokenCount = out
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -1153,42 +1076,36 @@ func (a *ActivityLog) entityBackgroundLoader(ctx context.Context, wg *sync.WaitG
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Initialize a new current segment, based on the current time.
|
// Initialize a new current segment, based on the current time.
|
||||||
// Call with fragmentLock, globalFragmentLock, localFragmentLock and l held.
|
// Call with fragmentLock, globalFragmentLock and l held.
|
||||||
func (a *ActivityLog) startNewCurrentLogLocked(now time.Time) {
|
func (a *ActivityLog) startNewCurrentLogLocked(now time.Time) {
|
||||||
a.logger.Trace("initializing new log")
|
a.logger.Trace("initializing new log")
|
||||||
a.resetCurrentLog()
|
a.resetCurrentLog()
|
||||||
a.setCurrentSegmentTimeLocked(now)
|
a.currentSegment.startTimestamp = now.Unix()
|
||||||
|
a.currentGlobalSegment.startTimestamp = now.Unix()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Should be called with fragmentLock, globalFragmentLock, localFragmentLock and l held.
|
// Should be called with fragmentLock, globalFragmentLock and l held.
|
||||||
func (a *ActivityLog) newMonthCurrentLogLocked(currentTime time.Time) {
|
func (a *ActivityLog) newMonthCurrentLogLocked(currentTime time.Time) {
|
||||||
a.logger.Trace("continuing log to new month")
|
a.logger.Trace("continuing log to new month")
|
||||||
a.resetCurrentLog()
|
a.resetCurrentLog()
|
||||||
monthStart := timeutil.StartOfMonth(currentTime.UTC())
|
monthStart := timeutil.StartOfMonth(currentTime.UTC())
|
||||||
a.setCurrentSegmentTimeLocked(monthStart)
|
a.currentSegment.startTimestamp = monthStart.Unix()
|
||||||
|
a.currentGlobalSegment.startTimestamp = monthStart.Unix()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initialize a new current segment, based on the given time
|
// Initialize a new current segment, based on the given time
|
||||||
// should be called with fragmentLock, globalFragmentLock, localFragmentLock and l held.
|
// should be called with fragmentLock, globalFragmentLock and l held.
|
||||||
func (a *ActivityLog) newSegmentAtGivenTime(t time.Time) {
|
func (a *ActivityLog) newSegmentAtGivenTime(t time.Time) {
|
||||||
timestamp := t.Unix()
|
timestamp := t.Unix()
|
||||||
|
|
||||||
a.logger.Trace("starting a segment", "timestamp", timestamp)
|
a.logger.Trace("starting a segment", "timestamp", timestamp)
|
||||||
a.resetCurrentLog()
|
a.resetCurrentLog()
|
||||||
a.setCurrentSegmentTimeLocked(t)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Sets the timestamp of all the segments to the given time.
|
|
||||||
// should be called with l held.
|
|
||||||
func (a *ActivityLog) setCurrentSegmentTimeLocked(t time.Time) {
|
|
||||||
timestamp := t.Unix()
|
|
||||||
a.currentSegment.startTimestamp = timestamp
|
a.currentSegment.startTimestamp = timestamp
|
||||||
a.currentGlobalSegment.startTimestamp = timestamp
|
a.currentGlobalSegment.startTimestamp = timestamp
|
||||||
a.currentLocalSegment.startTimestamp = timestamp
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reset all the current segment state.
|
// Reset all the current segment state.
|
||||||
// Should be called with fragmentLock, globalFragmentLock, localFragmentLock and l held.
|
// Should be called with fragmentLock, globalFragmentLock and l held.
|
||||||
func (a *ActivityLog) resetCurrentLog() {
|
func (a *ActivityLog) resetCurrentLog() {
|
||||||
a.currentSegment.startTimestamp = 0
|
a.currentSegment.startTimestamp = 0
|
||||||
a.currentSegment.currentClients = &activity.EntityActivityLog{
|
a.currentSegment.currentClients = &activity.EntityActivityLog{
|
||||||
@@ -1202,56 +1119,62 @@ func (a *ActivityLog) resetCurrentLog() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
a.currentSegment.clientSequenceNumber = 0
|
a.currentSegment.clientSequenceNumber = 0
|
||||||
|
a.fragment = nil
|
||||||
|
a.partialMonthClientTracker = make(map[string]*activity.EntityRecord)
|
||||||
|
a.standbyFragmentsReceived = make([]*activity.LogFragment, 0)
|
||||||
|
|
||||||
// global segment
|
|
||||||
a.currentGlobalSegment.startTimestamp = 0
|
a.currentGlobalSegment.startTimestamp = 0
|
||||||
a.currentGlobalSegment.currentClients = &activity.EntityActivityLog{
|
a.currentGlobalSegment.currentClients = &activity.EntityActivityLog{
|
||||||
Clients: make([]*activity.EntityRecord, 0),
|
Clients: make([]*activity.EntityRecord, 0),
|
||||||
}
|
}
|
||||||
a.currentGlobalSegment.clientSequenceNumber = 0
|
a.currentGlobalSegment.clientSequenceNumber = 0
|
||||||
|
|
||||||
// local segment
|
|
||||||
a.currentLocalSegment.startTimestamp = 0
|
|
||||||
a.currentLocalSegment.currentClients = &activity.EntityActivityLog{
|
|
||||||
Clients: make([]*activity.EntityRecord, 0),
|
|
||||||
}
|
|
||||||
a.currentLocalSegment.clientSequenceNumber = 0
|
|
||||||
|
|
||||||
a.fragment = nil
|
|
||||||
a.partialMonthClientTracker = make(map[string]*activity.EntityRecord)
|
|
||||||
|
|
||||||
a.currentGlobalFragment = nil
|
a.currentGlobalFragment = nil
|
||||||
a.globalPartialMonthClientTracker = make(map[string]*activity.EntityRecord)
|
a.globalPartialMonthClientTracker = make(map[string]*activity.EntityRecord)
|
||||||
|
|
||||||
a.localFragment = nil
|
|
||||||
a.partialMonthLocalClientTracker = make(map[string]*activity.EntityRecord)
|
|
||||||
|
|
||||||
a.standbyFragmentsReceived = make([]*activity.LogFragment, 0)
|
|
||||||
a.standbyLocalFragmentsReceived = make([]*activity.LogFragment, 0)
|
|
||||||
a.standbyGlobalFragmentsReceived = make([]*activity.LogFragment, 0)
|
|
||||||
a.secondaryGlobalClientFragments = make([]*activity.LogFragment, 0)
|
a.secondaryGlobalClientFragments = make([]*activity.LogFragment, 0)
|
||||||
|
a.standbyGlobalFragmentsReceived = make([]*activity.LogFragment, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *ActivityLog) deleteLogWorker(ctx context.Context, startTimestamp int64, whenDone chan struct{}) {
|
func (a *ActivityLog) deleteLogWorker(ctx context.Context, startTimestamp int64, whenDone chan struct{}) {
|
||||||
entityPathsToDelete := make([]string, 0)
|
entityPath := fmt.Sprintf("%v%v/", activityEntityBasePath, startTimestamp)
|
||||||
entityPathsToDelete = append(entityPathsToDelete, fmt.Sprintf("%v%v/", activityEntityBasePath, startTimestamp))
|
tokenPath := fmt.Sprintf("%v%v/", activityTokenLocalBasePath, startTimestamp)
|
||||||
entityPathsToDelete = append(entityPathsToDelete, fmt.Sprintf("%s%v%v/", activityGlobalPathPrefix, activityEntityBasePath, startTimestamp))
|
globalEntityPath := fmt.Sprintf("%s%v%v/", activityGlobalPathPrefix, activityEntityBasePath, startTimestamp)
|
||||||
entityPathsToDelete = append(entityPathsToDelete, fmt.Sprintf("%s%v%v/", activityLocalPathPrefix, activityEntityBasePath, startTimestamp))
|
|
||||||
entityPathsToDelete = append(entityPathsToDelete, fmt.Sprintf("%v%v/", activityTokenLocalBasePath, startTimestamp))
|
|
||||||
|
|
||||||
for _, path := range entityPathsToDelete {
|
entitySegments, err := a.view.List(ctx, entityPath)
|
||||||
segments, err := a.view.List(ctx, path)
|
if err != nil {
|
||||||
|
a.logger.Error("could not list entity paths", "error", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for _, p := range entitySegments {
|
||||||
|
err = a.view.Delete(ctx, entityPath+p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
a.logger.Error("could not list segment path", "error", err)
|
a.logger.Error("could not delete entity log", "error", err)
|
||||||
return
|
|
||||||
}
|
|
||||||
for _, p := range segments {
|
|
||||||
err = a.view.Delete(ctx, path+p)
|
|
||||||
if err != nil {
|
|
||||||
a.logger.Error("could not delete log", "error", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tokenSegments, err := a.view.List(ctx, tokenPath)
|
||||||
|
if err != nil {
|
||||||
|
a.logger.Error("could not list token paths", "error", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for _, p := range tokenSegments {
|
||||||
|
err = a.view.Delete(ctx, tokenPath+p)
|
||||||
|
if err != nil {
|
||||||
|
a.logger.Error("could not delete token log", "error", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
globalEntitySegments, err := a.view.List(ctx, globalEntityPath)
|
||||||
|
if err != nil {
|
||||||
|
a.logger.Error("could not list global entity paths", "error", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for _, p := range globalEntitySegments {
|
||||||
|
err = a.view.Delete(ctx, globalEntityPath+p)
|
||||||
|
if err != nil {
|
||||||
|
a.logger.Error("could not delete global entity log", "error", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Allow whoever started this as a goroutine to wait for it to finish.
|
// Allow whoever started this as a goroutine to wait for it to finish.
|
||||||
close(whenDone)
|
close(whenDone)
|
||||||
}
|
}
|
||||||
@@ -1279,9 +1202,6 @@ func (a *ActivityLog) refreshFromStoredLog(ctx context.Context, wg *sync.WaitGro
|
|||||||
defer a.fragmentLock.Unlock()
|
defer a.fragmentLock.Unlock()
|
||||||
a.globalFragmentLock.Lock()
|
a.globalFragmentLock.Lock()
|
||||||
defer a.globalFragmentLock.Unlock()
|
defer a.globalFragmentLock.Unlock()
|
||||||
// startNewCurrentLogLocked below calls resetCurrentLog which is protected by fragmentLock, globalFragmentLock, localFragmentLock and l
|
|
||||||
a.localFragmentLock.Lock()
|
|
||||||
defer a.localFragmentLock.Unlock()
|
|
||||||
|
|
||||||
decreasingLogTimes, err := a.getMostRecentActivityLogSegment(ctx, now)
|
decreasingLogTimes, err := a.getMostRecentActivityLogSegment(ctx, now)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -1350,7 +1270,7 @@ func (a *ActivityLog) refreshFromStoredLog(ctx context.Context, wg *sync.WaitGro
|
|||||||
}
|
}
|
||||||
|
|
||||||
// load entity logs from storage into memory
|
// load entity logs from storage into memory
|
||||||
lastSegment, localLastSegment, globalLastSegment, segmentsExist, err := a.getLastEntitySegmentNumber(ctx, mostRecent)
|
lastSegment, globalLastSegment, segmentsExist, err := a.getLastEntitySegmentNumber(ctx, mostRecent)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -1359,7 +1279,7 @@ func (a *ActivityLog) refreshFromStoredLog(ctx context.Context, wg *sync.WaitGro
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
err = a.loadCurrentClientSegment(ctx, mostRecent, lastSegment, localLastSegment, globalLastSegment)
|
err = a.loadCurrentClientSegment(ctx, mostRecent, lastSegment, globalLastSegment)
|
||||||
if err != nil || lastSegment == 0 {
|
if err != nil || lastSegment == 0 {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -1408,9 +1328,6 @@ func (a *ActivityLog) SetConfig(ctx context.Context, config activityConfig) {
|
|||||||
|
|
||||||
// enabled is protected by fragmentLock
|
// enabled is protected by fragmentLock
|
||||||
a.fragmentLock.Lock()
|
a.fragmentLock.Lock()
|
||||||
// startNewCurrentLogLocked and resetCurrentLog is protected by fragmentLock, globalFragmentLock, localFragmentLock and l
|
|
||||||
a.localFragmentLock.Lock()
|
|
||||||
a.globalFragmentLock.Lock()
|
|
||||||
originalEnabled := a.enabled
|
originalEnabled := a.enabled
|
||||||
switch config.Enabled {
|
switch config.Enabled {
|
||||||
case "enable":
|
case "enable":
|
||||||
@@ -1425,7 +1342,7 @@ func (a *ActivityLog) SetConfig(ctx context.Context, config activityConfig) {
|
|||||||
a.logger.Info("activity log enable changed", "original", originalEnabled, "current", a.enabled)
|
a.logger.Info("activity log enable changed", "original", originalEnabled, "current", a.enabled)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !a.enabled && a.currentSegment.startTimestamp != 0 && a.currentGlobalSegment.startTimestamp != 0 && a.currentLocalSegment.startTimestamp != 0 {
|
if !a.enabled && a.currentSegment.startTimestamp != 0 && a.currentGlobalSegment.startTimestamp != 0 {
|
||||||
a.logger.Trace("deleting current segment")
|
a.logger.Trace("deleting current segment")
|
||||||
a.deleteDone = make(chan struct{})
|
a.deleteDone = make(chan struct{})
|
||||||
// this is called from a request under stateLock, so use activeContext
|
// this is called from a request under stateLock, so use activeContext
|
||||||
@@ -1434,7 +1351,7 @@ func (a *ActivityLog) SetConfig(ctx context.Context, config activityConfig) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
forceSave := false
|
forceSave := false
|
||||||
if a.enabled && a.currentSegment.startTimestamp == 0 && a.currentGlobalSegment.startTimestamp == 0 && a.currentLocalSegment.startTimestamp == 0 {
|
if a.enabled && a.currentSegment.startTimestamp == 0 && a.currentGlobalSegment.startTimestamp == 0 {
|
||||||
a.startNewCurrentLogLocked(a.clock.Now().UTC())
|
a.startNewCurrentLogLocked(a.clock.Now().UTC())
|
||||||
// Force a save so we can distinguish between
|
// Force a save so we can distinguish between
|
||||||
//
|
//
|
||||||
@@ -1448,14 +1365,11 @@ func (a *ActivityLog) SetConfig(ctx context.Context, config activityConfig) {
|
|||||||
forceSave = true
|
forceSave = true
|
||||||
}
|
}
|
||||||
a.fragmentLock.Unlock()
|
a.fragmentLock.Unlock()
|
||||||
a.localFragmentLock.Unlock()
|
|
||||||
a.globalFragmentLock.Unlock()
|
|
||||||
|
|
||||||
if forceSave {
|
if forceSave {
|
||||||
// l is still held here
|
// l is still held here
|
||||||
a.saveCurrentSegmentInternal(ctx, true, a.currentSegment, "")
|
a.saveCurrentSegmentInternal(ctx, true, a.currentSegment, "")
|
||||||
a.saveCurrentSegmentInternal(ctx, true, a.currentGlobalSegment, activityGlobalPathPrefix)
|
a.saveCurrentSegmentInternal(ctx, true, a.currentGlobalSegment, activityGlobalPathPrefix)
|
||||||
a.saveCurrentSegmentInternal(ctx, true, a.currentLocalSegment, activityLocalPathPrefix)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
a.defaultReportMonths = config.DefaultReportMonths
|
a.defaultReportMonths = config.DefaultReportMonths
|
||||||
@@ -2010,12 +1924,7 @@ func (a *ActivityLog) HandleEndOfMonth(ctx context.Context, currentTime time.Tim
|
|||||||
// in the previous month, and recover by calling newMonthCurrentLog
|
// in the previous month, and recover by calling newMonthCurrentLog
|
||||||
// again and triggering the precomputed query.
|
// again and triggering the precomputed query.
|
||||||
a.fragmentLock.Lock()
|
a.fragmentLock.Lock()
|
||||||
// calls newMonthCurrentLogLocked which is protected by fragmentLock, globalFragmentLock, localFragmentLock and l
|
|
||||||
a.localFragmentLock.Lock()
|
|
||||||
a.globalFragmentLock.Lock()
|
|
||||||
a.newMonthCurrentLogLocked(currentTime)
|
a.newMonthCurrentLogLocked(currentTime)
|
||||||
a.globalFragmentLock.Unlock()
|
|
||||||
a.localFragmentLock.Unlock()
|
|
||||||
a.fragmentLock.Unlock()
|
a.fragmentLock.Unlock()
|
||||||
|
|
||||||
// Work on precomputed queries in background
|
// Work on precomputed queries in background
|
||||||
@@ -3301,9 +3210,6 @@ func (a *ActivityLog) populateNamespaceAndMonthlyBreakdowns() (map[int64]*proces
|
|||||||
for _, e := range a.partialMonthClientTracker {
|
for _, e := range a.partialMonthClientTracker {
|
||||||
processClientRecord(e, byNamespace, byMonth, a.clock.Now())
|
processClientRecord(e, byNamespace, byMonth, a.clock.Now())
|
||||||
}
|
}
|
||||||
for _, e := range a.partialMonthLocalClientTracker {
|
|
||||||
processClientRecord(e, byNamespace, byMonth, a.clock.Now())
|
|
||||||
}
|
|
||||||
return byMonth, byNamespace
|
return byMonth, byNamespace
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -626,91 +626,6 @@ func TestActivityLog_SaveEntitiesToStorage(t *testing.T) {
|
|||||||
expectedEntityIDs(t, out, ids)
|
expectedEntityIDs(t, out, ids)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestActivityLog_SaveEntitiesToStorageCommon calls AddClientToFragment with clients with local and non-local mount accessors and then
|
|
||||||
// writes the segment to storage. Read back from storage, and verify that client IDs exist in storage in the right local and non-local entity paths.
|
|
||||||
func TestActivityLog_SaveEntitiesToStorageCommon(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
storage := &logical.InmemStorage{}
|
|
||||||
coreConfig := &CoreConfig{
|
|
||||||
CredentialBackends: map[string]logical.Factory{
|
|
||||||
"userpass": userpass.Factory,
|
|
||||||
},
|
|
||||||
Physical: storage.Underlying(),
|
|
||||||
}
|
|
||||||
|
|
||||||
cluster := NewTestCluster(t, coreConfig, nil)
|
|
||||||
core := cluster.Cores[0].Core
|
|
||||||
TestWaitActive(t, core)
|
|
||||||
|
|
||||||
ctx := namespace.RootContext(nil)
|
|
||||||
|
|
||||||
a := core.activityLog
|
|
||||||
a.SetEnable(true)
|
|
||||||
a.SetStartTimestamp(time.Now().Unix()) // set a nonzero segment
|
|
||||||
|
|
||||||
var err error
|
|
||||||
|
|
||||||
// create a local and non-local mount entry
|
|
||||||
nonLocalMountEntry := &MountEntry{
|
|
||||||
Table: credentialTableType,
|
|
||||||
Path: "nonLocalUserpass/",
|
|
||||||
Type: "userpass",
|
|
||||||
Accessor: "nonLocalMountAccessor",
|
|
||||||
}
|
|
||||||
err = core.enableCredential(ctx, nonLocalMountEntry)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
localMountEntry := &MountEntry{
|
|
||||||
Table: credentialTableType,
|
|
||||||
Path: "localUserpass/",
|
|
||||||
Local: true,
|
|
||||||
Type: "userpass",
|
|
||||||
Accessor: "localMountAccessor",
|
|
||||||
}
|
|
||||||
err = core.enableCredential(ctx, localMountEntry)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
now := time.Now()
|
|
||||||
ids := []string{"non-local-client-id-1", "non-local-client-id-2", "local-client-id-1"}
|
|
||||||
|
|
||||||
globalPath := fmt.Sprintf("%sentity/%d/0", ActivityGlobalLogPrefix, a.GetStartTimestamp())
|
|
||||||
localPath := fmt.Sprintf("%sentity/%d/0", ActivityLogLocalPrefix, a.GetStartTimestamp())
|
|
||||||
|
|
||||||
// add clients with local and non-local mount accessors
|
|
||||||
a.AddClientToFragment(ids[0], "root", now.Unix(), false, "nonLocalMountAccessor")
|
|
||||||
a.AddClientToFragment(ids[1], "root", now.Unix(), false, "nonLocalMountAccessor")
|
|
||||||
a.AddClientToFragment(ids[2], "root", now.Unix(), false, "localMountAccessor")
|
|
||||||
|
|
||||||
err = a.saveCurrentSegmentToStorage(ctx, false)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("got error writing entities to storage: %v", err)
|
|
||||||
}
|
|
||||||
if a.fragment != nil {
|
|
||||||
t.Errorf("fragment was not reset after write to storage")
|
|
||||||
}
|
|
||||||
|
|
||||||
// read entity ids from non-local entity storage path
|
|
||||||
protoSegment := readSegmentFromStorage(t, core, globalPath)
|
|
||||||
out := &activity.EntityActivityLog{}
|
|
||||||
err = proto.Unmarshal(protoSegment.Value, out)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("could not unmarshal protobuf: %v", err)
|
|
||||||
}
|
|
||||||
expectedEntityIDs(t, out, ids[:2])
|
|
||||||
|
|
||||||
// read entity ids from local entity storage path
|
|
||||||
protoSegment = readSegmentFromStorage(t, core, localPath)
|
|
||||||
out = &activity.EntityActivityLog{}
|
|
||||||
err = proto.Unmarshal(protoSegment.Value, out)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("could not unmarshal protobuf: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// local entity is local-client-id-1 in ids with index 2
|
|
||||||
expectedEntityIDs(t, out, ids[2:])
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestActivityLog_StoreAndReadHyperloglog inserts into a hyperloglog, stores it and then reads it back. The test
|
// TestActivityLog_StoreAndReadHyperloglog inserts into a hyperloglog, stores it and then reads it back. The test
|
||||||
// verifies the estimate count is correct.
|
// verifies the estimate count is correct.
|
||||||
func TestActivityLog_StoreAndReadHyperloglog(t *testing.T) {
|
func TestActivityLog_StoreAndReadHyperloglog(t *testing.T) {
|
||||||
@@ -1307,64 +1222,54 @@ func TestActivityLog_getLastEntitySegmentNumber(t *testing.T) {
|
|||||||
a := core.activityLog
|
a := core.activityLog
|
||||||
paths := [...]string{"entity/992/0", "entity/1000/-1", "entity/1001/foo", "entity/1111/0", "entity/1111/1"}
|
paths := [...]string{"entity/992/0", "entity/1000/-1", "entity/1001/foo", "entity/1111/0", "entity/1111/1"}
|
||||||
globalPaths := [...]string{"entity/992/0", "entity/1000/-1", "entity/1001/foo", "entity/1111/1"}
|
globalPaths := [...]string{"entity/992/0", "entity/1000/-1", "entity/1001/foo", "entity/1111/1"}
|
||||||
localPaths := [...]string{"entity/992/0", "entity/1000/-1", "entity/1001/foo", "entity/1111/0", "entity/1111/1"}
|
|
||||||
for _, path := range paths {
|
for _, path := range paths {
|
||||||
WriteToStorage(t, core, ActivityLogPrefix+path, []byte("test"))
|
WriteToStorage(t, core, ActivityLogPrefix+path, []byte("test"))
|
||||||
}
|
}
|
||||||
for _, path := range globalPaths {
|
for _, path := range globalPaths {
|
||||||
WriteToStorage(t, core, ActivityGlobalLogPrefix+path, []byte("test"))
|
WriteToStorage(t, core, ActivityGlobalLogPrefix+path, []byte("test"))
|
||||||
}
|
}
|
||||||
for _, path := range localPaths {
|
|
||||||
WriteToStorage(t, core, ActivityLogLocalPrefix+path, []byte("test"))
|
|
||||||
}
|
|
||||||
|
|
||||||
testCases := []struct {
|
testCases := []struct {
|
||||||
input int64
|
input int64
|
||||||
expectedVal uint64
|
expectedVal uint64
|
||||||
expectedGlobalVal uint64
|
expectedGlobalVal uint64
|
||||||
expectedLocalVal uint64
|
|
||||||
expectExists bool
|
expectExists bool
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
input: 992,
|
input: 992,
|
||||||
expectedVal: 0,
|
expectedVal: 0,
|
||||||
expectedGlobalVal: 0,
|
expectedGlobalVal: 0,
|
||||||
expectedLocalVal: 0,
|
|
||||||
expectExists: true,
|
expectExists: true,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
input: 1000,
|
input: 1000,
|
||||||
expectedVal: 0,
|
expectedVal: 0,
|
||||||
expectedGlobalVal: 0,
|
expectedGlobalVal: 0,
|
||||||
expectedLocalVal: 0,
|
|
||||||
expectExists: false,
|
expectExists: false,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
input: 1001,
|
input: 1001,
|
||||||
expectedVal: 0,
|
expectedVal: 0,
|
||||||
expectedGlobalVal: 0,
|
expectedGlobalVal: 0,
|
||||||
expectedLocalVal: 0,
|
|
||||||
expectExists: false,
|
expectExists: false,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
input: 1111,
|
input: 1111,
|
||||||
expectedVal: 1,
|
expectedVal: 1,
|
||||||
expectedGlobalVal: 1,
|
expectedGlobalVal: 1,
|
||||||
expectedLocalVal: 1,
|
|
||||||
expectExists: true,
|
expectExists: true,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
input: 2222,
|
input: 2222,
|
||||||
expectedVal: 0,
|
expectedVal: 0,
|
||||||
expectedGlobalVal: 0,
|
expectedGlobalVal: 0,
|
||||||
expectedLocalVal: 0,
|
|
||||||
expectExists: false,
|
expectExists: false,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
for _, tc := range testCases {
|
for _, tc := range testCases {
|
||||||
result, localSegmentNumber, globalSegmentNumber, exists, err := a.getLastEntitySegmentNumber(ctx, time.Unix(tc.input, 0))
|
result, globalSegmentNumber, exists, err := a.getLastEntitySegmentNumber(ctx, time.Unix(tc.input, 0))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error for input %d: %v", tc.input, err)
|
t.Fatalf("unexpected error for input %d: %v", tc.input, err)
|
||||||
}
|
}
|
||||||
@@ -1377,9 +1282,7 @@ func TestActivityLog_getLastEntitySegmentNumber(t *testing.T) {
|
|||||||
if globalSegmentNumber != tc.expectedGlobalVal {
|
if globalSegmentNumber != tc.expectedGlobalVal {
|
||||||
t.Errorf("expected: %d got: %d for input: %d", tc.expectedGlobalVal, globalSegmentNumber, tc.input)
|
t.Errorf("expected: %d got: %d for input: %d", tc.expectedGlobalVal, globalSegmentNumber, tc.input)
|
||||||
}
|
}
|
||||||
if localSegmentNumber != tc.expectedLocalVal {
|
|
||||||
t.Errorf("expected: %d got: %d for input: %d", tc.expectedLocalVal, localSegmentNumber, tc.input)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1514,24 +1417,6 @@ func (a *ActivityLog) resetEntitiesInMemory(t *testing.T) {
|
|||||||
clientSequenceNumber: 0,
|
clientSequenceNumber: 0,
|
||||||
}
|
}
|
||||||
|
|
||||||
a.currentGlobalSegment = segmentInfo{
|
|
||||||
startTimestamp: time.Time{}.Unix(),
|
|
||||||
currentClients: &activity.EntityActivityLog{
|
|
||||||
Clients: make([]*activity.EntityRecord, 0),
|
|
||||||
},
|
|
||||||
tokenCount: a.currentGlobalSegment.tokenCount,
|
|
||||||
clientSequenceNumber: 0,
|
|
||||||
}
|
|
||||||
|
|
||||||
a.currentLocalSegment = segmentInfo{
|
|
||||||
startTimestamp: time.Time{}.Unix(),
|
|
||||||
currentClients: &activity.EntityActivityLog{
|
|
||||||
Clients: make([]*activity.EntityRecord, 0),
|
|
||||||
},
|
|
||||||
tokenCount: a.currentLocalSegment.tokenCount,
|
|
||||||
clientSequenceNumber: 0,
|
|
||||||
}
|
|
||||||
|
|
||||||
a.partialMonthClientTracker = make(map[string]*activity.EntityRecord)
|
a.partialMonthClientTracker = make(map[string]*activity.EntityRecord)
|
||||||
a.partialMonthLocalClientTracker = make(map[string]*activity.EntityRecord)
|
a.partialMonthLocalClientTracker = make(map[string]*activity.EntityRecord)
|
||||||
a.globalPartialMonthClientTracker = make(map[string]*activity.EntityRecord)
|
a.globalPartialMonthClientTracker = make(map[string]*activity.EntityRecord)
|
||||||
@@ -1550,7 +1435,6 @@ func TestActivityLog_loadCurrentClientSegment(t *testing.T) {
|
|||||||
}
|
}
|
||||||
a.l.Lock()
|
a.l.Lock()
|
||||||
a.currentSegment.tokenCount = tokenCount
|
a.currentSegment.tokenCount = tokenCount
|
||||||
a.currentLocalSegment.tokenCount = tokenCount
|
|
||||||
a.l.Unlock()
|
a.l.Unlock()
|
||||||
|
|
||||||
// setup in-storage data to load for testing
|
// setup in-storage data to load for testing
|
||||||
@@ -1612,7 +1496,6 @@ func TestActivityLog_loadCurrentClientSegment(t *testing.T) {
|
|||||||
}
|
}
|
||||||
WriteToStorage(t, core, ActivityLogPrefix+tc.path, data)
|
WriteToStorage(t, core, ActivityLogPrefix+tc.path, data)
|
||||||
WriteToStorage(t, core, ActivityGlobalLogPrefix+tc.path, data)
|
WriteToStorage(t, core, ActivityGlobalLogPrefix+tc.path, data)
|
||||||
WriteToStorage(t, core, ActivityLogLocalPrefix+tc.path, data)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
@@ -1620,12 +1503,10 @@ func TestActivityLog_loadCurrentClientSegment(t *testing.T) {
|
|||||||
a.l.Lock()
|
a.l.Lock()
|
||||||
a.fragmentLock.Lock()
|
a.fragmentLock.Lock()
|
||||||
a.globalFragmentLock.Lock()
|
a.globalFragmentLock.Lock()
|
||||||
a.localFragmentLock.Lock()
|
|
||||||
// loadCurrentClientSegment requires us to grab the fragment lock and the
|
// loadCurrentClientSegment requires us to grab the fragment lock and the
|
||||||
// activityLog lock, as per the comment in the loadCurrentClientSegment
|
// activityLog lock, as per the comment in the loadCurrentClientSegment
|
||||||
// function
|
// function
|
||||||
err := a.loadCurrentClientSegment(ctx, time.Unix(tc.time, 0), tc.seqNum, tc.seqNum, tc.seqNum)
|
err := a.loadCurrentClientSegment(ctx, time.Unix(tc.time, 0), tc.seqNum, tc.seqNum)
|
||||||
a.localFragmentLock.Unlock()
|
|
||||||
a.globalFragmentLock.Unlock()
|
a.globalFragmentLock.Unlock()
|
||||||
a.fragmentLock.Unlock()
|
a.fragmentLock.Unlock()
|
||||||
a.l.Unlock()
|
a.l.Unlock()
|
||||||
@@ -1638,26 +1519,21 @@ func TestActivityLog_loadCurrentClientSegment(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// verify accurate data in in-memory current segment
|
// verify accurate data in in-memory current segment
|
||||||
require.Equal(t, tc.time, a.GetStartTimestamp())
|
startTimestamp := a.GetStartTimestamp()
|
||||||
require.Equal(t, tc.seqNum, a.GetEntitySequenceNumber())
|
if startTimestamp != tc.time {
|
||||||
require.Equal(t, tc.seqNum, a.GetGlobalEntitySequenceNumber())
|
t.Errorf("bad timestamp loaded. expected: %v, got: %v for path %q", tc.time, startTimestamp, tc.path)
|
||||||
require.Equal(t, tc.seqNum, a.GetLocalEntitySequenceNumber())
|
}
|
||||||
|
|
||||||
|
seqNum := a.GetEntitySequenceNumber()
|
||||||
|
if seqNum != tc.seqNum {
|
||||||
|
t.Errorf("bad sequence number loaded. expected: %v, got: %v for path %q", tc.seqNum, seqNum, tc.path)
|
||||||
|
}
|
||||||
|
|
||||||
currentEntities := a.GetCurrentEntities()
|
currentEntities := a.GetCurrentEntities()
|
||||||
if !entityRecordsEqual(t, currentEntities.Clients, tc.entities.Clients) {
|
if !entityRecordsEqual(t, currentEntities.Clients, tc.entities.Clients) {
|
||||||
t.Errorf("bad data loaded. expected: %v, got: %v for path %q", tc.entities.Clients, currentEntities, tc.path)
|
t.Errorf("bad data loaded. expected: %v, got: %v for path %q", tc.entities.Clients, currentEntities, tc.path)
|
||||||
}
|
}
|
||||||
|
|
||||||
globalClients := core.GetActiveGlobalClientsList()
|
|
||||||
if err := ActiveEntitiesEqual(globalClients, tc.entities.Clients); err != nil {
|
|
||||||
t.Errorf("bad data loaded into active global entities. expected only set of EntityID from %v in %v for path %q: %v", tc.entities.Clients, globalClients, tc.path, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
localClients := core.GetActiveLocalClientsList()
|
|
||||||
if err := ActiveEntitiesEqual(localClients, tc.entities.Clients); err != nil {
|
|
||||||
t.Errorf("bad data loaded into active local entities. expected only set of EntityID from %v in %v for path %q: %v", tc.entities.Clients, localClients, tc.path, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
currentGlobalEntities := a.GetCurrentGlobalEntities()
|
currentGlobalEntities := a.GetCurrentGlobalEntities()
|
||||||
if !entityRecordsEqual(t, currentGlobalEntities.Clients, tc.entities.Clients) {
|
if !entityRecordsEqual(t, currentGlobalEntities.Clients, tc.entities.Clients) {
|
||||||
t.Errorf("bad data loaded. expected: %v, got: %v for path %q", tc.entities.Clients, currentGlobalEntities, tc.path)
|
t.Errorf("bad data loaded. expected: %v, got: %v for path %q", tc.entities.Clients, currentGlobalEntities, tc.path)
|
||||||
@@ -1741,7 +1617,6 @@ func TestActivityLog_loadPriorEntitySegment(t *testing.T) {
|
|||||||
}
|
}
|
||||||
WriteToStorage(t, core, ActivityLogPrefix+tc.path, data)
|
WriteToStorage(t, core, ActivityLogPrefix+tc.path, data)
|
||||||
WriteToStorage(t, core, ActivityGlobalLogPrefix+tc.path, data)
|
WriteToStorage(t, core, ActivityGlobalLogPrefix+tc.path, data)
|
||||||
WriteToStorage(t, core, ActivityLogLocalPrefix+tc.path, data)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
@@ -1755,7 +1630,6 @@ func TestActivityLog_loadPriorEntitySegment(t *testing.T) {
|
|||||||
a.globalPartialMonthClientTracker = make(map[string]*activity.EntityRecord)
|
a.globalPartialMonthClientTracker = make(map[string]*activity.EntityRecord)
|
||||||
a.currentSegment.startTimestamp = tc.time
|
a.currentSegment.startTimestamp = tc.time
|
||||||
a.currentGlobalSegment.startTimestamp = tc.time
|
a.currentGlobalSegment.startTimestamp = tc.time
|
||||||
a.currentLocalSegment.startTimestamp = tc.time
|
|
||||||
a.fragmentLock.Unlock()
|
a.fragmentLock.Unlock()
|
||||||
a.localFragmentLock.Unlock()
|
a.localFragmentLock.Unlock()
|
||||||
a.l.Unlock()
|
a.l.Unlock()
|
||||||
@@ -1920,14 +1794,6 @@ func setupActivityRecordsInStorage(t *testing.T, base time.Time, includeEntities
|
|||||||
},
|
},
|
||||||
}...)
|
}...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// append some local entity data
|
|
||||||
entityRecords = append(entityRecords, &activity.EntityRecord{
|
|
||||||
ClientID: "44444444-4444-4444-4444-444444444444",
|
|
||||||
NamespaceID: namespace.RootNamespaceID,
|
|
||||||
Timestamp: time.Now().Unix(),
|
|
||||||
})
|
|
||||||
|
|
||||||
for i, entityRecord := range entityRecords {
|
for i, entityRecord := range entityRecords {
|
||||||
entityData, err := proto.Marshal(&activity.EntityActivityLog{
|
entityData, err := proto.Marshal(&activity.EntityActivityLog{
|
||||||
Clients: []*activity.EntityRecord{entityRecord},
|
Clients: []*activity.EntityRecord{entityRecord},
|
||||||
@@ -1935,15 +1801,10 @@ func setupActivityRecordsInStorage(t *testing.T, base time.Time, includeEntities
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf(err.Error())
|
t.Fatalf(err.Error())
|
||||||
}
|
}
|
||||||
switch i {
|
if i == 0 {
|
||||||
case 0:
|
|
||||||
WriteToStorage(t, core, ActivityLogPrefix+"entity/"+fmt.Sprint(monthsAgo.Unix())+"/0", entityData)
|
WriteToStorage(t, core, ActivityLogPrefix+"entity/"+fmt.Sprint(monthsAgo.Unix())+"/0", entityData)
|
||||||
WriteToStorage(t, core, ActivityGlobalLogPrefix+"entity/"+fmt.Sprint(monthsAgo.Unix())+"/0", entityData)
|
WriteToStorage(t, core, ActivityGlobalLogPrefix+"entity/"+fmt.Sprint(monthsAgo.Unix())+"/0", entityData)
|
||||||
|
} else {
|
||||||
case len(entityRecords) - 1:
|
|
||||||
// local data
|
|
||||||
WriteToStorage(t, core, ActivityLogLocalPrefix+"entity/"+fmt.Sprint(base.Unix())+"/"+strconv.Itoa(i-1), entityData)
|
|
||||||
default:
|
|
||||||
WriteToStorage(t, core, ActivityLogPrefix+"entity/"+fmt.Sprint(base.Unix())+"/"+strconv.Itoa(i-1), entityData)
|
WriteToStorage(t, core, ActivityLogPrefix+"entity/"+fmt.Sprint(base.Unix())+"/"+strconv.Itoa(i-1), entityData)
|
||||||
WriteToStorage(t, core, ActivityGlobalLogPrefix+"entity/"+fmt.Sprint(base.Unix())+"/"+strconv.Itoa(i-1), entityData)
|
WriteToStorage(t, core, ActivityGlobalLogPrefix+"entity/"+fmt.Sprint(base.Unix())+"/"+strconv.Itoa(i-1), entityData)
|
||||||
}
|
}
|
||||||
@@ -1992,9 +1853,6 @@ func TestActivityLog_refreshFromStoredLog(t *testing.T) {
|
|||||||
Clients: expectedClientRecords[1:],
|
Clients: expectedClientRecords[1:],
|
||||||
}
|
}
|
||||||
expectedCurrent := &activity.EntityActivityLog{
|
expectedCurrent := &activity.EntityActivityLog{
|
||||||
Clients: expectedClientRecords[len(expectedClientRecords)-2 : len(expectedClientRecords)-1],
|
|
||||||
}
|
|
||||||
expectedCurrentLocal := &activity.EntityActivityLog{
|
|
||||||
Clients: expectedClientRecords[len(expectedClientRecords)-1:],
|
Clients: expectedClientRecords[len(expectedClientRecords)-1:],
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2004,12 +1862,6 @@ func TestActivityLog_refreshFromStoredLog(t *testing.T) {
|
|||||||
t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrent, currentEntities)
|
t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrent, currentEntities)
|
||||||
}
|
}
|
||||||
|
|
||||||
currentLocalEntities := a.GetCurrentLocalEntities()
|
|
||||||
if !entityRecordsEqual(t, currentLocalEntities.Clients, expectedCurrentLocal.Clients) {
|
|
||||||
// we only expect the newest local entity segment to be loaded (for the current month)
|
|
||||||
t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrentLocal, currentLocalEntities)
|
|
||||||
}
|
|
||||||
|
|
||||||
nsCount := a.GetStoredTokenCountByNamespaceID()
|
nsCount := a.GetStoredTokenCountByNamespaceID()
|
||||||
if !reflect.DeepEqual(nsCount, expectedTokenCounts) {
|
if !reflect.DeepEqual(nsCount, expectedTokenCounts) {
|
||||||
// we expect all token counts to be loaded
|
// we expect all token counts to be loaded
|
||||||
@@ -2044,31 +1896,14 @@ func TestActivityLog_refreshFromStoredLogWithBackgroundLoadingCancelled(t *testi
|
|||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
// refreshFromStoredLog loads the most recent segment and then loads the older segments in the background
|
|
||||||
// most recent global and local entity from setupActivityRecordsInStorage
|
|
||||||
expected := &activity.EntityActivityLog{
|
expected := &activity.EntityActivityLog{
|
||||||
Clients: expectedClientRecords[len(expectedClientRecords)-2:],
|
|
||||||
}
|
|
||||||
|
|
||||||
// most recent global entity from setupActivityRecordsInStorage
|
|
||||||
expectedCurrent := &activity.EntityActivityLog{
|
|
||||||
Clients: expectedClientRecords[len(expectedClientRecords)-2 : len(expectedClientRecords)-1],
|
|
||||||
}
|
|
||||||
// most recent local entity from setupActivityRecordsInStorage
|
|
||||||
expectedCurrentLocal := &activity.EntityActivityLog{
|
|
||||||
Clients: expectedClientRecords[len(expectedClientRecords)-1:],
|
Clients: expectedClientRecords[len(expectedClientRecords)-1:],
|
||||||
}
|
}
|
||||||
|
|
||||||
currentEntities := a.GetCurrentGlobalEntities()
|
currentEntities := a.GetCurrentEntities()
|
||||||
if !entityRecordsEqual(t, currentEntities.Clients, expectedCurrent.Clients) {
|
if !entityRecordsEqual(t, currentEntities.Clients, expected.Clients) {
|
||||||
// we only expect the newest entity segment to be loaded (for the current month)
|
// we only expect the newest entity segment to be loaded (for the current month)
|
||||||
t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrent, currentEntities)
|
t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expected, currentEntities)
|
||||||
}
|
|
||||||
|
|
||||||
currentLocalEntities := a.GetCurrentLocalEntities()
|
|
||||||
if !entityRecordsEqual(t, currentLocalEntities.Clients, expectedCurrentLocal.Clients) {
|
|
||||||
// we only expect the newest local entity segment to be loaded (for the current month)
|
|
||||||
t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrentLocal, currentLocalEntities)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
nsCount := a.GetStoredTokenCountByNamespaceID()
|
nsCount := a.GetStoredTokenCountByNamespaceID()
|
||||||
@@ -2116,12 +1951,6 @@ func TestActivityLog_refreshFromStoredLogNoTokens(t *testing.T) {
|
|||||||
Clients: expectedClientRecords[1:],
|
Clients: expectedClientRecords[1:],
|
||||||
}
|
}
|
||||||
expectedCurrent := &activity.EntityActivityLog{
|
expectedCurrent := &activity.EntityActivityLog{
|
||||||
Clients: expectedClientRecords[len(expectedClientRecords)-2 : len(expectedClientRecords)-1],
|
|
||||||
}
|
|
||||||
expectedCurrentGlobal := &activity.EntityActivityLog{
|
|
||||||
Clients: expectedClientRecords[len(expectedClientRecords)-2 : len(expectedClientRecords)-1],
|
|
||||||
}
|
|
||||||
expectedCurrentLocal := &activity.EntityActivityLog{
|
|
||||||
Clients: expectedClientRecords[len(expectedClientRecords)-1:],
|
Clients: expectedClientRecords[len(expectedClientRecords)-1:],
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2130,19 +1959,6 @@ func TestActivityLog_refreshFromStoredLogNoTokens(t *testing.T) {
|
|||||||
// we expect all segments for the current month to be loaded
|
// we expect all segments for the current month to be loaded
|
||||||
t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrent, currentEntities)
|
t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrent, currentEntities)
|
||||||
}
|
}
|
||||||
|
|
||||||
currentGlobalEntities := a.GetCurrentGlobalEntities()
|
|
||||||
if !entityRecordsEqual(t, currentGlobalEntities.Clients, expectedCurrentGlobal.Clients) {
|
|
||||||
// we only expect the newest entity segment to be loaded (for the current month)
|
|
||||||
t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrentGlobal, currentGlobalEntities)
|
|
||||||
}
|
|
||||||
|
|
||||||
currentLocalEntities := a.GetCurrentLocalEntities()
|
|
||||||
if !entityRecordsEqual(t, currentLocalEntities.Clients, expectedCurrentLocal.Clients) {
|
|
||||||
// we only expect the newest local entity segment to be loaded (for the current month)
|
|
||||||
t.Errorf("bad activity entity logs loaded. expected: %v got: %v", expectedCurrentLocal, currentLocalEntities)
|
|
||||||
}
|
|
||||||
|
|
||||||
activeClients := a.core.GetActiveClientsList()
|
activeClients := a.core.GetActiveClientsList()
|
||||||
if err := ActiveEntitiesEqual(activeClients, expectedActive.Clients); err != nil {
|
if err := ActiveEntitiesEqual(activeClients, expectedActive.Clients); err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
@@ -2242,7 +2058,7 @@ func TestActivityLog_refreshFromStoredLogPreviousMonth(t *testing.T) {
|
|||||||
Clients: expectedClientRecords[1:],
|
Clients: expectedClientRecords[1:],
|
||||||
}
|
}
|
||||||
expectedCurrent := &activity.EntityActivityLog{
|
expectedCurrent := &activity.EntityActivityLog{
|
||||||
Clients: expectedClientRecords[len(expectedClientRecords)-2 : len(expectedClientRecords)-1],
|
Clients: expectedClientRecords[len(expectedClientRecords)-1:],
|
||||||
}
|
}
|
||||||
|
|
||||||
currentEntities := a.GetCurrentEntities()
|
currentEntities := a.GetCurrentEntities()
|
||||||
@@ -2346,16 +2162,6 @@ func TestActivityLog_DeleteWorker(t *testing.T) {
|
|||||||
WriteToStorage(t, core, ActivityLogPrefix+path, []byte("test"))
|
WriteToStorage(t, core, ActivityLogPrefix+path, []byte("test"))
|
||||||
}
|
}
|
||||||
|
|
||||||
localPaths := []string{
|
|
||||||
"entity/1111/1",
|
|
||||||
"entity/1111/2",
|
|
||||||
"entity/1111/3",
|
|
||||||
"entity/1112/1",
|
|
||||||
}
|
|
||||||
for _, path := range localPaths {
|
|
||||||
WriteToStorage(t, core, ActivityLogLocalPrefix+path, []byte("test"))
|
|
||||||
}
|
|
||||||
|
|
||||||
tokenPaths := []string{
|
tokenPaths := []string{
|
||||||
"directtokens/1111/1",
|
"directtokens/1111/1",
|
||||||
"directtokens/1112/1",
|
"directtokens/1112/1",
|
||||||
@@ -2377,16 +2183,12 @@ func TestActivityLog_DeleteWorker(t *testing.T) {
|
|||||||
|
|
||||||
// Check segments still present
|
// Check segments still present
|
||||||
readSegmentFromStorage(t, core, ActivityLogPrefix+"entity/1112/1")
|
readSegmentFromStorage(t, core, ActivityLogPrefix+"entity/1112/1")
|
||||||
readSegmentFromStorage(t, core, ActivityLogLocalPrefix+"entity/1112/1")
|
|
||||||
readSegmentFromStorage(t, core, ActivityLogLocalPrefix+"directtokens/1112/1")
|
readSegmentFromStorage(t, core, ActivityLogLocalPrefix+"directtokens/1112/1")
|
||||||
|
|
||||||
// Check other segments not present
|
// Check other segments not present
|
||||||
expectMissingSegment(t, core, ActivityLogPrefix+"entity/1111/1")
|
expectMissingSegment(t, core, ActivityLogPrefix+"entity/1111/1")
|
||||||
expectMissingSegment(t, core, ActivityLogPrefix+"entity/1111/2")
|
expectMissingSegment(t, core, ActivityLogPrefix+"entity/1111/2")
|
||||||
expectMissingSegment(t, core, ActivityLogPrefix+"entity/1111/3")
|
expectMissingSegment(t, core, ActivityLogPrefix+"entity/1111/3")
|
||||||
expectMissingSegment(t, core, ActivityLogLocalPrefix+"entity/1111/1")
|
|
||||||
expectMissingSegment(t, core, ActivityLogLocalPrefix+"entity/1111/2")
|
|
||||||
expectMissingSegment(t, core, ActivityLogLocalPrefix+"entity/1111/3")
|
|
||||||
expectMissingSegment(t, core, ActivityLogLocalPrefix+"directtokens/1111/1")
|
expectMissingSegment(t, core, ActivityLogLocalPrefix+"directtokens/1111/1")
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2515,23 +2317,9 @@ func TestActivityLog_EndOfMonth(t *testing.T) {
|
|||||||
// We only want *fake* end of months, *real* ones are too scary.
|
// We only want *fake* end of months, *real* ones are too scary.
|
||||||
timeutil.SkipAtEndOfMonth(t)
|
timeutil.SkipAtEndOfMonth(t)
|
||||||
|
|
||||||
t.Parallel()
|
core, _, _ := TestCoreUnsealed(t)
|
||||||
|
|
||||||
storage := &logical.InmemStorage{}
|
|
||||||
coreConfig := &CoreConfig{
|
|
||||||
CredentialBackends: map[string]logical.Factory{
|
|
||||||
"userpass": userpass.Factory,
|
|
||||||
},
|
|
||||||
Physical: storage.Underlying(),
|
|
||||||
}
|
|
||||||
|
|
||||||
cluster := NewTestCluster(t, coreConfig, nil)
|
|
||||||
core := cluster.Cores[0].Core
|
|
||||||
TestWaitActive(t, core)
|
|
||||||
|
|
||||||
ctx := namespace.RootContext(nil)
|
|
||||||
|
|
||||||
a := core.activityLog
|
a := core.activityLog
|
||||||
|
ctx := namespace.RootContext(nil)
|
||||||
|
|
||||||
// Make sure we're enabled.
|
// Make sure we're enabled.
|
||||||
a.SetConfig(ctx, activityConfig{
|
a.SetConfig(ctx, activityConfig{
|
||||||
@@ -2543,21 +2331,8 @@ func TestActivityLog_EndOfMonth(t *testing.T) {
|
|||||||
id1 := "11111111-1111-1111-1111-111111111111"
|
id1 := "11111111-1111-1111-1111-111111111111"
|
||||||
id2 := "22222222-2222-2222-2222-222222222222"
|
id2 := "22222222-2222-2222-2222-222222222222"
|
||||||
id3 := "33333333-3333-3333-3333-333333333333"
|
id3 := "33333333-3333-3333-3333-333333333333"
|
||||||
id4 := "44444444-4444-4444-4444-444444444444"
|
|
||||||
a.AddEntityToFragment(id1, "root", time.Now().Unix())
|
a.AddEntityToFragment(id1, "root", time.Now().Unix())
|
||||||
|
|
||||||
// add local data
|
|
||||||
localMountEntry := &MountEntry{
|
|
||||||
Table: credentialTableType,
|
|
||||||
Path: "localUserpass/",
|
|
||||||
Local: true,
|
|
||||||
Type: "userpass",
|
|
||||||
Accessor: "localMountAccessor",
|
|
||||||
}
|
|
||||||
err := core.enableCredential(ctx, localMountEntry)
|
|
||||||
require.NoError(t, err)
|
|
||||||
a.AddClientToFragment(id4, "root", time.Now().Unix(), false, "localMountAccessor")
|
|
||||||
|
|
||||||
month0 := time.Now().UTC()
|
month0 := time.Now().UTC()
|
||||||
segment0 := a.GetStartTimestamp()
|
segment0 := a.GetStartTimestamp()
|
||||||
month1 := timeutil.StartOfNextMonth(month0)
|
month1 := timeutil.StartOfNextMonth(month0)
|
||||||
@@ -2570,12 +2345,10 @@ func TestActivityLog_EndOfMonth(t *testing.T) {
|
|||||||
path := fmt.Sprintf("%ventity/%v/0", ActivityLogPrefix, segment0)
|
path := fmt.Sprintf("%ventity/%v/0", ActivityLogPrefix, segment0)
|
||||||
protoSegment := readSegmentFromStorage(t, core, path)
|
protoSegment := readSegmentFromStorage(t, core, path)
|
||||||
out := &activity.EntityActivityLog{}
|
out := &activity.EntityActivityLog{}
|
||||||
err = proto.Unmarshal(protoSegment.Value, out)
|
err := proto.Unmarshal(protoSegment.Value, out)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
expectedEntityIDs(t, out, []string{id1, id4})
|
|
||||||
|
|
||||||
path = fmt.Sprintf("%ventity/%v/0", ActivityGlobalLogPrefix, segment0)
|
path = fmt.Sprintf("%ventity/%v/0", ActivityGlobalLogPrefix, segment0)
|
||||||
protoSegment = readSegmentFromStorage(t, core, path)
|
protoSegment = readSegmentFromStorage(t, core, path)
|
||||||
out = &activity.EntityActivityLog{}
|
out = &activity.EntityActivityLog{}
|
||||||
@@ -2583,16 +2356,6 @@ func TestActivityLog_EndOfMonth(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
expectedEntityIDs(t, out, []string{id1})
|
|
||||||
|
|
||||||
path = fmt.Sprintf("%ventity/%v/0", ActivityLogLocalPrefix, segment0)
|
|
||||||
protoSegment = readSegmentFromStorage(t, core, path)
|
|
||||||
out = &activity.EntityActivityLog{}
|
|
||||||
err = proto.Unmarshal(protoSegment.Value, out)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
expectedEntityIDs(t, out, []string{id4})
|
|
||||||
|
|
||||||
segment1 := a.GetStartTimestamp()
|
segment1 := a.GetStartTimestamp()
|
||||||
expectedTimestamp := timeutil.StartOfMonth(month1).Unix()
|
expectedTimestamp := timeutil.StartOfMonth(month1).Unix()
|
||||||
@@ -2637,21 +2400,16 @@ func TestActivityLog_EndOfMonth(t *testing.T) {
|
|||||||
|
|
||||||
// Check all three segments still present, with correct entities
|
// Check all three segments still present, with correct entities
|
||||||
testCases := []struct {
|
testCases := []struct {
|
||||||
SegmentTimestamp int64
|
SegmentTimestamp int64
|
||||||
ExpectedGlobalEntityIDs []string
|
ExpectedEntityIDs []string
|
||||||
ExpectedLocalEntityIDs []string
|
|
||||||
}{
|
}{
|
||||||
{segment0, []string{id1}, []string{id4}},
|
{segment0, []string{id1}},
|
||||||
{segment1, []string{id2}, []string{}},
|
{segment1, []string{id2}},
|
||||||
{segment2, []string{id3}, []string{}},
|
{segment2, []string{id3}},
|
||||||
}
|
}
|
||||||
|
|
||||||
for i, tc := range testCases {
|
for i, tc := range testCases {
|
||||||
t.Logf("checking segment %v timestamp %v", i, tc.SegmentTimestamp)
|
t.Logf("checking segment %v timestamp %v", i, tc.SegmentTimestamp)
|
||||||
|
|
||||||
expectedAllEntities := make([]string, 0)
|
|
||||||
expectedAllEntities = append(expectedAllEntities, tc.ExpectedGlobalEntityIDs...)
|
|
||||||
expectedAllEntities = append(expectedAllEntities, tc.ExpectedLocalEntityIDs...)
|
|
||||||
path := fmt.Sprintf("%ventity/%v/0", ActivityLogPrefix, tc.SegmentTimestamp)
|
path := fmt.Sprintf("%ventity/%v/0", ActivityLogPrefix, tc.SegmentTimestamp)
|
||||||
protoSegment := readSegmentFromStorage(t, core, path)
|
protoSegment := readSegmentFromStorage(t, core, path)
|
||||||
out := &activity.EntityActivityLog{}
|
out := &activity.EntityActivityLog{}
|
||||||
@@ -2659,7 +2417,7 @@ func TestActivityLog_EndOfMonth(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("could not unmarshal protobuf: %v", err)
|
t.Fatalf("could not unmarshal protobuf: %v", err)
|
||||||
}
|
}
|
||||||
expectedEntityIDs(t, out, expectedAllEntities)
|
expectedEntityIDs(t, out, tc.ExpectedEntityIDs)
|
||||||
|
|
||||||
// Check for global entities at global storage path
|
// Check for global entities at global storage path
|
||||||
path = fmt.Sprintf("%ventity/%v/0", ActivityGlobalLogPrefix, tc.SegmentTimestamp)
|
path = fmt.Sprintf("%ventity/%v/0", ActivityGlobalLogPrefix, tc.SegmentTimestamp)
|
||||||
@@ -2669,19 +2427,7 @@ func TestActivityLog_EndOfMonth(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("could not unmarshal protobuf: %v", err)
|
t.Fatalf("could not unmarshal protobuf: %v", err)
|
||||||
}
|
}
|
||||||
expectedEntityIDs(t, out, tc.ExpectedGlobalEntityIDs)
|
expectedEntityIDs(t, out, tc.ExpectedEntityIDs)
|
||||||
|
|
||||||
// Check for local entities at local storage path
|
|
||||||
if len(tc.ExpectedLocalEntityIDs) > 0 {
|
|
||||||
path = fmt.Sprintf("%ventity/%v/0", ActivityLogLocalPrefix, tc.SegmentTimestamp)
|
|
||||||
protoSegment = readSegmentFromStorage(t, core, path)
|
|
||||||
out = &activity.EntityActivityLog{}
|
|
||||||
err = proto.Unmarshal(protoSegment.Value, out)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("could not unmarshal protobuf: %v", err)
|
|
||||||
}
|
|
||||||
expectedEntityIDs(t, out, tc.ExpectedLocalEntityIDs)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -5814,81 +5560,6 @@ func TestCreateSegment_StoreSegment(t *testing.T) {
|
|||||||
maxClientsPerFragment: ActivitySegmentClientCapacity,
|
maxClientsPerFragment: ActivitySegmentClientCapacity,
|
||||||
global: false,
|
global: false,
|
||||||
},
|
},
|
||||||
{
|
|
||||||
testName: "[local] max client size, drop clients",
|
|
||||||
numClients: ActivitySegmentClientCapacity*2 + 1,
|
|
||||||
pathPrefix: activityLocalPathPrefix,
|
|
||||||
maxClientsPerFragment: ActivitySegmentClientCapacity,
|
|
||||||
global: false,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
testName: "[local, no-force] max client size, drop clients",
|
|
||||||
numClients: ActivitySegmentClientCapacity*2 + 1,
|
|
||||||
pathPrefix: activityLocalPathPrefix,
|
|
||||||
maxClientsPerFragment: ActivitySegmentClientCapacity,
|
|
||||||
global: false,
|
|
||||||
forceStore: true,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
testName: "[local] max segment size",
|
|
||||||
numClients: ActivitySegmentClientCapacity,
|
|
||||||
pathPrefix: activityLocalPathPrefix,
|
|
||||||
maxClientsPerFragment: ActivitySegmentClientCapacity,
|
|
||||||
global: false,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
testName: "[local, no-force] max segment size",
|
|
||||||
numClients: ActivitySegmentClientCapacity,
|
|
||||||
pathPrefix: activityLocalPathPrefix,
|
|
||||||
maxClientsPerFragment: ActivitySegmentClientCapacity,
|
|
||||||
global: false,
|
|
||||||
forceStore: true,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
testName: "[local] max segment size, multiple fragments",
|
|
||||||
numClients: ActivitySegmentClientCapacity,
|
|
||||||
pathPrefix: activityLocalPathPrefix,
|
|
||||||
maxClientsPerFragment: ActivitySegmentClientCapacity - 1,
|
|
||||||
global: false,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
testName: "[local, no-force] max segment size, multiple fragments",
|
|
||||||
numClients: ActivitySegmentClientCapacity,
|
|
||||||
pathPrefix: activityLocalPathPrefix,
|
|
||||||
maxClientsPerFragment: ActivitySegmentClientCapacity - 1,
|
|
||||||
global: false,
|
|
||||||
forceStore: true,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
testName: "[local] roll over",
|
|
||||||
numClients: ActivitySegmentClientCapacity + 2,
|
|
||||||
pathPrefix: activityLocalPathPrefix,
|
|
||||||
maxClientsPerFragment: ActivitySegmentClientCapacity,
|
|
||||||
global: false,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
testName: "[local, no-force] roll over",
|
|
||||||
numClients: ActivitySegmentClientCapacity + 2,
|
|
||||||
pathPrefix: activityLocalPathPrefix,
|
|
||||||
maxClientsPerFragment: ActivitySegmentClientCapacity,
|
|
||||||
global: false,
|
|
||||||
forceStore: true,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
testName: "[local] max segment size, rollover multiple fragments",
|
|
||||||
numClients: ActivitySegmentClientCapacity * 2,
|
|
||||||
pathPrefix: activityLocalPathPrefix,
|
|
||||||
maxClientsPerFragment: ActivitySegmentClientCapacity - 1,
|
|
||||||
global: false,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
testName: "[local, no-force] max segment size, rollover multiple fragments",
|
|
||||||
numClients: ActivitySegmentClientCapacity * 2,
|
|
||||||
pathPrefix: activityLocalPathPrefix,
|
|
||||||
maxClientsPerFragment: ActivitySegmentClientCapacity - 1,
|
|
||||||
global: false,
|
|
||||||
forceStore: true,
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, test := range testCases {
|
for _, test := range testCases {
|
||||||
@@ -5911,9 +5582,6 @@ func TestCreateSegment_StoreSegment(t *testing.T) {
|
|||||||
segment := &a.currentGlobalSegment
|
segment := &a.currentGlobalSegment
|
||||||
if !test.global {
|
if !test.global {
|
||||||
segment = &a.currentSegment
|
segment = &a.currentSegment
|
||||||
if test.pathPrefix == activityLocalPathPrefix {
|
|
||||||
segment = &a.currentLocalSegment
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create segments and write to storage
|
// Create segments and write to storage
|
||||||
@@ -5932,24 +5600,13 @@ func TestCreateSegment_StoreSegment(t *testing.T) {
|
|||||||
clientTotal += len(entity.GetClients())
|
clientTotal += len(entity.GetClients())
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if test.pathPrefix == activityLocalPathPrefix {
|
for {
|
||||||
for {
|
entity, err := reader.ReadEntity(ctx)
|
||||||
entity, err := reader.ReadLocalEntity(ctx)
|
if errors.Is(err, io.EOF) {
|
||||||
if errors.Is(err, io.EOF) {
|
break
|
||||||
break
|
|
||||||
}
|
|
||||||
require.NoError(t, err)
|
|
||||||
clientTotal += len(entity.GetClients())
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
for {
|
|
||||||
entity, err := reader.ReadEntity(ctx)
|
|
||||||
if errors.Is(err, io.EOF) {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
require.NoError(t, err)
|
|
||||||
clientTotal += len(entity.GetClients())
|
|
||||||
}
|
}
|
||||||
|
require.NoError(t, err)
|
||||||
|
clientTotal += len(entity.GetClients())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -93,28 +93,6 @@ func (c *Core) GetActiveClientsList() []*activity.EntityRecord {
|
|||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Core) GetActiveGlobalClientsList() []*activity.EntityRecord {
|
|
||||||
out := []*activity.EntityRecord{}
|
|
||||||
c.activityLog.globalFragmentLock.RLock()
|
|
||||||
// add active global clients
|
|
||||||
for _, v := range c.activityLog.globalPartialMonthClientTracker {
|
|
||||||
out = append(out, v)
|
|
||||||
}
|
|
||||||
c.activityLog.globalFragmentLock.RUnlock()
|
|
||||||
return out
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Core) GetActiveLocalClientsList() []*activity.EntityRecord {
|
|
||||||
out := []*activity.EntityRecord{}
|
|
||||||
c.activityLog.localFragmentLock.RLock()
|
|
||||||
// add active global clients
|
|
||||||
for _, v := range c.activityLog.partialMonthLocalClientTracker {
|
|
||||||
out = append(out, v)
|
|
||||||
}
|
|
||||||
c.activityLog.localFragmentLock.RUnlock()
|
|
||||||
return out
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetCurrentEntities returns the current entity activity log
|
// GetCurrentEntities returns the current entity activity log
|
||||||
func (a *ActivityLog) GetCurrentEntities() *activity.EntityActivityLog {
|
func (a *ActivityLog) GetCurrentEntities() *activity.EntityActivityLog {
|
||||||
a.l.RLock()
|
a.l.RLock()
|
||||||
@@ -126,16 +104,11 @@ func (a *ActivityLog) GetCurrentEntities() *activity.EntityActivityLog {
|
|||||||
func (a *ActivityLog) GetCurrentGlobalEntities() *activity.EntityActivityLog {
|
func (a *ActivityLog) GetCurrentGlobalEntities() *activity.EntityActivityLog {
|
||||||
a.l.RLock()
|
a.l.RLock()
|
||||||
defer a.l.RUnlock()
|
defer a.l.RUnlock()
|
||||||
|
a.globalFragmentLock.RLock()
|
||||||
|
defer a.globalFragmentLock.RUnlock()
|
||||||
return a.currentGlobalSegment.currentClients
|
return a.currentGlobalSegment.currentClients
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetCurrentLocalEntities returns the current local entity activity log
|
|
||||||
func (a *ActivityLog) GetCurrentLocalEntities() *activity.EntityActivityLog {
|
|
||||||
a.l.RLock()
|
|
||||||
defer a.l.RUnlock()
|
|
||||||
return a.currentLocalSegment.currentClients
|
|
||||||
}
|
|
||||||
|
|
||||||
// WriteToStorage is used to put entity data in storage
|
// WriteToStorage is used to put entity data in storage
|
||||||
// `path` should be the complete path (not relative to the view)
|
// `path` should be the complete path (not relative to the view)
|
||||||
func WriteToStorage(t *testing.T, c *Core, path string, data []byte) {
|
func WriteToStorage(t *testing.T, c *Core, path string, data []byte) {
|
||||||
@@ -217,9 +190,6 @@ func (a *ActivityLog) ExpectCurrentSegmentRefreshed(t *testing.T, expectedStart
|
|||||||
if a.partialMonthLocalClientTracker == nil {
|
if a.partialMonthLocalClientTracker == nil {
|
||||||
t.Errorf("expected non-nil partialMonthLocalClientTracker")
|
t.Errorf("expected non-nil partialMonthLocalClientTracker")
|
||||||
}
|
}
|
||||||
if a.globalPartialMonthClientTracker == nil {
|
|
||||||
t.Errorf("expected non-nil globalPartialMonthClientTracker")
|
|
||||||
}
|
|
||||||
if len(a.currentSegment.currentClients.Clients) > 0 {
|
if len(a.currentSegment.currentClients.Clients) > 0 {
|
||||||
t.Errorf("expected no current entity segment to be loaded. got: %v", a.currentSegment.currentClients)
|
t.Errorf("expected no current entity segment to be loaded. got: %v", a.currentSegment.currentClients)
|
||||||
}
|
}
|
||||||
@@ -232,26 +202,13 @@ func (a *ActivityLog) ExpectCurrentSegmentRefreshed(t *testing.T, expectedStart
|
|||||||
if len(a.partialMonthLocalClientTracker) > 0 {
|
if len(a.partialMonthLocalClientTracker) > 0 {
|
||||||
t.Errorf("expected no active entity segment to be loaded. got: %v", a.partialMonthLocalClientTracker)
|
t.Errorf("expected no active entity segment to be loaded. got: %v", a.partialMonthLocalClientTracker)
|
||||||
}
|
}
|
||||||
if len(a.globalPartialMonthClientTracker) > 0 {
|
|
||||||
t.Errorf("expected no active entity segment to be loaded. got: %v", a.globalPartialMonthClientTracker)
|
|
||||||
}
|
|
||||||
|
|
||||||
if verifyTimeNotZero {
|
if verifyTimeNotZero {
|
||||||
if a.currentSegment.startTimestamp == 0 {
|
if a.currentSegment.startTimestamp == 0 {
|
||||||
t.Error("bad start timestamp. expected no reset but timestamp was reset")
|
t.Error("bad start timestamp. expected no reset but timestamp was reset")
|
||||||
}
|
}
|
||||||
if a.currentGlobalSegment.startTimestamp == 0 {
|
|
||||||
t.Error("bad start timestamp. expected no reset but timestamp was reset")
|
|
||||||
}
|
|
||||||
if a.currentLocalSegment.startTimestamp == 0 {
|
|
||||||
t.Error("bad start timestamp. expected no reset but timestamp was reset")
|
|
||||||
}
|
|
||||||
} else if a.currentSegment.startTimestamp != expectedStart {
|
} else if a.currentSegment.startTimestamp != expectedStart {
|
||||||
t.Errorf("bad start timestamp. expected: %v got: %v", expectedStart, a.currentSegment.startTimestamp)
|
t.Errorf("bad start timestamp. expected: %v got: %v", expectedStart, a.currentSegment.startTimestamp)
|
||||||
} else if a.currentGlobalSegment.startTimestamp != expectedStart {
|
|
||||||
t.Errorf("bad start timestamp. expected: %v got: %v", expectedStart, a.currentGlobalSegment.startTimestamp)
|
|
||||||
} else if a.currentLocalSegment.startTimestamp != expectedStart {
|
|
||||||
t.Errorf("bad start timestamp. expected: %v got: %v", expectedStart, a.currentLocalSegment.startTimestamp)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -284,14 +241,13 @@ func (a *ActivityLog) SetStartTimestamp(timestamp int64) {
|
|||||||
defer a.l.Unlock()
|
defer a.l.Unlock()
|
||||||
a.currentSegment.startTimestamp = timestamp
|
a.currentSegment.startTimestamp = timestamp
|
||||||
a.currentGlobalSegment.startTimestamp = timestamp
|
a.currentGlobalSegment.startTimestamp = timestamp
|
||||||
a.currentLocalSegment.startTimestamp = timestamp
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetStoredTokenCountByNamespaceID returns the count of tokens by namespace ID
|
// GetStoredTokenCountByNamespaceID returns the count of tokens by namespace ID
|
||||||
func (a *ActivityLog) GetStoredTokenCountByNamespaceID() map[string]uint64 {
|
func (a *ActivityLog) GetStoredTokenCountByNamespaceID() map[string]uint64 {
|
||||||
a.l.RLock()
|
a.l.RLock()
|
||||||
defer a.l.RUnlock()
|
defer a.l.RUnlock()
|
||||||
return a.currentLocalSegment.tokenCount.CountByNamespaceID
|
return a.currentSegment.tokenCount.CountByNamespaceID
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetEntitySequenceNumber returns the current entity sequence number
|
// GetEntitySequenceNumber returns the current entity sequence number
|
||||||
@@ -301,20 +257,6 @@ func (a *ActivityLog) GetEntitySequenceNumber() uint64 {
|
|||||||
return a.currentSegment.clientSequenceNumber
|
return a.currentSegment.clientSequenceNumber
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetGlobalEntitySequenceNumber returns the current entity sequence number
|
|
||||||
func (a *ActivityLog) GetGlobalEntitySequenceNumber() uint64 {
|
|
||||||
a.l.RLock()
|
|
||||||
defer a.l.RUnlock()
|
|
||||||
return a.currentGlobalSegment.clientSequenceNumber
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetLocalEntitySequenceNumber returns the current entity sequence number
|
|
||||||
func (a *ActivityLog) GetLocalEntitySequenceNumber() uint64 {
|
|
||||||
a.l.RLock()
|
|
||||||
defer a.l.RUnlock()
|
|
||||||
return a.currentLocalSegment.clientSequenceNumber
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetEnable sets the enabled flag on the activity log
|
// SetEnable sets the enabled flag on the activity log
|
||||||
func (a *ActivityLog) SetEnable(enabled bool) {
|
func (a *ActivityLog) SetEnable(enabled bool) {
|
||||||
a.l.Lock()
|
a.l.Lock()
|
||||||
|
|||||||
@@ -427,7 +427,6 @@ type segmentReader struct {
|
|||||||
tokens *singleTypeSegmentReader
|
tokens *singleTypeSegmentReader
|
||||||
entities *singleTypeSegmentReader
|
entities *singleTypeSegmentReader
|
||||||
globalEntities *singleTypeSegmentReader
|
globalEntities *singleTypeSegmentReader
|
||||||
localEntities *singleTypeSegmentReader
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// SegmentReader is an interface that provides methods to read tokens and entities in order
|
// SegmentReader is an interface that provides methods to read tokens and entities in order
|
||||||
@@ -435,7 +434,6 @@ type SegmentReader interface {
|
|||||||
ReadToken(ctx context.Context) (*activity.TokenCount, error)
|
ReadToken(ctx context.Context) (*activity.TokenCount, error)
|
||||||
ReadEntity(ctx context.Context) (*activity.EntityActivityLog, error)
|
ReadEntity(ctx context.Context) (*activity.EntityActivityLog, error)
|
||||||
ReadGlobalEntity(ctx context.Context) (*activity.EntityActivityLog, error)
|
ReadGlobalEntity(ctx context.Context) (*activity.EntityActivityLog, error)
|
||||||
ReadLocalEntity(ctx context.Context) (*activity.EntityActivityLog, error)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *ActivityLog) NewSegmentFileReader(ctx context.Context, startTime time.Time) (SegmentReader, error) {
|
func (a *ActivityLog) NewSegmentFileReader(ctx context.Context, startTime time.Time) (SegmentReader, error) {
|
||||||
@@ -447,15 +445,11 @@ func (a *ActivityLog) NewSegmentFileReader(ctx context.Context, startTime time.T
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
localEntities, err := a.newSingleTypeSegmentReader(ctx, startTime, activityLocalPathPrefix+activityEntityBasePath)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
tokens, err := a.newSingleTypeSegmentReader(ctx, startTime, activityTokenLocalBasePath)
|
tokens, err := a.newSingleTypeSegmentReader(ctx, startTime, activityTokenLocalBasePath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &segmentReader{entities: entities, globalEntities: globalEntities, localEntities: localEntities, tokens: tokens}, nil
|
return &segmentReader{entities: entities, globalEntities: globalEntities, tokens: tokens}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *ActivityLog) newSingleTypeSegmentReader(ctx context.Context, startTime time.Time, prefix string) (*singleTypeSegmentReader, error) {
|
func (a *ActivityLog) newSingleTypeSegmentReader(ctx context.Context, startTime time.Time, prefix string) (*singleTypeSegmentReader, error) {
|
||||||
@@ -532,17 +526,6 @@ func (e *segmentReader) ReadGlobalEntity(ctx context.Context) (*activity.EntityA
|
|||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReadLocalEntity reads a local entity from the local segment
|
|
||||||
// If there is none available, then the error will be io.EOF
|
|
||||||
func (e *segmentReader) ReadLocalEntity(ctx context.Context) (*activity.EntityActivityLog, error) {
|
|
||||||
out := &activity.EntityActivityLog{}
|
|
||||||
err := e.localEntities.nextValue(ctx, out)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return out, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// namespaceRecordToCountsResponse converts the record to the ResponseCounts
|
// namespaceRecordToCountsResponse converts the record to the ResponseCounts
|
||||||
// type. The function sums entity, non-entity, and secret sync counts to get the
|
// type. The function sums entity, non-entity, and secret sync counts to get the
|
||||||
// total client count.
|
// total client count.
|
||||||
|
|||||||
@@ -998,14 +998,6 @@ func writeGlobalEntitySegment(t *testing.T, core *Core, ts time.Time, index int,
|
|||||||
WriteToStorage(t, core, makeSegmentPath(t, activityGlobalPathPrefix+activityEntityBasePath, ts, index), protoItem)
|
WriteToStorage(t, core, makeSegmentPath(t, activityGlobalPathPrefix+activityEntityBasePath, ts, index), protoItem)
|
||||||
}
|
}
|
||||||
|
|
||||||
// writeLocalEntitySegment writes a single local segment file with the given time and index for an entity
|
|
||||||
func writeLocalEntitySegment(t *testing.T, core *Core, ts time.Time, index int, item *activity.EntityActivityLog) {
|
|
||||||
t.Helper()
|
|
||||||
protoItem, err := proto.Marshal(item)
|
|
||||||
require.NoError(t, err)
|
|
||||||
WriteToStorage(t, core, makeSegmentPath(t, activityLocalPathPrefix+activityEntityBasePath, ts, index), protoItem)
|
|
||||||
}
|
|
||||||
|
|
||||||
// writeEntitySegment writes a single segment file with the given time and index for an entity
|
// writeEntitySegment writes a single segment file with the given time and index for an entity
|
||||||
func writeEntitySegment(t *testing.T, core *Core, ts time.Time, index int, item *activity.EntityActivityLog) {
|
func writeEntitySegment(t *testing.T, core *Core, ts time.Time, index int, item *activity.EntityActivityLog) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
@@ -1039,7 +1031,6 @@ func TestSegmentFileReader_BadData(t *testing.T) {
|
|||||||
WriteToStorage(t, core, makeSegmentPath(t, activityTokenLocalBasePath, now, 0), []byte("fake data"))
|
WriteToStorage(t, core, makeSegmentPath(t, activityTokenLocalBasePath, now, 0), []byte("fake data"))
|
||||||
WriteToStorage(t, core, makeSegmentPath(t, activityEntityBasePath, now, 0), []byte("fake data"))
|
WriteToStorage(t, core, makeSegmentPath(t, activityEntityBasePath, now, 0), []byte("fake data"))
|
||||||
WriteToStorage(t, core, makeSegmentPath(t, activityGlobalPathPrefix+activityEntityBasePath, now, 0), []byte("fake data"))
|
WriteToStorage(t, core, makeSegmentPath(t, activityGlobalPathPrefix+activityEntityBasePath, now, 0), []byte("fake data"))
|
||||||
WriteToStorage(t, core, makeSegmentPath(t, activityLocalPathPrefix+activityEntityBasePath, now, 0), []byte("fake data"))
|
|
||||||
|
|
||||||
// write entity at index 1
|
// write entity at index 1
|
||||||
entity := &activity.EntityActivityLog{Clients: []*activity.EntityRecord{
|
entity := &activity.EntityActivityLog{Clients: []*activity.EntityRecord{
|
||||||
@@ -1052,9 +1043,6 @@ func TestSegmentFileReader_BadData(t *testing.T) {
|
|||||||
// write global data at index 1
|
// write global data at index 1
|
||||||
writeGlobalEntitySegment(t, core, now, 1, entity)
|
writeGlobalEntitySegment(t, core, now, 1, entity)
|
||||||
|
|
||||||
// write local data at index 1
|
|
||||||
writeLocalEntitySegment(t, core, now, 1, entity)
|
|
||||||
|
|
||||||
// write token at index 1
|
// write token at index 1
|
||||||
token := &activity.TokenCount{CountByNamespaceID: map[string]uint64{
|
token := &activity.TokenCount{CountByNamespaceID: map[string]uint64{
|
||||||
"ns": 1,
|
"ns": 1,
|
||||||
@@ -1079,14 +1067,6 @@ func TestSegmentFileReader_BadData(t *testing.T) {
|
|||||||
require.True(t, proto.Equal(gotEntity, entity))
|
require.True(t, proto.Equal(gotEntity, entity))
|
||||||
require.Nil(t, err)
|
require.Nil(t, err)
|
||||||
|
|
||||||
// first the bad local entity is read, which returns an error
|
|
||||||
_, err = reader.ReadLocalEntity(context.Background())
|
|
||||||
require.Error(t, err)
|
|
||||||
// then, the reader can read the good entity at index 1
|
|
||||||
gotEntity, err = reader.ReadLocalEntity(context.Background())
|
|
||||||
require.True(t, proto.Equal(gotEntity, entity))
|
|
||||||
require.Nil(t, err)
|
|
||||||
|
|
||||||
// the bad token causes an error
|
// the bad token causes an error
|
||||||
_, err = reader.ReadToken(context.Background())
|
_, err = reader.ReadToken(context.Background())
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
@@ -1115,13 +1095,8 @@ func TestSegmentFileReader_MissingData(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}}
|
}}
|
||||||
writeEntitySegment(t, core, now, 3, entity)
|
writeEntitySegment(t, core, now, 3, entity)
|
||||||
|
|
||||||
// write global entity at index 3
|
// write global entity at index 3
|
||||||
writeGlobalEntitySegment(t, core, now, 3, entity)
|
writeGlobalEntitySegment(t, core, now, 3, entity)
|
||||||
|
|
||||||
// write local entity at index 3
|
|
||||||
writeLocalEntitySegment(t, core, now, 3, entity)
|
|
||||||
|
|
||||||
// write token at index 3
|
// write token at index 3
|
||||||
token := &activity.TokenCount{CountByNamespaceID: map[string]uint64{
|
token := &activity.TokenCount{CountByNamespaceID: map[string]uint64{
|
||||||
"ns": 1,
|
"ns": 1,
|
||||||
@@ -1135,7 +1110,6 @@ func TestSegmentFileReader_MissingData(t *testing.T) {
|
|||||||
require.NoError(t, core.barrier.Delete(context.Background(), makeSegmentPath(t, activityTokenLocalBasePath, now, i)))
|
require.NoError(t, core.barrier.Delete(context.Background(), makeSegmentPath(t, activityTokenLocalBasePath, now, i)))
|
||||||
require.NoError(t, core.barrier.Delete(context.Background(), makeSegmentPath(t, activityEntityBasePath, now, i)))
|
require.NoError(t, core.barrier.Delete(context.Background(), makeSegmentPath(t, activityEntityBasePath, now, i)))
|
||||||
require.NoError(t, core.barrier.Delete(context.Background(), makeSegmentPath(t, activityGlobalPathPrefix+activityEntityBasePath, now, i)))
|
require.NoError(t, core.barrier.Delete(context.Background(), makeSegmentPath(t, activityGlobalPathPrefix+activityEntityBasePath, now, i)))
|
||||||
require.NoError(t, core.barrier.Delete(context.Background(), makeSegmentPath(t, activityLocalPathPrefix+activityEntityBasePath, now, i)))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// we expect the reader to only return the data at index 3, and then be done
|
// we expect the reader to only return the data at index 3, and then be done
|
||||||
@@ -1156,12 +1130,6 @@ func TestSegmentFileReader_MissingData(t *testing.T) {
|
|||||||
require.True(t, proto.Equal(gotEntity, entity))
|
require.True(t, proto.Equal(gotEntity, entity))
|
||||||
_, err = reader.ReadGlobalEntity(context.Background())
|
_, err = reader.ReadGlobalEntity(context.Background())
|
||||||
require.Equal(t, err, io.EOF)
|
require.Equal(t, err, io.EOF)
|
||||||
|
|
||||||
gotEntity, err = reader.ReadLocalEntity(context.Background())
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.True(t, proto.Equal(gotEntity, entity))
|
|
||||||
_, err = reader.ReadLocalEntity(context.Background())
|
|
||||||
require.Equal(t, err, io.EOF)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestSegmentFileReader_NoData verifies that the reader return io.EOF when there is no data
|
// TestSegmentFileReader_NoData verifies that the reader return io.EOF when there is no data
|
||||||
|
|||||||
@@ -54,7 +54,7 @@ func TestACMERegeneration_RegenerateWithCurrentMonth(t *testing.T) {
|
|||||||
})
|
})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
now := time.Now().UTC()
|
now := time.Now().UTC()
|
||||||
_, _, _, err = clientcountutil.NewActivityLogData(client).
|
_, _, err = clientcountutil.NewActivityLogData(client).
|
||||||
NewPreviousMonthData(3).
|
NewPreviousMonthData(3).
|
||||||
// 3 months ago, 15 non-entity clients and 10 ACME clients
|
// 3 months ago, 15 non-entity clients and 10 ACME clients
|
||||||
NewClientsSeen(15, clientcountutil.WithClientType("non-entity-token")).
|
NewClientsSeen(15, clientcountutil.WithClientType("non-entity-token")).
|
||||||
@@ -116,7 +116,7 @@ func TestACMERegeneration_RegenerateMuchOlder(t *testing.T) {
|
|||||||
client := cluster.Cores[0].Client
|
client := cluster.Cores[0].Client
|
||||||
|
|
||||||
now := time.Now().UTC()
|
now := time.Now().UTC()
|
||||||
_, _, _, err := clientcountutil.NewActivityLogData(client).
|
_, _, err := clientcountutil.NewActivityLogData(client).
|
||||||
NewPreviousMonthData(5).
|
NewPreviousMonthData(5).
|
||||||
// 5 months ago, 15 non-entity clients and 10 ACME clients
|
// 5 months ago, 15 non-entity clients and 10 ACME clients
|
||||||
NewClientsSeen(15, clientcountutil.WithClientType("non-entity-token")).
|
NewClientsSeen(15, clientcountutil.WithClientType("non-entity-token")).
|
||||||
@@ -159,7 +159,7 @@ func TestACMERegeneration_RegeneratePreviousMonths(t *testing.T) {
|
|||||||
client := cluster.Cores[0].Client
|
client := cluster.Cores[0].Client
|
||||||
|
|
||||||
now := time.Now().UTC()
|
now := time.Now().UTC()
|
||||||
_, _, _, err := clientcountutil.NewActivityLogData(client).
|
_, _, err := clientcountutil.NewActivityLogData(client).
|
||||||
NewPreviousMonthData(3).
|
NewPreviousMonthData(3).
|
||||||
// 3 months ago, 15 non-entity clients and 10 ACME clients
|
// 3 months ago, 15 non-entity clients and 10 ACME clients
|
||||||
NewClientsSeen(15, clientcountutil.WithClientType("non-entity-token")).
|
NewClientsSeen(15, clientcountutil.WithClientType("non-entity-token")).
|
||||||
|
|||||||
@@ -29,7 +29,7 @@ func Test_ActivityLog_Disable(t *testing.T) {
|
|||||||
"enabled": "enable",
|
"enabled": "enable",
|
||||||
})
|
})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
_, _, _, err = clientcountutil.NewActivityLogData(client).
|
_, _, err = clientcountutil.NewActivityLogData(client).
|
||||||
NewPreviousMonthData(1).
|
NewPreviousMonthData(1).
|
||||||
NewClientsSeen(5).
|
NewClientsSeen(5).
|
||||||
NewCurrentMonthData().
|
NewCurrentMonthData().
|
||||||
|
|||||||
@@ -86,7 +86,7 @@ func Test_ActivityLog_LoseLeadership(t *testing.T) {
|
|||||||
})
|
})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
_, _, _, err = clientcountutil.NewActivityLogData(client).
|
_, _, err = clientcountutil.NewActivityLogData(client).
|
||||||
NewCurrentMonthData().
|
NewCurrentMonthData().
|
||||||
NewClientsSeen(10).
|
NewClientsSeen(10).
|
||||||
Write(context.Background(), generation.WriteOptions_WRITE_ENTITIES)
|
Write(context.Background(), generation.WriteOptions_WRITE_ENTITIES)
|
||||||
@@ -121,7 +121,7 @@ func Test_ActivityLog_ClientsOverlapping(t *testing.T) {
|
|||||||
"enabled": "enable",
|
"enabled": "enable",
|
||||||
})
|
})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
_, _, _, err = clientcountutil.NewActivityLogData(client).
|
_, _, err = clientcountutil.NewActivityLogData(client).
|
||||||
NewPreviousMonthData(1).
|
NewPreviousMonthData(1).
|
||||||
NewClientsSeen(7).
|
NewClientsSeen(7).
|
||||||
NewCurrentMonthData().
|
NewCurrentMonthData().
|
||||||
@@ -169,7 +169,7 @@ func Test_ActivityLog_ClientsNewCurrentMonth(t *testing.T) {
|
|||||||
"enabled": "enable",
|
"enabled": "enable",
|
||||||
})
|
})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
_, _, _, err = clientcountutil.NewActivityLogData(client).
|
_, _, err = clientcountutil.NewActivityLogData(client).
|
||||||
NewPreviousMonthData(1).
|
NewPreviousMonthData(1).
|
||||||
NewClientsSeen(5).
|
NewClientsSeen(5).
|
||||||
NewCurrentMonthData().
|
NewCurrentMonthData().
|
||||||
@@ -203,7 +203,7 @@ func Test_ActivityLog_EmptyDataMonths(t *testing.T) {
|
|||||||
"enabled": "enable",
|
"enabled": "enable",
|
||||||
})
|
})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
_, _, _, err = clientcountutil.NewActivityLogData(client).
|
_, _, err = clientcountutil.NewActivityLogData(client).
|
||||||
NewCurrentMonthData().
|
NewCurrentMonthData().
|
||||||
NewClientsSeen(10).
|
NewClientsSeen(10).
|
||||||
Write(context.Background(), generation.WriteOptions_WRITE_PRECOMPUTED_QUERIES, generation.WriteOptions_WRITE_ENTITIES)
|
Write(context.Background(), generation.WriteOptions_WRITE_PRECOMPUTED_QUERIES, generation.WriteOptions_WRITE_ENTITIES)
|
||||||
@@ -243,7 +243,7 @@ func Test_ActivityLog_FutureEndDate(t *testing.T) {
|
|||||||
"enabled": "enable",
|
"enabled": "enable",
|
||||||
})
|
})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
_, _, _, err = clientcountutil.NewActivityLogData(client).
|
_, _, err = clientcountutil.NewActivityLogData(client).
|
||||||
NewPreviousMonthData(1).
|
NewPreviousMonthData(1).
|
||||||
NewClientsSeen(10).
|
NewClientsSeen(10).
|
||||||
NewCurrentMonthData().
|
NewCurrentMonthData().
|
||||||
@@ -316,7 +316,7 @@ func Test_ActivityLog_ClientTypeResponse(t *testing.T) {
|
|||||||
_, err := client.Logical().Write("sys/internal/counters/config", map[string]interface{}{
|
_, err := client.Logical().Write("sys/internal/counters/config", map[string]interface{}{
|
||||||
"enabled": "enable",
|
"enabled": "enable",
|
||||||
})
|
})
|
||||||
_, _, _, err = clientcountutil.NewActivityLogData(client).
|
_, _, err = clientcountutil.NewActivityLogData(client).
|
||||||
NewCurrentMonthData().
|
NewCurrentMonthData().
|
||||||
NewClientsSeen(10, clientcountutil.WithClientType(tc.clientType)).
|
NewClientsSeen(10, clientcountutil.WithClientType(tc.clientType)).
|
||||||
Write(context.Background(), generation.WriteOptions_WRITE_ENTITIES)
|
Write(context.Background(), generation.WriteOptions_WRITE_ENTITIES)
|
||||||
@@ -369,7 +369,7 @@ func Test_ActivityLogCurrentMonth_Response(t *testing.T) {
|
|||||||
_, err := client.Logical().Write("sys/internal/counters/config", map[string]interface{}{
|
_, err := client.Logical().Write("sys/internal/counters/config", map[string]interface{}{
|
||||||
"enabled": "enable",
|
"enabled": "enable",
|
||||||
})
|
})
|
||||||
_, _, _, err = clientcountutil.NewActivityLogData(client).
|
_, _, err = clientcountutil.NewActivityLogData(client).
|
||||||
NewCurrentMonthData().
|
NewCurrentMonthData().
|
||||||
NewClientsSeen(10, clientcountutil.WithClientType(tc.clientType)).
|
NewClientsSeen(10, clientcountutil.WithClientType(tc.clientType)).
|
||||||
Write(context.Background(), generation.WriteOptions_WRITE_ENTITIES)
|
Write(context.Background(), generation.WriteOptions_WRITE_ENTITIES)
|
||||||
@@ -420,7 +420,7 @@ func Test_ActivityLog_Deduplication(t *testing.T) {
|
|||||||
_, err := client.Logical().Write("sys/internal/counters/config", map[string]interface{}{
|
_, err := client.Logical().Write("sys/internal/counters/config", map[string]interface{}{
|
||||||
"enabled": "enable",
|
"enabled": "enable",
|
||||||
})
|
})
|
||||||
_, _, _, err = clientcountutil.NewActivityLogData(client).
|
_, _, err = clientcountutil.NewActivityLogData(client).
|
||||||
NewPreviousMonthData(3).
|
NewPreviousMonthData(3).
|
||||||
NewClientsSeen(10, clientcountutil.WithClientType(tc.clientType)).
|
NewClientsSeen(10, clientcountutil.WithClientType(tc.clientType)).
|
||||||
NewPreviousMonthData(2).
|
NewPreviousMonthData(2).
|
||||||
@@ -462,7 +462,7 @@ func Test_ActivityLog_MountDeduplication(t *testing.T) {
|
|||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
now := time.Now().UTC()
|
now := time.Now().UTC()
|
||||||
|
|
||||||
_, localPaths, globalPaths, err := clientcountutil.NewActivityLogData(client).
|
_, _, err = clientcountutil.NewActivityLogData(client).
|
||||||
NewPreviousMonthData(1).
|
NewPreviousMonthData(1).
|
||||||
NewClientSeen(clientcountutil.WithClientMount("sys")).
|
NewClientSeen(clientcountutil.WithClientMount("sys")).
|
||||||
NewClientSeen(clientcountutil.WithClientMount("secret")).
|
NewClientSeen(clientcountutil.WithClientMount("secret")).
|
||||||
@@ -473,10 +473,6 @@ func Test_ActivityLog_MountDeduplication(t *testing.T) {
|
|||||||
NewClientSeen(clientcountutil.WithClientMount("sys")).
|
NewClientSeen(clientcountutil.WithClientMount("sys")).
|
||||||
Write(context.Background(), generation.WriteOptions_WRITE_PRECOMPUTED_QUERIES, generation.WriteOptions_WRITE_ENTITIES)
|
Write(context.Background(), generation.WriteOptions_WRITE_PRECOMPUTED_QUERIES, generation.WriteOptions_WRITE_ENTITIES)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
// cubbyhole is local, 2 segments must exist for 2 months seen
|
|
||||||
require.Len(t, localPaths, 2)
|
|
||||||
// 2 global segments must exist for 2 months seen
|
|
||||||
require.Len(t, globalPaths, 2)
|
|
||||||
|
|
||||||
resp, err := client.Logical().ReadWithData("sys/internal/counters/activity", map[string][]string{
|
resp, err := client.Logical().ReadWithData("sys/internal/counters/activity", map[string][]string{
|
||||||
"end_time": {timeutil.EndOfMonth(now).Format(time.RFC3339)},
|
"end_time": {timeutil.EndOfMonth(now).Format(time.RFC3339)},
|
||||||
@@ -673,7 +669,7 @@ func Test_ActivityLog_Export_Sudo(t *testing.T) {
|
|||||||
|
|
||||||
rootToken := client.Token()
|
rootToken := client.Token()
|
||||||
|
|
||||||
_, _, _, err = clientcountutil.NewActivityLogData(client).
|
_, _, err = clientcountutil.NewActivityLogData(client).
|
||||||
NewCurrentMonthData().
|
NewCurrentMonthData().
|
||||||
NewClientsSeen(10).
|
NewClientsSeen(10).
|
||||||
Write(context.Background(), generation.WriteOptions_WRITE_ENTITIES)
|
Write(context.Background(), generation.WriteOptions_WRITE_ENTITIES)
|
||||||
@@ -849,7 +845,7 @@ func TestHandleQuery_MultipleMounts(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Write all the client count data
|
// Write all the client count data
|
||||||
_, _, _, err = activityLogGenerator.Write(context.Background(), generation.WriteOptions_WRITE_PRECOMPUTED_QUERIES, generation.WriteOptions_WRITE_ENTITIES)
|
_, _, err = activityLogGenerator.Write(context.Background(), generation.WriteOptions_WRITE_PRECOMPUTED_QUERIES, generation.WriteOptions_WRITE_ENTITIES)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
endOfCurrentMonth := timeutil.EndOfMonth(time.Now().UTC())
|
endOfCurrentMonth := timeutil.EndOfMonth(time.Now().UTC())
|
||||||
|
|||||||
@@ -85,7 +85,7 @@ func (b *SystemBackend) handleActivityWriteData(ctx context.Context, request *lo
|
|||||||
for _, opt := range input.Write {
|
for _, opt := range input.Write {
|
||||||
opts[opt] = struct{}{}
|
opts[opt] = struct{}{}
|
||||||
}
|
}
|
||||||
paths, localPaths, globalPaths, err := generated.write(ctx, opts, b.Core.activityLog, now)
|
paths, globalPaths, err := generated.write(ctx, opts, b.Core.activityLog, now)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.logger.Debug("failed to write activity log data", "error", err.Error())
|
b.logger.Debug("failed to write activity log data", "error", err.Error())
|
||||||
return logical.ErrorResponse("failed to write data"), err
|
return logical.ErrorResponse("failed to write data"), err
|
||||||
@@ -93,7 +93,6 @@ func (b *SystemBackend) handleActivityWriteData(ctx context.Context, request *lo
|
|||||||
return &logical.Response{
|
return &logical.Response{
|
||||||
Data: map[string]interface{}{
|
Data: map[string]interface{}{
|
||||||
"paths": paths,
|
"paths": paths,
|
||||||
"local_paths": localPaths,
|
|
||||||
"global_paths": globalPaths,
|
"global_paths": globalPaths,
|
||||||
},
|
},
|
||||||
}, nil
|
}, nil
|
||||||
@@ -105,17 +104,12 @@ type singleMonthActivityClients struct {
|
|||||||
clients []*activity.EntityRecord
|
clients []*activity.EntityRecord
|
||||||
// globalClients are indexed by ID
|
// globalClients are indexed by ID
|
||||||
globalClients []*activity.EntityRecord
|
globalClients []*activity.EntityRecord
|
||||||
// localClients are indexed by ID
|
|
||||||
localClients []*activity.EntityRecord
|
|
||||||
// predefinedSegments map from the segment number to the client's index in
|
// predefinedSegments map from the segment number to the client's index in
|
||||||
// the clients slice
|
// the clients slice
|
||||||
predefinedSegments map[int][]int
|
predefinedSegments map[int][]int
|
||||||
// predefinedGlobalSegments map from the segment number to the client's index in
|
// predefinedGlobalSegments map from the segment number to the client's index in
|
||||||
// the clients slice
|
// the clients slice
|
||||||
predefinedGlobalSegments map[int][]int
|
predefinedGlobalSegments map[int][]int
|
||||||
// predefinedLocalSegments map from the segment number to the client's index in
|
|
||||||
// the clients slice
|
|
||||||
predefinedLocalSegments map[int][]int
|
|
||||||
// generationParameters holds the generation request
|
// generationParameters holds the generation request
|
||||||
generationParameters *generation.Data
|
generationParameters *generation.Data
|
||||||
}
|
}
|
||||||
@@ -131,8 +125,6 @@ func (s *singleMonthActivityClients) addEntityRecord(core *Core, record *activit
|
|||||||
local, _ := core.activityLog.isClientLocal(record)
|
local, _ := core.activityLog.isClientLocal(record)
|
||||||
if !local {
|
if !local {
|
||||||
s.globalClients = append(s.globalClients, record)
|
s.globalClients = append(s.globalClients, record)
|
||||||
} else {
|
|
||||||
s.localClients = append(s.localClients, record)
|
|
||||||
}
|
}
|
||||||
if segmentIndex != nil {
|
if segmentIndex != nil {
|
||||||
index := len(s.clients) - 1
|
index := len(s.clients) - 1
|
||||||
@@ -140,9 +132,6 @@ func (s *singleMonthActivityClients) addEntityRecord(core *Core, record *activit
|
|||||||
if !local {
|
if !local {
|
||||||
globalIndex := len(s.globalClients) - 1
|
globalIndex := len(s.globalClients) - 1
|
||||||
s.predefinedGlobalSegments[*segmentIndex] = append(s.predefinedGlobalSegments[*segmentIndex], globalIndex)
|
s.predefinedGlobalSegments[*segmentIndex] = append(s.predefinedGlobalSegments[*segmentIndex], globalIndex)
|
||||||
} else {
|
|
||||||
localIndex := len(s.localClients) - 1
|
|
||||||
s.predefinedLocalSegments[*segmentIndex] = append(s.predefinedLocalSegments[*segmentIndex], localIndex)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -395,10 +384,9 @@ func (m *multipleMonthsActivityClients) timestampForMonth(i int, now time.Time)
|
|||||||
return now
|
return now
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *multipleMonthsActivityClients) write(ctx context.Context, opts map[generation.WriteOptions]struct{}, activityLog *ActivityLog, now time.Time) ([]string, []string, []string, error) {
|
func (m *multipleMonthsActivityClients) write(ctx context.Context, opts map[generation.WriteOptions]struct{}, activityLog *ActivityLog, now time.Time) ([]string, []string, error) {
|
||||||
paths := []string{}
|
paths := []string{}
|
||||||
globalPaths := []string{}
|
globalPaths := []string{}
|
||||||
localPaths := []string{}
|
|
||||||
|
|
||||||
_, writePQ := opts[generation.WriteOptions_WRITE_PRECOMPUTED_QUERIES]
|
_, writePQ := opts[generation.WriteOptions_WRITE_PRECOMPUTED_QUERIES]
|
||||||
_, writeDistinctClients := opts[generation.WriteOptions_WRITE_DISTINCT_CLIENTS]
|
_, writeDistinctClients := opts[generation.WriteOptions_WRITE_DISTINCT_CLIENTS]
|
||||||
@@ -413,7 +401,7 @@ func (m *multipleMonthsActivityClients) write(ctx context.Context, opts map[gene
|
|||||||
timestamp := m.timestampForMonth(i, now)
|
timestamp := m.timestampForMonth(i, now)
|
||||||
segments, err := month.populateSegments(month.predefinedSegments, month.clients)
|
segments, err := month.populateSegments(month.predefinedSegments, month.clients)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
for segmentIndex, segment := range segments {
|
for segmentIndex, segment := range segments {
|
||||||
if segment == nil {
|
if segment == nil {
|
||||||
@@ -427,14 +415,14 @@ func (m *multipleMonthsActivityClients) write(ctx context.Context, opts map[gene
|
|||||||
tokenCount: &activity.TokenCount{},
|
tokenCount: &activity.TokenCount{},
|
||||||
}, true, "")
|
}, true, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
paths = append(paths, entityPath)
|
paths = append(paths, entityPath)
|
||||||
}
|
}
|
||||||
if len(month.globalClients) > 0 {
|
if len(month.globalClients) > 0 {
|
||||||
globalSegments, err := month.populateSegments(month.predefinedGlobalSegments, month.globalClients)
|
globalSegments, err := month.populateSegments(month.predefinedGlobalSegments, month.globalClients)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
for segmentIndex, segment := range globalSegments {
|
for segmentIndex, segment := range globalSegments {
|
||||||
if segment == nil {
|
if segment == nil {
|
||||||
@@ -448,33 +436,11 @@ func (m *multipleMonthsActivityClients) write(ctx context.Context, opts map[gene
|
|||||||
tokenCount: &activity.TokenCount{},
|
tokenCount: &activity.TokenCount{},
|
||||||
}, true, activityGlobalPathPrefix)
|
}, true, activityGlobalPathPrefix)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
globalPaths = append(globalPaths, entityPath)
|
globalPaths = append(globalPaths, entityPath)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(month.localClients) > 0 {
|
|
||||||
localSegments, err := month.populateSegments(month.predefinedLocalSegments, month.localClients)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, nil, err
|
|
||||||
}
|
|
||||||
for segmentIndex, segment := range localSegments {
|
|
||||||
if segment == nil {
|
|
||||||
// skip the index
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
entityPath, err := activityLog.saveSegmentEntitiesInternal(ctx, segmentInfo{
|
|
||||||
startTimestamp: timestamp.Unix(),
|
|
||||||
currentClients: &activity.EntityActivityLog{Clients: segment},
|
|
||||||
clientSequenceNumber: uint64(segmentIndex),
|
|
||||||
tokenCount: &activity.TokenCount{},
|
|
||||||
}, true, activityLocalPathPrefix)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, nil, err
|
|
||||||
}
|
|
||||||
localPaths = append(localPaths, entityPath)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if writePQ || writeDistinctClients {
|
if writePQ || writeDistinctClients {
|
||||||
// start with the oldest month of data, and create precomputed queries
|
// start with the oldest month of data, and create precomputed queries
|
||||||
@@ -495,16 +461,16 @@ func (m *multipleMonthsActivityClients) write(ctx context.Context, opts map[gene
|
|||||||
if writeIntentLog {
|
if writeIntentLog {
|
||||||
err := activityLog.writeIntentLog(ctx, m.latestTimestamp(now, false).Unix(), m.latestTimestamp(now, true).UTC())
|
err := activityLog.writeIntentLog(ctx, m.latestTimestamp(now, false).Unix(), m.latestTimestamp(now, true).UTC())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
wg := sync.WaitGroup{}
|
wg := sync.WaitGroup{}
|
||||||
err := activityLog.refreshFromStoredLog(ctx, &wg, now)
|
err := activityLog.refreshFromStoredLog(ctx, &wg, now)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
return paths, localPaths, globalPaths, nil
|
return paths, globalPaths, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *multipleMonthsActivityClients) latestTimestamp(now time.Time, includeCurrentMonth bool) time.Time {
|
func (m *multipleMonthsActivityClients) latestTimestamp(now time.Time, includeCurrentMonth bool) time.Time {
|
||||||
@@ -534,7 +500,6 @@ func newMultipleMonthsActivityClients(numberOfMonths int) *multipleMonthsActivit
|
|||||||
m.months[i] = &singleMonthActivityClients{
|
m.months[i] = &singleMonthActivityClients{
|
||||||
predefinedSegments: make(map[int][]int),
|
predefinedSegments: make(map[int][]int),
|
||||||
predefinedGlobalSegments: make(map[int][]int),
|
predefinedGlobalSegments: make(map[int][]int),
|
||||||
predefinedLocalSegments: make(map[int][]int),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return m
|
return m
|
||||||
@@ -569,17 +534,6 @@ func (p *sliceSegmentReader) ReadGlobalEntity(ctx context.Context) (*activity.En
|
|||||||
return &activity.EntityActivityLog{Clients: record}, nil
|
return &activity.EntityActivityLog{Clients: record}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReadLocalEntity here is a dummy implementation.
|
|
||||||
// Segment reader is never used when writing using the ClientCountUtil library
|
|
||||||
func (p *sliceSegmentReader) ReadLocalEntity(ctx context.Context) (*activity.EntityActivityLog, error) {
|
|
||||||
if p.i == len(p.records) {
|
|
||||||
return nil, io.EOF
|
|
||||||
}
|
|
||||||
record := p.records[p.i]
|
|
||||||
p.i++
|
|
||||||
return &activity.EntityActivityLog{Clients: record}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *sliceSegmentReader) ReadToken(ctx context.Context) (*activity.TokenCount, error) {
|
func (p *sliceSegmentReader) ReadToken(ctx context.Context) (*activity.TokenCount, error) {
|
||||||
return nil, io.EOF
|
return nil, io.EOF
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user