Revert "adding local clients to local fragments ce changes (#28856)" (#29278)

This reverts commit 1e82fe9980.
This commit is contained in:
divyaac
2025-01-03 10:09:40 -08:00
committed by GitHub
parent 9a830736c8
commit c3cb730bae
3 changed files with 55 additions and 463 deletions

View File

@@ -148,11 +148,7 @@ type ActivityLog struct {
// standbyFragmentsReceived. // standbyFragmentsReceived.
fragmentLock sync.RWMutex fragmentLock sync.RWMutex
// localFragmentLock protects partialMonthLocalClientTracker, localFragment, // globalFragmentLock protects enable secondaryGlobalClientFragments, currentGlobalFragment
// standbyLocalFragmentsReceived.
localFragmentLock sync.RWMutex
// globalFragmentLock protects enable secondaryGlobalClientFragments, standbyGlobalFragmentsReceived, currentGlobalFragment
// and globalPartialMonthClientTracker // and globalPartialMonthClientTracker
globalFragmentLock sync.RWMutex globalFragmentLock sync.RWMutex
@@ -184,9 +180,6 @@ type ActivityLog struct {
// so it's appropriate to start the timer. // so it's appropriate to start the timer.
newFragmentCh chan struct{} newFragmentCh chan struct{}
// current local log fragment (may be nil)
localFragment *activity.LogFragment
// Channel to signal a new global fragment has been created // Channel to signal a new global fragment has been created
// so it's appropriate to start the timer. Once the timer finishes // so it's appropriate to start the timer. Once the timer finishes
// the secondary will send currentGlobalFragment to the primary // the secondary will send currentGlobalFragment to the primary
@@ -204,12 +197,6 @@ type ActivityLog struct {
// Fragments received from performance standbys // Fragments received from performance standbys
standbyFragmentsReceived []*activity.LogFragment standbyFragmentsReceived []*activity.LogFragment
// Local fragments received from performance standbys
standbyLocalFragmentsReceived []*activity.LogFragment
// Global fragments received from performance standbys
standbyGlobalFragmentsReceived []*activity.LogFragment
// Fragments of global clients received from performance secondaries // Fragments of global clients received from performance secondaries
secondaryGlobalClientFragments []*activity.LogFragment secondaryGlobalClientFragments []*activity.LogFragment
@@ -232,9 +219,6 @@ type ActivityLog struct {
// partialMonthClientTracker tracks active clients this month. Protected by fragmentLock. // partialMonthClientTracker tracks active clients this month. Protected by fragmentLock.
partialMonthClientTracker map[string]*activity.EntityRecord partialMonthClientTracker map[string]*activity.EntityRecord
// partialMonthLocalClientTracker tracks active local clients this month. Protected by localFragmentLock.
partialMonthLocalClientTracker map[string]*activity.EntityRecord
// globalPartialMonthClientTracker tracks active clients this month. Protected by globalFragmentLock. // globalPartialMonthClientTracker tracks active clients this month. Protected by globalFragmentLock.
globalPartialMonthClientTracker map[string]*activity.EntityRecord globalPartialMonthClientTracker map[string]*activity.EntityRecord
@@ -274,10 +258,6 @@ type ActivityLogCoreConfig struct {
// GlobalFragmentSendInterval sets the interval to send global data from the secondary to the primary // GlobalFragmentSendInterval sets the interval to send global data from the secondary to the primary
// This is only for testing purposes // This is only for testing purposes
GlobalFragmentSendInterval time.Duration GlobalFragmentSendInterval time.Duration
// PerfStandbyFragmentSendInterval sets the interval to send fragment data from the perf standby to the active
// This is only for testing purposes
PerfStandbyFragmentSendInterval time.Duration
} }
// ActivityLogExportRecord is the output structure for activity export // ActivityLogExportRecord is the output structure for activity export
@@ -355,11 +335,10 @@ func NewActivityLog(core *Core, logger log.Logger, view *BarrierView, metrics me
metrics: metrics, metrics: metrics,
nodeID: hostname, nodeID: hostname,
newFragmentCh: make(chan struct{}, 1), newFragmentCh: make(chan struct{}, 1),
newGlobalClientFragmentCh: make(chan struct{}, 1),
sendCh: make(chan struct{}, 1), // buffered so it can be triggered by fragment size sendCh: make(chan struct{}, 1), // buffered so it can be triggered by fragment size
doneCh: make(chan struct{}, 1), doneCh: make(chan struct{}, 1),
partialMonthClientTracker: make(map[string]*activity.EntityRecord), partialMonthClientTracker: make(map[string]*activity.EntityRecord),
partialMonthLocalClientTracker: make(map[string]*activity.EntityRecord),
newGlobalClientFragmentCh: make(chan struct{}, 1),
globalPartialMonthClientTracker: make(map[string]*activity.EntityRecord), globalPartialMonthClientTracker: make(map[string]*activity.EntityRecord),
clock: clock, clock: clock,
currentSegment: segmentInfo{ currentSegment: segmentInfo{
@@ -376,8 +355,6 @@ func NewActivityLog(core *Core, logger log.Logger, view *BarrierView, metrics me
clientSequenceNumber: 0, clientSequenceNumber: 0,
}, },
standbyFragmentsReceived: make([]*activity.LogFragment, 0), standbyFragmentsReceived: make([]*activity.LogFragment, 0),
standbyLocalFragmentsReceived: make([]*activity.LogFragment, 0),
standbyGlobalFragmentsReceived: make([]*activity.LogFragment, 0),
secondaryGlobalClientFragments: make([]*activity.LogFragment, 0), secondaryGlobalClientFragments: make([]*activity.LogFragment, 0),
inprocessExport: atomic.NewBool(false), inprocessExport: atomic.NewBool(false),
precomputedQueryWritten: make(chan struct{}), precomputedQueryWritten: make(chan struct{}),
@@ -423,10 +400,11 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for
defer a.metrics.MeasureSinceWithLabels([]string{"core", "activity", "segment_write"}, defer a.metrics.MeasureSinceWithLabels([]string{"core", "activity", "segment_write"},
a.clock.Now(), []metricsutil.Label{}) a.clock.Now(), []metricsutil.Label{})
// Swap out the pending regular fragments // Swap out the pending fragments
a.fragmentLock.Lock() a.fragmentLock.Lock()
currentFragment := a.fragment localFragment := a.fragment
a.fragment = nil a.fragment = nil
standbys := a.standbyFragmentsReceived standbys := a.standbyFragmentsReceived
a.standbyFragmentsReceived = make([]*activity.LogFragment, 0) a.standbyFragmentsReceived = make([]*activity.LogFragment, 0)
a.fragmentLock.Unlock() a.fragmentLock.Unlock()
@@ -465,38 +443,14 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for
return nil return nil
} }
// Measure the current regular fragment // Measure the current fragment
if currentFragment != nil {
a.metrics.IncrCounterWithLabels([]string{"core", "activity", "fragment_size"},
float32(len(currentFragment.Clients)),
[]metricsutil.Label{
{"type", "entity"},
})
a.metrics.IncrCounterWithLabels([]string{"core", "activity", "fragment_size"},
float32(len(currentFragment.NonEntityTokens)),
[]metricsutil.Label{
{"type", "direct_token"},
})
}
// Swap out the pending local fragments
a.localFragmentLock.Lock()
localFragment := a.localFragment
a.localFragment = nil
// standbyLocalFragments := a.standbyLocalFragmentsReceived
// a.standbyLocalFragmentsReceived = make([]*activity.LogFragment, 0)
a.localFragmentLock.Unlock()
// Measure the current local fragment
if localFragment != nil { if localFragment != nil {
a.metrics.IncrCounterWithLabels([]string{"core", "activity", "local_fragment_size"}, a.metrics.IncrCounterWithLabels([]string{"core", "activity", "fragment_size"},
float32(len(localFragment.Clients)), float32(len(localFragment.Clients)),
[]metricsutil.Label{ []metricsutil.Label{
{"type", "entity"}, {"type", "entity"},
}) })
a.metrics.IncrCounterWithLabels([]string{"core", "activity", "local_fragment_size"}, a.metrics.IncrCounterWithLabels([]string{"core", "activity", "fragment_size"},
float32(len(localFragment.NonEntityTokens)), float32(len(localFragment.NonEntityTokens)),
[]metricsutil.Label{ []metricsutil.Label{
{"type", "direct_token"}, {"type", "direct_token"},
@@ -506,7 +460,7 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for
// Collect new entities and new tokens. // Collect new entities and new tokens.
saveChanges := false saveChanges := false
newEntities := make(map[string]*activity.EntityRecord) newEntities := make(map[string]*activity.EntityRecord)
for _, f := range append(standbys, currentFragment) { for _, f := range append(standbys, localFragment) {
if f == nil { if f == nil {
continue continue
} }
@@ -516,7 +470,7 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for
for _, e := range f.Clients { for _, e := range f.Clients {
// We could sort by timestamp to see which is first. // We could sort by timestamp to see which is first.
// We'll ignore that; the order of the append above means // We'll ignore that; the order of the append above means
// that we choose entries in currentFragment over those // that we choose entries in localFragment over those
// from standby nodes. // from standby nodes.
newEntities[e.ClientID] = e newEntities[e.ClientID] = e
} }
@@ -1595,12 +1549,6 @@ func (a *ActivityLog) perfStandbyFragmentWorker(ctx context.Context) {
endOfMonth.Stop() endOfMonth.Stop()
} }
sendInterval := activityFragmentSendInterval
// This changes the interval to a duration that was set for testing purposes
if a.configOverrides.PerfStandbyFragmentSendInterval.Microseconds() > 0 {
sendInterval = a.configOverrides.PerfStandbyFragmentSendInterval
}
sendFunc := func() { sendFunc := func() {
ctx, cancel := context.WithTimeout(ctx, activityFragmentSendTimeout) ctx, cancel := context.WithTimeout(ctx, activityFragmentSendTimeout)
defer cancel() defer cancel()
@@ -1628,7 +1576,7 @@ func (a *ActivityLog) perfStandbyFragmentWorker(ctx context.Context) {
fragmentWaiting = true fragmentWaiting = true
if !a.configOverrides.DisableTimers { if !a.configOverrides.DisableTimers {
a.logger.Trace("reset fragment timer") a.logger.Trace("reset fragment timer")
timer.Reset(sendInterval) timer.Reset(activityFragmentSendInterval)
} }
} }
case <-timer.C: case <-timer.C:
@@ -1665,11 +1613,6 @@ func (a *ActivityLog) perfStandbyFragmentWorker(ctx context.Context) {
a.fragmentLock.Unlock() a.fragmentLock.Unlock()
// clear local active entity set
a.localFragmentLock.Lock()
a.partialMonthLocalClientTracker = make(map[string]*activity.EntityRecord)
a.localFragmentLock.Unlock()
// Set timer for next month. // Set timer for next month.
// The current segment *probably* hasn't been set yet (via invalidation), // The current segment *probably* hasn't been set yet (via invalidation),
// so don't rely on it. // so don't rely on it.
@@ -1841,28 +1784,17 @@ func (c *Core) ResetActivityLog() []*activity.LogFragment {
allFragments := make([]*activity.LogFragment, 1) allFragments := make([]*activity.LogFragment, 1)
a.fragmentLock.Lock() a.fragmentLock.Lock()
allFragments[0] = a.fragment allFragments[0] = a.fragment
a.fragment = nil a.fragment = nil
allFragments = append(allFragments, a.standbyFragmentsReceived...) allFragments = append(allFragments, a.standbyFragmentsReceived...)
a.standbyFragmentsReceived = make([]*activity.LogFragment, 0) a.standbyFragmentsReceived = make([]*activity.LogFragment, 0)
a.secondaryGlobalClientFragments = make([]*activity.LogFragment, 0) a.secondaryGlobalClientFragments = make([]*activity.LogFragment, 0)
a.partialMonthClientTracker = make(map[string]*activity.EntityRecord) a.partialMonthClientTracker = make(map[string]*activity.EntityRecord)
a.fragmentLock.Unlock() a.fragmentLock.Unlock()
// local fragments
a.localFragmentLock.Lock()
allFragments = append(allFragments, a.localFragment)
a.localFragment = nil
allFragments = append(allFragments, a.standbyLocalFragmentsReceived...)
a.standbyLocalFragmentsReceived = make([]*activity.LogFragment, 0)
a.partialMonthLocalClientTracker = make(map[string]*activity.EntityRecord)
a.localFragmentLock.Unlock()
// global fragments
a.globalFragmentLock.Lock() a.globalFragmentLock.Lock()
a.globalPartialMonthClientTracker = make(map[string]*activity.EntityRecord) a.globalPartialMonthClientTracker = make(map[string]*activity.EntityRecord)
a.standbyGlobalFragmentsReceived = make([]*activity.LogFragment, 0)
a.globalFragmentLock.Unlock() a.globalFragmentLock.Unlock()
return allFragments return allFragments
} }
@@ -1901,16 +1833,11 @@ func (a *ActivityLog) AddActivityToFragment(clientID string, namespaceID string,
a.fragmentLock.RLock() a.fragmentLock.RLock()
if a.enabled { if a.enabled {
_, presentInRegularClientMap := a.partialMonthClientTracker[clientID] _, present = a.partialMonthClientTracker[clientID]
_, presentInLocalClientmap := a.partialMonthLocalClientTracker[clientID]
if presentInRegularClientMap || presentInLocalClientmap {
present = true
}
} else { } else {
present = true present = true
} }
a.fragmentLock.RUnlock() a.fragmentLock.RUnlock()
if present { if present {
return return
} }
@@ -1919,24 +1846,17 @@ func (a *ActivityLog) AddActivityToFragment(clientID string, namespaceID string,
a.fragmentLock.Lock() a.fragmentLock.Lock()
defer a.fragmentLock.Unlock() defer a.fragmentLock.Unlock()
a.localFragmentLock.Lock()
defer a.localFragmentLock.Unlock()
a.globalFragmentLock.Lock() a.globalFragmentLock.Lock()
defer a.globalFragmentLock.Unlock() defer a.globalFragmentLock.Unlock()
// Re-check entity ID after re-acquiring lock // Re-check entity ID after re-acquiring lock
_, presentInRegularClientMap := a.partialMonthClientTracker[clientID] _, present = a.partialMonthClientTracker[clientID]
_, presentInLocalClientmap := a.partialMonthLocalClientTracker[clientID]
if presentInRegularClientMap || presentInLocalClientmap {
present = true
}
if present { if present {
return return
} }
// create fragments if doesn't already exist
a.createCurrentFragment() a.createCurrentFragment()
a.createCurrentGlobalFragment()
clientRecord := &activity.EntityRecord{ clientRecord := &activity.EntityRecord{
ClientID: clientID, ClientID: clientID,
@@ -1954,22 +1874,18 @@ func (a *ActivityLog) AddActivityToFragment(clientID string, namespaceID string,
clientRecord.NonEntity = true clientRecord.NonEntity = true
} }
// add the clients to the regular fragment
a.fragment.Clients = append(a.fragment.Clients, clientRecord) a.fragment.Clients = append(a.fragment.Clients, clientRecord)
a.partialMonthClientTracker[clientRecord.ClientID] = clientRecord
if local, _ := a.isClientLocal(clientRecord); local { // Check if the client is local
// If the client is local then add the client to the current local fragment if local, _ := a.isClientLocal(clientRecord); !local {
a.localFragment.Clients = append(a.localFragment.Clients, clientRecord) // If the client is not local and has not already been seen, then add the client
a.partialMonthLocalClientTracker[clientRecord.ClientID] = clientRecord // to the current global fragment
} else {
if _, ok := a.globalPartialMonthClientTracker[clientRecord.ClientID]; !ok { if _, ok := a.globalPartialMonthClientTracker[clientRecord.ClientID]; !ok {
// If the client is not local and has not already been seen, then add the client
// to the current global fragment
a.currentGlobalFragment.Clients = append(a.currentGlobalFragment.Clients, clientRecord)
a.globalPartialMonthClientTracker[clientRecord.ClientID] = clientRecord a.globalPartialMonthClientTracker[clientRecord.ClientID] = clientRecord
a.currentGlobalFragment.Clients = append(a.currentGlobalFragment.Clients, clientRecord)
} }
} }
a.partialMonthClientTracker[clientRecord.ClientID] = clientRecord
} }
// isClientLocal checks whether the given client is on a local mount. // isClientLocal checks whether the given client is on a local mount.
@@ -1992,25 +1908,25 @@ func (a *ActivityLog) isClientLocal(client *activity.EntityRecord) (bool, error)
return false, nil return false, nil
} }
// Create the fragments (regular fragment, local fragment and global fragment) if it doesn't already exist. // Create the current fragment if it doesn't already exist.
// Must be called with the fragmentLock, localFragmentLock and globalFragmentLock held. // Must be called with the lock held.
func (a *ActivityLog) createCurrentFragment() { func (a *ActivityLog) createCurrentFragment() {
if a.fragment == nil { if a.fragment == nil {
// create regular fragment
a.fragment = &activity.LogFragment{ a.fragment = &activity.LogFragment{
OriginatingNode: a.nodeID, OriginatingNode: a.nodeID,
Clients: make([]*activity.EntityRecord, 0, 120), Clients: make([]*activity.EntityRecord, 0, 120),
NonEntityTokens: make(map[string]uint64), NonEntityTokens: make(map[string]uint64),
} }
// Signal that a new segment is available, start
// the timer to send it.
a.newFragmentCh <- struct{}{}
}
}
// create local fragment // Create the current fragment to track global clients seen
a.localFragment = &activity.LogFragment{ // on cluster. Must be called with the globalFragmentLock held
OriginatingNode: a.nodeID, func (a *ActivityLog) createCurrentGlobalFragment() {
Clients: make([]*activity.EntityRecord, 0, 120), if a.currentGlobalFragment == nil {
NonEntityTokens: make(map[string]uint64),
}
// create global fragment
a.currentGlobalFragment = &activity.LogFragment{ a.currentGlobalFragment = &activity.LogFragment{
OriginatingCluster: a.core.ClusterID(), OriginatingCluster: a.core.ClusterID(),
Clients: make([]*activity.EntityRecord, 0), Clients: make([]*activity.EntityRecord, 0),
@@ -2021,10 +1937,6 @@ func (a *ActivityLog) createCurrentFragment() {
// the timer to send it // the timer to send it
a.newGlobalClientFragmentCh <- struct{}{} a.newGlobalClientFragmentCh <- struct{}{}
} }
// Signal that a new segment is available, start
// the timer to send it.
a.newFragmentCh <- struct{}{}
} }
} }
@@ -2048,61 +1960,33 @@ func (a *ActivityLog) receivedGlobalClientFragments(fragment *activity.LogFragme
func (a *ActivityLog) receivedFragment(fragment *activity.LogFragment) { func (a *ActivityLog) receivedFragment(fragment *activity.LogFragment) {
a.logger.Trace("received fragment from standby", "node", fragment.OriginatingNode) a.logger.Trace("received fragment from standby", "node", fragment.OriginatingNode)
isLocalFragment := false a.fragmentLock.Lock()
defer a.fragmentLock.Unlock()
if !a.enabled { if !a.enabled {
return return
} }
a.fragmentLock.Lock() a.globalFragmentLock.Lock()
defer a.fragmentLock.Unlock() defer a.globalFragmentLock.Unlock()
a.createCurrentGlobalFragment()
// Check if the received fragment from standby is a local fragment.
// A fragment can have all local clients or all non-local clients except for regular fragment (which has both currently but will be modified to only hold non-local clients later).
// Check the first client to identify the type of fragment.
if len(fragment.Clients) > 0 {
client := fragment.Clients[0]
if local, _ := a.isClientLocal(client); local {
isLocalFragment = true
a.localFragmentLock.Lock()
defer a.localFragmentLock.Unlock()
} else {
a.globalFragmentLock.Lock()
defer a.globalFragmentLock.Unlock()
}
}
for _, e := range fragment.Clients { for _, e := range fragment.Clients {
a.partialMonthClientTracker[e.ClientID] = e a.partialMonthClientTracker[e.ClientID] = e
if isLocalFragment {
a.partialMonthLocalClientTracker[e.ClientID] = e // If the client is global, then add to global maps and keep in a global fragment
} else { if local, _ := a.isClientLocal(e); !local {
a.globalPartialMonthClientTracker[e.ClientID] = e a.globalPartialMonthClientTracker[e.ClientID] = e
a.currentGlobalFragment.Clients = append(a.currentGlobalFragment.Clients, e)
} }
} }
a.standbyFragmentsReceived = append(a.standbyFragmentsReceived, fragment) a.standbyFragmentsReceived = append(a.standbyFragmentsReceived, fragment)
if isLocalFragment {
a.standbyLocalFragmentsReceived = append(a.standbyLocalFragmentsReceived, fragment)
} else {
a.standbyGlobalFragmentsReceived = append(a.standbyGlobalFragmentsReceived, fragment)
}
// TODO: check if current segment is full and should be written // TODO: check if current segment is full and should be written
} }
// returns the active local and global clients for the current month
func (a *ActivityLog) GetAllPartialMonthClients() (map[string]*activity.EntityRecord, map[string]*activity.EntityRecord) {
a.localFragmentLock.Lock()
defer a.localFragmentLock.Unlock()
a.globalFragmentLock.Lock()
defer a.globalFragmentLock.Unlock()
return a.partialMonthLocalClientTracker, a.globalPartialMonthClientTracker
}
type ResponseCounts struct { type ResponseCounts struct {
EntityClients int `json:"entity_clients" mapstructure:"entity_clients"` EntityClients int `json:"entity_clients" mapstructure:"entity_clients"`
NonEntityClients int `json:"non_entity_clients" mapstructure:"non_entity_clients"` NonEntityClients int `json:"non_entity_clients" mapstructure:"non_entity_clients"`

View File

@@ -23,7 +23,6 @@ import (
"github.com/go-test/deep" "github.com/go-test/deep"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"github.com/hashicorp/go-uuid" "github.com/hashicorp/go-uuid"
"github.com/hashicorp/vault/builtin/credential/userpass"
"github.com/hashicorp/vault/helper/constants" "github.com/hashicorp/vault/helper/constants"
"github.com/hashicorp/vault/helper/namespace" "github.com/hashicorp/vault/helper/namespace"
"github.com/hashicorp/vault/helper/timeutil" "github.com/hashicorp/vault/helper/timeutil"
@@ -35,16 +34,7 @@ import (
// TestActivityLog_Creation calls AddEntityToFragment and verifies that it appears correctly in a.fragment. // TestActivityLog_Creation calls AddEntityToFragment and verifies that it appears correctly in a.fragment.
func TestActivityLog_Creation(t *testing.T) { func TestActivityLog_Creation(t *testing.T) {
storage := &logical.InmemStorage{} core, _, _ := TestCoreUnsealed(t)
coreConfig := &CoreConfig{
CredentialBackends: map[string]logical.Factory{
"userpass": userpass.Factory,
},
Physical: storage.Underlying(),
}
cluster := NewTestCluster(t, coreConfig, nil)
core := cluster.Cores[0].Core
a := core.activityLog a := core.activityLog
a.SetEnable(true) a.SetEnable(true)
@@ -59,10 +49,6 @@ func TestActivityLog_Creation(t *testing.T) {
t.Fatal("activity log already has fragment") t.Fatal("activity log already has fragment")
} }
if a.localFragment != nil {
t.Fatal("activity log already has a local fragment")
}
const entity_id = "entity_id_75432" const entity_id = "entity_id_75432"
const namespace_id = "ns123" const namespace_id = "ns123"
ts := time.Now() ts := time.Now()
@@ -132,49 +118,6 @@ func TestActivityLog_Creation(t *testing.T) {
if actual != 1 { if actual != 1 {
t.Errorf("mismatched number of tokens, %v vs %v", actual, 1) t.Errorf("mismatched number of tokens, %v vs %v", actual, 1)
} }
// test local fragment
localMe := &MountEntry{
Table: credentialTableType,
Path: "userpass-local/",
Type: "userpass",
Local: true,
Accessor: "local_mount_accessor",
}
err := core.enableCredential(namespace.RootContext(nil), localMe)
require.NoError(t, err)
const local_entity_id = "entity_id_75434"
local_ts := time.Now()
a.AddClientToFragment(local_entity_id, "root", local_ts.Unix(), false, "local_mount_accessor")
if a.localFragment.OriginatingNode != a.nodeID {
t.Errorf("mismatched node ID, %q vs %q", a.localFragment.OriginatingNode, a.nodeID)
}
if a.localFragment.Clients == nil {
t.Fatal("no local fragment entity slice")
}
if a.localFragment.NonEntityTokens == nil {
t.Fatal("no local fragment token map")
}
if len(a.localFragment.Clients) != 1 {
t.Fatalf("wrong number of entities %v", len(a.localFragment.Clients))
}
er = a.localFragment.Clients[0]
if er.ClientID != local_entity_id {
t.Errorf("mimatched entity ID, %q vs %q", er.ClientID, local_entity_id)
}
if er.NamespaceID != "root" {
t.Errorf("mimatched namespace ID, %q vs %q", er.NamespaceID, "root")
}
if er.Timestamp != ts.Unix() {
t.Errorf("mimatched timestamp, %v vs %v", er.Timestamp, ts.Unix())
}
} }
// TestActivityLog_Creation_WrappingTokens calls HandleTokenUsage for two wrapping tokens, and verifies that this // TestActivityLog_Creation_WrappingTokens calls HandleTokenUsage for two wrapping tokens, and verifies that this
@@ -196,13 +139,6 @@ func TestActivityLog_Creation_WrappingTokens(t *testing.T) {
t.Fatal("activity log already has fragment") t.Fatal("activity log already has fragment")
} }
a.fragmentLock.Unlock() a.fragmentLock.Unlock()
a.localFragmentLock.Lock()
if a.localFragment != nil {
t.Fatal("activity log already has local fragment")
}
a.localFragmentLock.Unlock()
const namespace_id = "ns123" const namespace_id = "ns123"
te := &logical.TokenEntry{ te := &logical.TokenEntry{
@@ -413,10 +349,6 @@ func TestActivityLog_SaveTokensToStorage(t *testing.T) {
t.Errorf("fragment was not reset after write to storage") t.Errorf("fragment was not reset after write to storage")
} }
if a.localFragment != nil {
t.Errorf("local fragment was not reset after write to storage")
}
out := &activity.TokenCount{} out := &activity.TokenCount{}
protoSegment := readSegmentFromStorage(t, core, path) protoSegment := readSegmentFromStorage(t, core, path)
err = proto.Unmarshal(protoSegment.Value, out) err = proto.Unmarshal(protoSegment.Value, out)
@@ -449,10 +381,6 @@ func TestActivityLog_SaveTokensToStorage(t *testing.T) {
t.Errorf("fragment was not reset after write to storage") t.Errorf("fragment was not reset after write to storage")
} }
if a.localFragment != nil {
t.Errorf("local fragment was not reset after write to storage")
}
protoSegment = readSegmentFromStorage(t, core, path) protoSegment = readSegmentFromStorage(t, core, path)
out = &activity.TokenCount{} out = &activity.TokenCount{}
err = proto.Unmarshal(protoSegment.Value, out) err = proto.Unmarshal(protoSegment.Value, out)
@@ -522,10 +450,6 @@ func TestActivityLog_SaveTokensToStorageDoesNotUpdateTokenCount(t *testing.T) {
t.Errorf("fragment was not reset after write to storage") t.Errorf("fragment was not reset after write to storage")
} }
if a.localFragment != nil {
t.Errorf("local fragment was not reset after write to storage")
}
// Assert that no tokens have been written to the fragment // Assert that no tokens have been written to the fragment
readSegmentFromStorageNil(t, core, tokenPath) readSegmentFromStorageNil(t, core, tokenPath)
@@ -589,9 +513,6 @@ func TestActivityLog_SaveEntitiesToStorage(t *testing.T) {
t.Errorf("fragment was not reset after write to storage") t.Errorf("fragment was not reset after write to storage")
} }
if a.localFragment != nil {
t.Errorf("local fragment was not reset after write to storage")
}
protoSegment := readSegmentFromStorage(t, core, path) protoSegment := readSegmentFromStorage(t, core, path)
out := &activity.EntityActivityLog{} out := &activity.EntityActivityLog{}
err = proto.Unmarshal(protoSegment.Value, out) err = proto.Unmarshal(protoSegment.Value, out)
@@ -680,8 +601,8 @@ func TestModifyResponseMonthsNilAppend(t *testing.T) {
} }
// TestActivityLog_ReceivedFragment calls receivedFragment with a fragment and verifies it gets added to // TestActivityLog_ReceivedFragment calls receivedFragment with a fragment and verifies it gets added to
// standbyFragmentsReceived and standbyGlobalFragmentsReceived. Send the same fragment again and then verify that it doesn't change the entity map but does // standbyFragmentsReceived. Send the same fragment again and then verify that it doesn't change the entity map but does
// get added to standbyFragmentsReceived and standbyGlobalFragmentsReceived. // get added to standbyFragmentsReceived.
func TestActivityLog_ReceivedFragment(t *testing.T) { func TestActivityLog_ReceivedFragment(t *testing.T) {
core, _, _ := TestCoreUnsealed(t) core, _, _ := TestCoreUnsealed(t)
a := core.activityLog a := core.activityLog
@@ -723,10 +644,6 @@ func TestActivityLog_ReceivedFragment(t *testing.T) {
t.Fatalf("fragment count is %v, expected 1", len(a.standbyFragmentsReceived)) t.Fatalf("fragment count is %v, expected 1", len(a.standbyFragmentsReceived))
} }
if len(a.standbyGlobalFragmentsReceived) != 1 {
t.Fatalf("fragment count is %v, expected 1", len(a.standbyGlobalFragmentsReceived))
}
// Send a duplicate, should be stored but not change entity map // Send a duplicate, should be stored but not change entity map
a.receivedFragment(fragment) a.receivedFragment(fragment)
@@ -735,9 +652,6 @@ func TestActivityLog_ReceivedFragment(t *testing.T) {
if len(a.standbyFragmentsReceived) != 2 { if len(a.standbyFragmentsReceived) != 2 {
t.Fatalf("fragment count is %v, expected 2", len(a.standbyFragmentsReceived)) t.Fatalf("fragment count is %v, expected 2", len(a.standbyFragmentsReceived))
} }
if len(a.standbyGlobalFragmentsReceived) != 2 {
t.Fatalf("fragment count is %v, expected 2", len(a.standbyGlobalFragmentsReceived))
}
} }
// TestActivityLog_availableLogsEmptyDirectory verifies that availableLogs returns an empty slice when the log directory // TestActivityLog_availableLogsEmptyDirectory verifies that availableLogs returns an empty slice when the log directory
@@ -1374,16 +1288,10 @@ func (a *ActivityLog) resetEntitiesInMemory(t *testing.T) {
a.l.Lock() a.l.Lock()
defer a.l.Unlock() defer a.l.Unlock()
a.fragmentLock.Lock() a.fragmentLock.Lock()
defer a.fragmentLock.Unlock() defer a.fragmentLock.Unlock()
a.localFragmentLock.Lock()
defer a.localFragmentLock.Unlock()
a.globalFragmentLock.Lock() a.globalFragmentLock.Lock()
defer a.globalFragmentLock.Unlock() defer a.globalFragmentLock.Unlock()
a.currentSegment = segmentInfo{ a.currentSegment = segmentInfo{
startTimestamp: time.Time{}.Unix(), startTimestamp: time.Time{}.Unix(),
currentClients: &activity.EntityActivityLog{ currentClients: &activity.EntityActivityLog{
@@ -1394,7 +1302,6 @@ func (a *ActivityLog) resetEntitiesInMemory(t *testing.T) {
} }
a.partialMonthClientTracker = make(map[string]*activity.EntityRecord) a.partialMonthClientTracker = make(map[string]*activity.EntityRecord)
a.partialMonthLocalClientTracker = make(map[string]*activity.EntityRecord)
a.globalPartialMonthClientTracker = make(map[string]*activity.EntityRecord) a.globalPartialMonthClientTracker = make(map[string]*activity.EntityRecord)
} }
@@ -1591,12 +1498,9 @@ func TestActivityLog_loadPriorEntitySegment(t *testing.T) {
if tc.refresh { if tc.refresh {
a.l.Lock() a.l.Lock()
a.fragmentLock.Lock() a.fragmentLock.Lock()
a.localFragmentLock.Lock()
a.partialMonthClientTracker = make(map[string]*activity.EntityRecord) a.partialMonthClientTracker = make(map[string]*activity.EntityRecord)
a.partialMonthLocalClientTracker = make(map[string]*activity.EntityRecord)
a.currentSegment.startTimestamp = tc.time a.currentSegment.startTimestamp = tc.time
a.fragmentLock.Unlock() a.fragmentLock.Unlock()
a.localFragmentLock.Unlock()
a.l.Unlock() a.l.Unlock()
} }
@@ -4841,45 +4745,15 @@ func TestActivityLog_HandleEndOfMonth(t *testing.T) {
// clients and verifies that they are added correctly to the tracking data // clients and verifies that they are added correctly to the tracking data
// structures // structures
func TestAddActivityToFragment(t *testing.T) { func TestAddActivityToFragment(t *testing.T) {
storage := &logical.InmemStorage{} core, _, _ := TestCoreUnsealed(t)
coreConfig := &CoreConfig{
CredentialBackends: map[string]logical.Factory{
"userpass": userpass.Factory,
},
Physical: storage.Underlying(),
}
cluster := NewTestCluster(t, coreConfig, nil)
core := cluster.Cores[0].Core
a := core.activityLog a := core.activityLog
a.SetEnable(true) a.SetEnable(true)
require.Nil(t, a.fragment)
require.Nil(t, a.localFragment)
require.Nil(t, a.currentGlobalFragment)
mount := "mount" mount := "mount"
localMount := "localMount"
ns := "root" ns := "root"
id := "id1" id := "id1"
// keeps track of the number of clients added to localFragment
localCount := 0
// add a client to regular fragment
a.AddActivityToFragment(id, ns, 0, entityActivityType, mount) a.AddActivityToFragment(id, ns, 0, entityActivityType, mount)
// create a local mount accessor for local clients
localMe := &MountEntry{
Table: credentialTableType,
Path: "userpass-local/",
Type: "userpass",
Local: true,
Accessor: localMount,
}
err := core.enableCredential(namespace.RootContext(nil), localMe)
require.NoError(t, err)
testCases := []struct { testCases := []struct {
name string name string
id string id string
@@ -4887,7 +4761,6 @@ func TestAddActivityToFragment(t *testing.T) {
isAdded bool isAdded bool
expectedID string expectedID string
isNonEntity bool isNonEntity bool
isLocal bool
}{ }{
{ {
name: "duplicate", name: "duplicate",
@@ -4895,7 +4768,6 @@ func TestAddActivityToFragment(t *testing.T) {
activityType: entityActivityType, activityType: entityActivityType,
isAdded: false, isAdded: false,
expectedID: id, expectedID: id,
isLocal: false,
}, },
{ {
name: "new entity", name: "new entity",
@@ -4903,7 +4775,6 @@ func TestAddActivityToFragment(t *testing.T) {
activityType: entityActivityType, activityType: entityActivityType,
isAdded: true, isAdded: true,
expectedID: "new-id", expectedID: "new-id",
isLocal: false,
}, },
{ {
name: "new nonentity", name: "new nonentity",
@@ -4912,7 +4783,6 @@ func TestAddActivityToFragment(t *testing.T) {
isAdded: true, isAdded: true,
expectedID: "new-nonentity", expectedID: "new-nonentity",
isNonEntity: true, isNonEntity: true,
isLocal: true,
}, },
{ {
name: "new acme", name: "new acme",
@@ -4921,7 +4791,6 @@ func TestAddActivityToFragment(t *testing.T) {
isAdded: true, isAdded: true,
expectedID: "pki-acme.new-acme", expectedID: "pki-acme.new-acme",
isNonEntity: true, isNonEntity: true,
isLocal: false,
}, },
{ {
name: "new secret sync", name: "new secret sync",
@@ -4930,22 +4799,11 @@ func TestAddActivityToFragment(t *testing.T) {
isAdded: true, isAdded: true,
expectedID: "new-secret-sync", expectedID: "new-secret-sync",
isNonEntity: true, isNonEntity: true,
isLocal: false,
},
{
name: "new local entity",
id: "new-local-id",
activityType: entityActivityType,
isAdded: true,
expectedID: "new-local-id",
isNonEntity: false,
isLocal: true,
}, },
} }
for _, tc := range testCases { for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
var mountAccessor string
a.fragmentLock.RLock() a.fragmentLock.RLock()
numClientsBefore := len(a.fragment.Clients) numClientsBefore := len(a.fragment.Clients)
a.fragmentLock.RUnlock() a.fragmentLock.RUnlock()
@@ -4954,25 +4812,7 @@ func TestAddActivityToFragment(t *testing.T) {
globalClientsBefore := len(a.currentGlobalFragment.Clients) globalClientsBefore := len(a.currentGlobalFragment.Clients)
a.globalFragmentLock.RUnlock() a.globalFragmentLock.RUnlock()
numLocalClientsBefore := 0 a.AddActivityToFragment(tc.id, ns, 0, tc.activityType, mount)
// add client to the fragment
if tc.isLocal {
// data already present in local fragment, get client count before adding activity to fragment
a.localFragmentLock.RLock()
numLocalClientsBefore = len(a.localFragment.Clients)
a.localFragmentLock.RUnlock()
mountAccessor = localMount
a.AddActivityToFragment(tc.id, ns, 0, tc.activityType, localMount)
require.NotNil(t, a.localFragment)
localCount++
} else {
mountAccessor = mount
a.AddActivityToFragment(tc.id, ns, 0, tc.activityType, mount)
}
a.fragmentLock.RLock() a.fragmentLock.RLock()
defer a.fragmentLock.RUnlock() defer a.fragmentLock.RUnlock()
numClientsAfter := len(a.fragment.Clients) numClientsAfter := len(a.fragment.Clients)
@@ -4980,36 +4820,14 @@ func TestAddActivityToFragment(t *testing.T) {
defer a.globalFragmentLock.RUnlock() defer a.globalFragmentLock.RUnlock()
globalClientsAfter := len(a.currentGlobalFragment.Clients) globalClientsAfter := len(a.currentGlobalFragment.Clients)
// if local client, verify if local fragment is updated
if tc.isLocal {
a.localFragmentLock.RLock()
defer a.localFragmentLock.RUnlock()
numLocalClientsAfter := len(a.localFragment.Clients)
switch tc.isAdded {
case true:
require.Equal(t, numLocalClientsBefore+1, numLocalClientsAfter)
default:
require.Equal(t, numLocalClientsBefore, numLocalClientsAfter)
}
} else {
// verify global clients
switch tc.isAdded {
case true:
if tc.activityType != nonEntityTokenActivityType {
require.Equal(t, globalClientsBefore+1, globalClientsAfter)
}
default:
require.Equal(t, globalClientsBefore, globalClientsAfter)
}
}
// for now local clients are added to both regular fragment and local fragment.
// this will be modified in ticket vault-31234
if tc.isAdded { if tc.isAdded {
require.Equal(t, numClientsBefore+1, numClientsAfter) require.Equal(t, numClientsBefore+1, numClientsAfter)
if tc.activityType != nonEntityTokenActivityType {
require.Equal(t, globalClientsBefore+1, globalClientsAfter)
}
} else { } else {
require.Equal(t, numClientsBefore, numClientsAfter) require.Equal(t, numClientsBefore, numClientsAfter)
require.Equal(t, globalClientsBefore, globalClientsAfter)
} }
require.Contains(t, a.partialMonthClientTracker, tc.expectedID) require.Contains(t, a.partialMonthClientTracker, tc.expectedID)
@@ -5018,21 +4836,10 @@ func TestAddActivityToFragment(t *testing.T) {
NamespaceID: ns, NamespaceID: ns,
Timestamp: 0, Timestamp: 0,
NonEntity: tc.isNonEntity, NonEntity: tc.isNonEntity,
MountAccessor: mountAccessor, MountAccessor: mount,
ClientType: tc.activityType, ClientType: tc.activityType,
}, a.partialMonthClientTracker[tc.expectedID])) }, a.partialMonthClientTracker[tc.expectedID]))
if tc.activityType != nonEntityTokenActivityType {
if tc.isLocal {
require.Contains(t, a.partialMonthLocalClientTracker, tc.expectedID)
require.True(t, proto.Equal(&activity.EntityRecord{
ClientID: tc.expectedID,
NamespaceID: ns,
Timestamp: 0,
NonEntity: tc.isNonEntity,
MountAccessor: mountAccessor,
ClientType: tc.activityType,
}, a.partialMonthLocalClientTracker[tc.expectedID]))
} else {
require.Contains(t, a.globalPartialMonthClientTracker, tc.expectedID) require.Contains(t, a.globalPartialMonthClientTracker, tc.expectedID)
require.True(t, proto.Equal(&activity.EntityRecord{ require.True(t, proto.Equal(&activity.EntityRecord{
ClientID: tc.expectedID, ClientID: tc.expectedID,
@@ -5047,81 +4854,6 @@ func TestAddActivityToFragment(t *testing.T) {
} }
} }
// TestGetAllPartialMonthClients adds activity for a local and regular clients and verifies that
// GetAllPartialMonthClients returns the right local and global clients
func TestGetAllPartialMonthClients(t *testing.T) {
storage := &logical.InmemStorage{}
coreConfig := &CoreConfig{
CredentialBackends: map[string]logical.Factory{
"userpass": userpass.Factory,
},
Physical: storage.Underlying(),
}
cluster := NewTestCluster(t, coreConfig, nil)
core := cluster.Cores[0].Core
a := core.activityLog
a.SetEnable(true)
require.Nil(t, a.fragment)
require.Nil(t, a.localFragment)
require.Nil(t, a.currentGlobalFragment)
ns := "root"
mount := "mount"
localMount := "localMount"
clientID := "id1"
localClientID := "new-local-id"
// add a client to regular fragment, this should be added to globalPartialMonthClientTracker
a.AddActivityToFragment(clientID, ns, 0, entityActivityType, mount)
require.NotNil(t, a.localFragment)
require.NotNil(t, a.fragment)
require.NotNil(t, a.currentGlobalFragment)
// create a local mount accessor
localMe := &MountEntry{
Table: credentialTableType,
Path: "userpass-local/",
Type: "userpass",
Local: true,
Accessor: localMount,
}
err := core.enableCredential(namespace.RootContext(nil), localMe)
require.NoError(t, err)
// add client to local fragment, this should be added to partialMonthLocalClientTracker
a.AddActivityToFragment(localClientID, ns, 0, entityActivityType, localMount)
require.NotNil(t, a.localFragment)
// GetAllPartialMonthClients returns the partialMonthLocalClientTracker and globalPartialMonthClientTracker
localClients, globalClients := a.GetAllPartialMonthClients()
// verify the returned localClients
require.Len(t, localClients, 1)
require.Contains(t, localClients, localClientID)
require.True(t, proto.Equal(&activity.EntityRecord{
ClientID: localClientID,
NamespaceID: ns,
Timestamp: 0,
MountAccessor: localMount,
ClientType: entityActivityType,
}, localClients[localClientID]))
// verify the returned globalClients
require.Len(t, globalClients, 1)
require.Contains(t, globalClients, clientID)
require.True(t, proto.Equal(&activity.EntityRecord{
ClientID: clientID,
NamespaceID: ns,
Timestamp: 0,
MountAccessor: mount,
ClientType: entityActivityType,
}, globalClients[clientID]))
}
// TestActivityLog_reportPrecomputedQueryMetrics creates 3 clients per type and // TestActivityLog_reportPrecomputedQueryMetrics creates 3 clients per type and
// calls reportPrecomputedQueryMetrics. The test verifies that the metric sink // calls reportPrecomputedQueryMetrics. The test verifies that the metric sink
// gets metrics reported correctly, based on the segment time matching the // gets metrics reported correctly, based on the segment time matching the

View File

@@ -57,27 +57,17 @@ func (c *Core) InjectActivityLogDataThisMonth(t *testing.T) map[string]*activity
return c.activityLog.partialMonthClientTracker return c.activityLog.partialMonthClientTracker
} }
// GetActiveClients returns the in-memory globalPartialMonthClientTracker and partialMonthLocalClientTracker from an // GetActiveClients returns the in-memory partialMonthClientTracker from an
// activity log. // activity log.
func (c *Core) GetActiveClients() map[string]*activity.EntityRecord { func (c *Core) GetActiveClients() map[string]*activity.EntityRecord {
out := make(map[string]*activity.EntityRecord) out := make(map[string]*activity.EntityRecord)
c.stateLock.RLock() c.stateLock.RLock()
c.activityLog.globalFragmentLock.RLock() c.activityLog.globalFragmentLock.RLock()
c.activityLog.localFragmentLock.RLock()
// add active global clients
for k, v := range c.activityLog.globalPartialMonthClientTracker { for k, v := range c.activityLog.globalPartialMonthClientTracker {
out[k] = v out[k] = v
} }
// add active local clients
for k, v := range c.activityLog.partialMonthLocalClientTracker {
out[k] = v
}
c.activityLog.globalFragmentLock.RUnlock() c.activityLog.globalFragmentLock.RUnlock()
c.activityLog.localFragmentLock.RUnlock()
c.stateLock.RUnlock() c.stateLock.RUnlock()
return out return out
@@ -178,9 +168,6 @@ func (a *ActivityLog) ExpectCurrentSegmentRefreshed(t *testing.T, expectedStart
if a.partialMonthClientTracker == nil { if a.partialMonthClientTracker == nil {
t.Errorf("expected non-nil partialMonthClientTracker") t.Errorf("expected non-nil partialMonthClientTracker")
} }
if a.partialMonthLocalClientTracker == nil {
t.Errorf("expected non-nil partialMonthLocalClientTracker")
}
if len(a.currentSegment.currentClients.Clients) > 0 { if len(a.currentSegment.currentClients.Clients) > 0 {
t.Errorf("expected no current entity segment to be loaded. got: %v", a.currentSegment.currentClients) t.Errorf("expected no current entity segment to be loaded. got: %v", a.currentSegment.currentClients)
} }
@@ -190,9 +177,6 @@ func (a *ActivityLog) ExpectCurrentSegmentRefreshed(t *testing.T, expectedStart
if len(a.partialMonthClientTracker) > 0 { if len(a.partialMonthClientTracker) > 0 {
t.Errorf("expected no active entity segment to be loaded. got: %v", a.partialMonthClientTracker) t.Errorf("expected no active entity segment to be loaded. got: %v", a.partialMonthClientTracker)
} }
if len(a.partialMonthLocalClientTracker) > 0 {
t.Errorf("expected no active entity segment to be loaded. got: %v", a.partialMonthLocalClientTracker)
}
if verifyTimeNotZero { if verifyTimeNotZero {
if a.currentSegment.startTimestamp == 0 { if a.currentSegment.startTimestamp == 0 {
@@ -271,11 +255,3 @@ func (c *Core) GetActiveGlobalFragment() *activity.LogFragment {
func (c *Core) GetSecondaryGlobalFragments() []*activity.LogFragment { func (c *Core) GetSecondaryGlobalFragments() []*activity.LogFragment {
return c.activityLog.secondaryGlobalClientFragments return c.activityLog.secondaryGlobalClientFragments
} }
func (c *Core) GetActiveLocalFragment() *activity.LogFragment {
return c.activityLog.localFragment
}
func (c *Core) GetActiveFragment() *activity.LogFragment {
return c.activityLog.fragment
}