Revert "OSS Changes Patch (#28810)" (#29281)

This reverts commit bad87541ed.
This commit is contained in:
akshya96
2025-01-03 13:08:12 -08:00
committed by GitHub
parent 39365aa01f
commit a713a820b8
3 changed files with 42 additions and 345 deletions

View File

@@ -53,9 +53,8 @@ const (
ActivityLogLocalPrefix = "sys/counters/activity/local/log/" ActivityLogLocalPrefix = "sys/counters/activity/local/log/"
ActivityPrefix = "sys/counters/activity/" ActivityPrefix = "sys/counters/activity/"
// Time to wait before a perf standby sends data to the active node, or // Time to wait on perf standby before sending fragment
// before the active node of a performance secondary sends global data to the primary. activityFragmentStandbyTime = 10 * time.Minute
activityFragmentSendInterval = 10 * time.Minute
// Time between writes of segment to storage // Time between writes of segment to storage
activitySegmentInterval = 10 * time.Minute activitySegmentInterval = 10 * time.Minute
@@ -141,17 +140,13 @@ type ActivityLog struct {
// ActivityLog.l protects the configuration settings, except enable, and any modifications // ActivityLog.l protects the configuration settings, except enable, and any modifications
// to the current segment. // to the current segment.
// Acquire "l" before fragmentLock and globalFragmentLock if both must be held. // Acquire "l" before fragmentLock if both must be held.
l sync.RWMutex l sync.RWMutex
// fragmentLock protects enable, partialMonthClientTracker, fragment, // fragmentLock protects enable, partialMonthClientTracker, fragment,
// standbyFragmentsReceived. // standbyFragmentsReceived.
fragmentLock sync.RWMutex fragmentLock sync.RWMutex
// globalFragmentLock protects enable secondaryGlobalClientFragments, currentGlobalFragment
// and globalPartialMonthClientTracker
globalFragmentLock sync.RWMutex
// enabled indicates if the activity log is enabled for this cluster. // enabled indicates if the activity log is enabled for this cluster.
// This is protected by fragmentLock so we can check with only // This is protected by fragmentLock so we can check with only
// a single synchronization call. // a single synchronization call.
@@ -174,17 +169,13 @@ type ActivityLog struct {
nodeID string nodeID string
// current log fragment (may be nil) // current log fragment (may be nil)
fragment *activity.LogFragment fragment *activity.LogFragment
fragmentCreation time.Time
// Channel to signal a new fragment has been created // Channel to signal a new fragment has been created
// so it's appropriate to start the timer. // so it's appropriate to start the timer.
newFragmentCh chan struct{} newFragmentCh chan struct{}
// Channel to signal a new global fragment has been created
// so it's appropriate to start the timer. Once the timer finishes
// the secondary will send currentGlobalFragment to the primary
newGlobalClientFragmentCh chan struct{}
// Channel for sending fragment immediately // Channel for sending fragment immediately
sendCh chan struct{} sendCh chan struct{}
@@ -197,9 +188,6 @@ type ActivityLog struct {
// Fragments received from performance standbys // Fragments received from performance standbys
standbyFragmentsReceived []*activity.LogFragment standbyFragmentsReceived []*activity.LogFragment
// Fragments of global clients received from performance secondaries
secondaryGlobalClientFragments []*activity.LogFragment
// precomputed queries // precomputed queries
queryStore *activity.PrecomputedQueryStore queryStore *activity.PrecomputedQueryStore
defaultReportMonths int defaultReportMonths int
@@ -219,9 +207,6 @@ type ActivityLog struct {
// partialMonthClientTracker tracks active clients this month. Protected by fragmentLock. // partialMonthClientTracker tracks active clients this month. Protected by fragmentLock.
partialMonthClientTracker map[string]*activity.EntityRecord partialMonthClientTracker map[string]*activity.EntityRecord
// globalPartialMonthClientTracker tracks active clients this month. Protected by globalFragmentLock.
globalPartialMonthClientTracker map[string]*activity.EntityRecord
inprocessExport *atomic.Bool inprocessExport *atomic.Bool
// clock is used to support manipulating time in unit and integration tests // clock is used to support manipulating time in unit and integration tests
@@ -230,8 +215,8 @@ type ActivityLog struct {
// is written. It's used for unit testing // is written. It's used for unit testing
precomputedQueryWritten chan struct{} precomputedQueryWritten chan struct{}
// currentGlobalFragment tracks the global clients of all the clients in memory // globalClients tracks the global clients of all the clients in memory
currentGlobalFragment *activity.LogFragment globalClients *activity.LogFragment
} }
// These non-persistent configuration options allow us to disable // These non-persistent configuration options allow us to disable
@@ -254,10 +239,6 @@ type ActivityLogCoreConfig struct {
Clock timeutil.Clock Clock timeutil.Clock
DisableInvalidation bool DisableInvalidation bool
// GlobalFragmentSendInterval sets the interval to send global data from the secondary to the primary
// This is only for testing purposes
GlobalFragmentSendInterval time.Duration
} }
// ActivityLogExportRecord is the output structure for activity export // ActivityLogExportRecord is the output structure for activity export
@@ -328,19 +309,17 @@ func NewActivityLog(core *Core, logger log.Logger, view *BarrierView, metrics me
clock = timeutil.DefaultClock{} clock = timeutil.DefaultClock{}
} }
a := &ActivityLog{ a := &ActivityLog{
core: core, core: core,
configOverrides: &core.activityLogConfig, configOverrides: &core.activityLogConfig,
logger: logger, logger: logger,
view: view, view: view,
metrics: metrics, metrics: metrics,
nodeID: hostname, nodeID: hostname,
newFragmentCh: make(chan struct{}, 1), newFragmentCh: make(chan struct{}, 1),
newGlobalClientFragmentCh: make(chan struct{}, 1), sendCh: make(chan struct{}, 1), // buffered so it can be triggered by fragment size
sendCh: make(chan struct{}, 1), // buffered so it can be triggered by fragment size doneCh: make(chan struct{}, 1),
doneCh: make(chan struct{}, 1), partialMonthClientTracker: make(map[string]*activity.EntityRecord),
partialMonthClientTracker: make(map[string]*activity.EntityRecord), clock: clock,
globalPartialMonthClientTracker: make(map[string]*activity.EntityRecord),
clock: clock,
currentSegment: segmentInfo{ currentSegment: segmentInfo{
startTimestamp: 0, startTimestamp: 0,
currentClients: &activity.EntityActivityLog{ currentClients: &activity.EntityActivityLog{
@@ -354,10 +333,9 @@ func NewActivityLog(core *Core, logger log.Logger, view *BarrierView, metrics me
}, },
clientSequenceNumber: 0, clientSequenceNumber: 0,
}, },
standbyFragmentsReceived: make([]*activity.LogFragment, 0), standbyFragmentsReceived: make([]*activity.LogFragment, 0),
secondaryGlobalClientFragments: make([]*activity.LogFragment, 0), inprocessExport: atomic.NewBool(false),
inprocessExport: atomic.NewBool(false), precomputedQueryWritten: make(chan struct{}),
precomputedQueryWritten: make(chan struct{}),
} }
config, err := a.loadConfigOrDefault(core.activeContext) config, err := a.loadConfigOrDefault(core.activeContext)
@@ -404,38 +382,10 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for
a.fragmentLock.Lock() a.fragmentLock.Lock()
localFragment := a.fragment localFragment := a.fragment
a.fragment = nil a.fragment = nil
standbys := a.standbyFragmentsReceived standbys := a.standbyFragmentsReceived
a.standbyFragmentsReceived = make([]*activity.LogFragment, 0) a.standbyFragmentsReceived = make([]*activity.LogFragment, 0)
a.fragmentLock.Unlock() a.fragmentLock.Unlock()
a.globalFragmentLock.Lock()
secondaryGlobalClients := a.secondaryGlobalClientFragments
a.secondaryGlobalClientFragments = make([]*activity.LogFragment, 0)
globalClients := a.currentGlobalFragment
a.currentGlobalFragment = nil
a.globalFragmentLock.Unlock()
if !a.core.IsPerfSecondary() {
if a.currentGlobalFragment != nil {
a.metrics.IncrCounterWithLabels([]string{"core", "activity", "global_fragment_size"},
float32(len(globalClients.Clients)),
[]metricsutil.Label{
{"type", "client"},
})
}
var globalReceivedFragmentTotal int
for _, globalReceivedFragment := range secondaryGlobalClients {
globalReceivedFragmentTotal += len(globalReceivedFragment.Clients)
}
a.metrics.IncrCounterWithLabels([]string{"core", "activity", "global_received_fragment_size"},
float32(globalReceivedFragmentTotal),
[]metricsutil.Label{
{"type", "client"},
})
}
// If segment start time is zero, do not update or write // If segment start time is zero, do not update or write
// (even if force is true). This can happen if activityLog is // (even if force is true). This can happen if activityLog is
// disabled after a save as been triggered. // disabled after a save as been triggered.
@@ -511,6 +461,10 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for
} }
} }
a.currentSegment.currentClients.Clients = segmentClients a.currentSegment.currentClients.Clients = segmentClients
if a.core.IsPerfSecondary() {
a.sendGlobalClients(ctx)
}
err := a.saveCurrentSegmentInternal(ctx, force) err := a.saveCurrentSegmentInternal(ctx, force)
if err != nil { if err != nil {
// The current fragment(s) have already been placed into the in-memory // The current fragment(s) have already been placed into the in-memory
@@ -793,19 +747,14 @@ func (a *ActivityLog) loadPriorEntitySegment(ctx context.Context, startTime time
a.l.RLock() a.l.RLock()
a.fragmentLock.Lock() a.fragmentLock.Lock()
a.globalFragmentLock.Lock()
// Handle the (unlikely) case where the end of the month has been reached while background loading. // Handle the (unlikely) case where the end of the month has been reached while background loading.
// Or the feature has been disabled. // Or the feature has been disabled.
if a.enabled && startTime.Unix() == a.currentSegment.startTimestamp { if a.enabled && startTime.Unix() == a.currentSegment.startTimestamp {
for _, ent := range out.Clients { for _, ent := range out.Clients {
a.partialMonthClientTracker[ent.ClientID] = ent a.partialMonthClientTracker[ent.ClientID] = ent
if local, _ := a.isClientLocal(ent); !local {
a.globalPartialMonthClientTracker[ent.ClientID] = ent
}
} }
} }
a.fragmentLock.Unlock() a.fragmentLock.Unlock()
a.globalFragmentLock.Unlock()
a.l.RUnlock() a.l.RUnlock()
return nil return nil
@@ -846,9 +795,6 @@ func (a *ActivityLog) loadCurrentClientSegment(ctx context.Context, startTime ti
for _, client := range out.Clients { for _, client := range out.Clients {
a.partialMonthClientTracker[client.ClientID] = client a.partialMonthClientTracker[client.ClientID] = client
if local, _ := a.isClientLocal(client); !local {
a.globalPartialMonthClientTracker[client.ClientID] = client
}
} }
return nil return nil
@@ -972,11 +918,8 @@ func (a *ActivityLog) resetCurrentLog() {
a.fragment = nil a.fragment = nil
a.partialMonthClientTracker = make(map[string]*activity.EntityRecord) a.partialMonthClientTracker = make(map[string]*activity.EntityRecord)
a.currentGlobalFragment = nil
a.globalPartialMonthClientTracker = make(map[string]*activity.EntityRecord)
a.standbyFragmentsReceived = make([]*activity.LogFragment, 0) a.standbyFragmentsReceived = make([]*activity.LogFragment, 0)
a.secondaryGlobalClientFragments = make([]*activity.LogFragment, 0)
} }
func (a *ActivityLog) deleteLogWorker(ctx context.Context, startTimestamp int64, whenDone chan struct{}) { func (a *ActivityLog) deleteLogWorker(ctx context.Context, startTimestamp int64, whenDone chan struct{}) {
@@ -1032,8 +975,6 @@ func (a *ActivityLog) refreshFromStoredLog(ctx context.Context, wg *sync.WaitGro
defer a.l.Unlock() defer a.l.Unlock()
a.fragmentLock.Lock() a.fragmentLock.Lock()
defer a.fragmentLock.Unlock() defer a.fragmentLock.Unlock()
a.globalFragmentLock.Lock()
defer a.globalFragmentLock.Unlock()
decreasingLogTimes, err := a.getMostRecentActivityLogSegment(ctx, now) decreasingLogTimes, err := a.getMostRecentActivityLogSegment(ctx, now)
if err != nil { if err != nil {
@@ -1291,9 +1232,6 @@ func (c *Core) setupActivityLogLocked(ctx context.Context, wg *sync.WaitGroup, r
} else { } else {
if !c.activityLogConfig.DisableFragmentWorker { if !c.activityLogConfig.DisableFragmentWorker {
go manager.activeFragmentWorker(ctx) go manager.activeFragmentWorker(ctx)
if c.IsPerfSecondary() {
go manager.secondaryFragmentWorker(ctx)
}
} }
doRegeneration := !reload && !manager.hasRegeneratedACME(ctx) doRegeneration := !reload && !manager.hasRegeneratedACME(ctx)
@@ -1445,97 +1383,6 @@ func (a *ActivityLog) StartOfNextMonth() time.Time {
return timeutil.StartOfNextMonth(segmentStart) return timeutil.StartOfNextMonth(segmentStart)
} }
// secondaryFragmentWorker handles scheduling global client fragments
// to send via RPC to the primary; it runs on performance secondaries
func (a *ActivityLog) secondaryFragmentWorker(ctx context.Context) {
timer := a.clock.NewTimer(time.Duration(0))
fragmentWaiting := false
// Eat first event, so timer is stopped
<-timer.C
endOfMonth := a.clock.NewTimer(a.StartOfNextMonth().Sub(a.clock.Now()))
if a.configOverrides.DisableTimers {
endOfMonth.Stop()
}
sendInterval := activityFragmentSendInterval
// This changes the interval to a duration that was set for testing purposes
if a.configOverrides.GlobalFragmentSendInterval.Microseconds() > 0 {
sendInterval = a.configOverrides.GlobalFragmentSendInterval
}
sendFunc := func() {
ctx, cancel := context.WithTimeout(ctx, activityFragmentSendTimeout)
defer cancel()
err := a.sendGlobalClients(ctx)
if err != nil {
a.logger.Warn("activity log global fragment lost", "error", err)
}
}
for {
select {
case <-a.doneCh:
// Shutting down activity log.
if fragmentWaiting && !timer.Stop() {
<-timer.C
}
if !endOfMonth.Stop() {
<-endOfMonth.C
}
return
case <-a.newGlobalClientFragmentCh:
// New fragment created, start the timer if not
// already running
if !fragmentWaiting {
fragmentWaiting = true
if !a.configOverrides.DisableTimers {
a.logger.Trace("reset global fragment timer")
timer.Reset(sendInterval)
}
}
case <-timer.C:
a.logger.Trace("sending global fragment on timer expiration")
fragmentWaiting = false
sendFunc()
case <-a.sendCh:
a.logger.Trace("sending global fragment on request")
// It might be that we get sendCh before fragmentCh
// if a fragment is created and then immediately fills
// up to its limit. So we attempt to send even if the timer's
// not running.
if fragmentWaiting {
fragmentWaiting = false
if !timer.Stop() {
<-timer.C
}
}
sendFunc()
case <-endOfMonth.C:
a.logger.Trace("sending global fragment on end of month")
// Flush the current fragment, if any
if fragmentWaiting {
fragmentWaiting = false
if !timer.Stop() {
<-timer.C
}
}
sendFunc()
// clear active entity set
a.globalFragmentLock.Lock()
a.globalPartialMonthClientTracker = make(map[string]*activity.EntityRecord)
a.globalFragmentLock.Unlock()
// Set timer for next month.
// The current segment *probably* hasn't been set yet (via invalidation),
// so don't rely on it.
target := timeutil.StartOfNextMonth(a.clock.Now().UTC())
endOfMonth.Reset(target.Sub(a.clock.Now()))
}
}
}
// perfStandbyFragmentWorker handles scheduling fragments // perfStandbyFragmentWorker handles scheduling fragments
// to send via RPC; it runs on perf standby nodes only. // to send via RPC; it runs on perf standby nodes only.
func (a *ActivityLog) perfStandbyFragmentWorker(ctx context.Context) { func (a *ActivityLog) perfStandbyFragmentWorker(ctx context.Context) {
@@ -1576,7 +1423,7 @@ func (a *ActivityLog) perfStandbyFragmentWorker(ctx context.Context) {
fragmentWaiting = true fragmentWaiting = true
if !a.configOverrides.DisableTimers { if !a.configOverrides.DisableTimers {
a.logger.Trace("reset fragment timer") a.logger.Trace("reset fragment timer")
timer.Reset(activityFragmentSendInterval) timer.Reset(activityFragmentStandbyTime)
} }
} }
case <-timer.C: case <-timer.C:
@@ -1789,13 +1636,8 @@ func (c *Core) ResetActivityLog() []*activity.LogFragment {
allFragments = append(allFragments, a.standbyFragmentsReceived...) allFragments = append(allFragments, a.standbyFragmentsReceived...)
a.standbyFragmentsReceived = make([]*activity.LogFragment, 0) a.standbyFragmentsReceived = make([]*activity.LogFragment, 0)
a.secondaryGlobalClientFragments = make([]*activity.LogFragment, 0)
a.partialMonthClientTracker = make(map[string]*activity.EntityRecord) a.partialMonthClientTracker = make(map[string]*activity.EntityRecord)
a.fragmentLock.Unlock() a.fragmentLock.Unlock()
a.globalFragmentLock.Lock()
a.globalPartialMonthClientTracker = make(map[string]*activity.EntityRecord)
a.globalFragmentLock.Unlock()
return allFragments return allFragments
} }
@@ -1846,9 +1688,6 @@ func (a *ActivityLog) AddActivityToFragment(clientID string, namespaceID string,
a.fragmentLock.Lock() a.fragmentLock.Lock()
defer a.fragmentLock.Unlock() defer a.fragmentLock.Unlock()
a.globalFragmentLock.Lock()
defer a.globalFragmentLock.Unlock()
// Re-check entity ID after re-acquiring lock // Re-check entity ID after re-acquiring lock
_, present = a.partialMonthClientTracker[clientID] _, present = a.partialMonthClientTracker[clientID]
if present { if present {
@@ -1856,7 +1695,6 @@ func (a *ActivityLog) AddActivityToFragment(clientID string, namespaceID string,
} }
a.createCurrentFragment() a.createCurrentFragment()
a.createCurrentGlobalFragment()
clientRecord := &activity.EntityRecord{ clientRecord := &activity.EntityRecord{
ClientID: clientID, ClientID: clientID,
@@ -1875,39 +1713,9 @@ func (a *ActivityLog) AddActivityToFragment(clientID string, namespaceID string,
} }
a.fragment.Clients = append(a.fragment.Clients, clientRecord) a.fragment.Clients = append(a.fragment.Clients, clientRecord)
// Check if the client is local
if local, _ := a.isClientLocal(clientRecord); !local {
// If the client is not local and has not already been seen, then add the client
// to the current global fragment
if _, ok := a.globalPartialMonthClientTracker[clientRecord.ClientID]; !ok {
a.globalPartialMonthClientTracker[clientRecord.ClientID] = clientRecord
a.currentGlobalFragment.Clients = append(a.currentGlobalFragment.Clients, clientRecord)
}
}
a.partialMonthClientTracker[clientRecord.ClientID] = clientRecord a.partialMonthClientTracker[clientRecord.ClientID] = clientRecord
} }
// isClientLocal checks whether the given client is on a local mount.
// In all other cases, we will assume it is a global client.
func (a *ActivityLog) isClientLocal(client *activity.EntityRecord) (bool, error) {
if !utf8.ValidString(client.ClientID) {
return false, fmt.Errorf("client ID %q is not a valid string", client.ClientID)
}
// Tokens are not replicated to performance secondary clusters
if client.GetClientType() == nonEntityTokenActivityType {
return true, nil
}
mountEntry := a.core.router.MatchingMountByAccessor(client.MountAccessor)
// If the mount entry is nil, this means the mount has been deleted. We will assume it was replicated because we do not want to
// over count clients
if mountEntry != nil && mountEntry.Local {
return true, nil
}
return false, nil
}
// Create the current fragment if it doesn't already exist. // Create the current fragment if it doesn't already exist.
// Must be called with the lock held. // Must be called with the lock held.
func (a *ActivityLog) createCurrentFragment() { func (a *ActivityLog) createCurrentFragment() {
@@ -1917,44 +1725,16 @@ func (a *ActivityLog) createCurrentFragment() {
Clients: make([]*activity.EntityRecord, 0, 120), Clients: make([]*activity.EntityRecord, 0, 120),
NonEntityTokens: make(map[string]uint64), NonEntityTokens: make(map[string]uint64),
} }
a.fragmentCreation = a.clock.Now().UTC()
// Signal that a new segment is available, start // Signal that a new segment is available, start
// the timer to send it. // the timer to send it.
a.newFragmentCh <- struct{}{} a.newFragmentCh <- struct{}{}
} }
} }
// Create the current fragment to track global clients seen
// on cluster. Must be called with the globalFragmentLock held
func (a *ActivityLog) createCurrentGlobalFragment() {
if a.currentGlobalFragment == nil {
a.currentGlobalFragment = &activity.LogFragment{
OriginatingCluster: a.core.ClusterID(),
Clients: make([]*activity.EntityRecord, 0),
}
if a.core.IsPerfSecondary() {
// Signal that a new global segment is available, start
// the timer to send it
a.newGlobalClientFragmentCh <- struct{}{}
}
}
}
func (a *ActivityLog) receivedGlobalClientFragments(fragment *activity.LogFragment) { func (a *ActivityLog) receivedGlobalClientFragments(fragment *activity.LogFragment) {
a.logger.Trace("received fragment from secondary", "cluster_id", fragment.GetOriginatingCluster()) a.logger.Trace("received fragment from secondary", "cluster_id", fragment.GetOriginatingCluster())
a.globalFragmentLock.Lock()
defer a.globalFragmentLock.Unlock()
if !a.enabled {
return
}
for _, e := range fragment.Clients {
a.globalPartialMonthClientTracker[e.ClientID] = e
}
a.secondaryGlobalClientFragments = append(a.secondaryGlobalClientFragments, fragment)
} }
func (a *ActivityLog) receivedFragment(fragment *activity.LogFragment) { func (a *ActivityLog) receivedFragment(fragment *activity.LogFragment) {
@@ -1967,19 +1747,8 @@ func (a *ActivityLog) receivedFragment(fragment *activity.LogFragment) {
return return
} }
a.globalFragmentLock.Lock()
defer a.globalFragmentLock.Unlock()
a.createCurrentGlobalFragment()
for _, e := range fragment.Clients { for _, e := range fragment.Clients {
a.partialMonthClientTracker[e.ClientID] = e a.partialMonthClientTracker[e.ClientID] = e
// If the client is global, then add to global maps and keep in a global fragment
if local, _ := a.isClientLocal(e); !local {
a.globalPartialMonthClientTracker[e.ClientID] = e
a.currentGlobalFragment.Clients = append(a.currentGlobalFragment.Clients, e)
}
} }
a.standbyFragmentsReceived = append(a.standbyFragmentsReceived, fragment) a.standbyFragmentsReceived = append(a.standbyFragmentsReceived, fragment)
@@ -2928,7 +2697,6 @@ func (a *ActivityLog) retentionWorker(ctx context.Context, currentTime time.Time
// Periodic report of number of active entities, with the current month. // Periodic report of number of active entities, with the current month.
// We don't break this down by namespace because that would require going to storage (that information // We don't break this down by namespace because that would require going to storage (that information
// is not currently stored in memory.) // is not currently stored in memory.)
// TODO: to deprecate. These metrics are not useful anymore
func (a *ActivityLog) PartialMonthMetrics(ctx context.Context) ([]metricsutil.GaugeLabelValues, error) { func (a *ActivityLog) PartialMonthMetrics(ctx context.Context) ([]metricsutil.GaugeLabelValues, error) {
a.fragmentLock.RLock() a.fragmentLock.RLock()
defer a.fragmentLock.RUnlock() defer a.fragmentLock.RUnlock()

View File

@@ -45,7 +45,7 @@ func TestActivityLog_Creation(t *testing.T) {
if a.logger == nil || a.view == nil { if a.logger == nil || a.view == nil {
t.Fatal("activity log not initialized") t.Fatal("activity log not initialized")
} }
if a.fragment != nil || a.currentGlobalFragment != nil { if a.fragment != nil {
t.Fatal("activity log already has fragment") t.Fatal("activity log already has fragment")
} }
@@ -54,18 +54,15 @@ func TestActivityLog_Creation(t *testing.T) {
ts := time.Now() ts := time.Now()
a.AddEntityToFragment(entity_id, namespace_id, ts.Unix()) a.AddEntityToFragment(entity_id, namespace_id, ts.Unix())
if a.fragment == nil || a.currentGlobalFragment == nil { if a.fragment == nil {
t.Fatal("no fragment created") t.Fatal("no fragment created")
} }
if a.fragment.OriginatingNode != a.nodeID { if a.fragment.OriginatingNode != a.nodeID {
t.Errorf("mismatched node ID, %q vs %q", a.fragment.OriginatingNode, a.nodeID) t.Errorf("mismatched node ID, %q vs %q", a.fragment.OriginatingNode, a.nodeID)
} }
if a.currentGlobalFragment.OriginatingCluster != a.core.ClusterID() {
t.Errorf("mismatched cluster ID, %q vs %q", a.currentGlobalFragment.GetOriginatingCluster(), a.core.ClusterID())
}
if a.fragment.Clients == nil || a.currentGlobalFragment.Clients == nil { if a.fragment.Clients == nil {
t.Fatal("no fragment entity slice") t.Fatal("no fragment entity slice")
} }
@@ -76,9 +73,6 @@ func TestActivityLog_Creation(t *testing.T) {
if len(a.fragment.Clients) != 1 { if len(a.fragment.Clients) != 1 {
t.Fatalf("wrong number of entities %v", len(a.fragment.Clients)) t.Fatalf("wrong number of entities %v", len(a.fragment.Clients))
} }
if len(a.currentGlobalFragment.Clients) != 1 {
t.Fatalf("wrong number of entities %v", len(a.currentGlobalFragment.Clients))
}
er := a.fragment.Clients[0] er := a.fragment.Clients[0]
if er.ClientID != entity_id { if er.ClientID != entity_id {
@@ -91,17 +85,6 @@ func TestActivityLog_Creation(t *testing.T) {
t.Errorf("mimatched timestamp, %v vs %v", er.Timestamp, ts.Unix()) t.Errorf("mimatched timestamp, %v vs %v", er.Timestamp, ts.Unix())
} }
er = a.currentGlobalFragment.Clients[0]
if er.ClientID != entity_id {
t.Errorf("mimatched entity ID, %q vs %q", er.ClientID, entity_id)
}
if er.NamespaceID != namespace_id {
t.Errorf("mimatched namespace ID, %q vs %q", er.NamespaceID, namespace_id)
}
if er.Timestamp != ts.Unix() {
t.Errorf("mimatched timestamp, %v vs %v", er.Timestamp, ts.Unix())
}
// Reset and test the other code path // Reset and test the other code path
a.fragment = nil a.fragment = nil
a.AddTokenToFragment(namespace_id) a.AddTokenToFragment(namespace_id)
@@ -135,7 +118,7 @@ func TestActivityLog_Creation_WrappingTokens(t *testing.T) {
t.Fatal("activity log not initialized") t.Fatal("activity log not initialized")
} }
a.fragmentLock.Lock() a.fragmentLock.Lock()
if a.fragment != nil || a.currentGlobalFragment != nil { if a.fragment != nil {
t.Fatal("activity log already has fragment") t.Fatal("activity log already has fragment")
} }
a.fragmentLock.Unlock() a.fragmentLock.Unlock()
@@ -156,7 +139,7 @@ func TestActivityLog_Creation_WrappingTokens(t *testing.T) {
} }
a.fragmentLock.Lock() a.fragmentLock.Lock()
if a.fragment != nil || a.currentGlobalFragment != nil { if a.fragment != nil {
t.Fatal("fragment created") t.Fatal("fragment created")
} }
a.fragmentLock.Unlock() a.fragmentLock.Unlock()
@@ -176,7 +159,7 @@ func TestActivityLog_Creation_WrappingTokens(t *testing.T) {
} }
a.fragmentLock.Lock() a.fragmentLock.Lock()
if a.fragment != nil || a.currentGlobalFragment != nil { if a.fragment != nil {
t.Fatal("fragment created") t.Fatal("fragment created")
} }
a.fragmentLock.Unlock() a.fragmentLock.Unlock()
@@ -215,16 +198,13 @@ func TestActivityLog_UniqueEntities(t *testing.T) {
a.AddEntityToFragment(id2, "root", t3.Unix()) a.AddEntityToFragment(id2, "root", t3.Unix())
a.AddEntityToFragment(id1, "root", t3.Unix()) a.AddEntityToFragment(id1, "root", t3.Unix())
if a.fragment == nil || a.currentGlobalFragment == nil { if a.fragment == nil {
t.Fatal("no current fragment") t.Fatal("no current fragment")
} }
if len(a.fragment.Clients) != 2 { if len(a.fragment.Clients) != 2 {
t.Fatalf("number of entities is %v", len(a.fragment.Clients)) t.Fatalf("number of entities is %v", len(a.fragment.Clients))
} }
if len(a.currentGlobalFragment.Clients) != 2 {
t.Fatalf("number of entities is %v", len(a.currentGlobalFragment.Clients))
}
for i, e := range a.fragment.Clients { for i, e := range a.fragment.Clients {
expectedID := id1 expectedID := id1
@@ -244,24 +224,6 @@ func TestActivityLog_UniqueEntities(t *testing.T) {
t.Errorf("%v: expected %v, got %v", i, expectedTime, e.Timestamp) t.Errorf("%v: expected %v, got %v", i, expectedTime, e.Timestamp)
} }
} }
for i, e := range a.currentGlobalFragment.Clients {
expectedID := id1
expectedTime := t1.Unix()
expectedNS := "root"
if i == 1 {
expectedID = id2
expectedTime = t2.Unix()
}
if e.ClientID != expectedID {
t.Errorf("%v: expected %q, got %q", i, expectedID, e.ClientID)
}
if e.NamespaceID != expectedNS {
t.Errorf("%v: expected %q, got %q", i, expectedNS, e.NamespaceID)
}
if e.Timestamp != expectedTime {
t.Errorf("%v: expected %v, got %v", i, expectedTime, e.Timestamp)
}
}
checkExpectedEntitiesInMap(t, a, []string{id1, id2}) checkExpectedEntitiesInMap(t, a, []string{id1, id2})
} }
@@ -345,7 +307,7 @@ func TestActivityLog_SaveTokensToStorage(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("got error writing tokens to storage: %v", err) t.Fatalf("got error writing tokens to storage: %v", err)
} }
if a.fragment != nil || a.currentGlobalFragment != nil { if a.fragment != nil {
t.Errorf("fragment was not reset after write to storage") t.Errorf("fragment was not reset after write to storage")
} }
@@ -377,7 +339,7 @@ func TestActivityLog_SaveTokensToStorage(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("got error writing tokens to storage: %v", err) t.Fatalf("got error writing tokens to storage: %v", err)
} }
if a.fragment != nil || a.currentGlobalFragment != nil { if a.fragment != nil {
t.Errorf("fragment was not reset after write to storage") t.Errorf("fragment was not reset after write to storage")
} }
@@ -446,7 +408,7 @@ func TestActivityLog_SaveTokensToStorageDoesNotUpdateTokenCount(t *testing.T) {
} }
// Assert that new elements have been written to the fragment // Assert that new elements have been written to the fragment
if a.fragment != nil || a.currentGlobalFragment != nil { if a.fragment != nil {
t.Errorf("fragment was not reset after write to storage") t.Errorf("fragment was not reset after write to storage")
} }
@@ -509,7 +471,7 @@ func TestActivityLog_SaveEntitiesToStorage(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("got error writing entities to storage: %v", err) t.Fatalf("got error writing entities to storage: %v", err)
} }
if a.fragment != nil || a.currentGlobalFragment != nil { if a.fragment != nil {
t.Errorf("fragment was not reset after write to storage") t.Errorf("fragment was not reset after write to storage")
} }
@@ -1290,8 +1252,6 @@ func (a *ActivityLog) resetEntitiesInMemory(t *testing.T) {
defer a.l.Unlock() defer a.l.Unlock()
a.fragmentLock.Lock() a.fragmentLock.Lock()
defer a.fragmentLock.Unlock() defer a.fragmentLock.Unlock()
a.globalFragmentLock.Lock()
defer a.globalFragmentLock.Unlock()
a.currentSegment = segmentInfo{ a.currentSegment = segmentInfo{
startTimestamp: time.Time{}.Unix(), startTimestamp: time.Time{}.Unix(),
currentClients: &activity.EntityActivityLog{ currentClients: &activity.EntityActivityLog{
@@ -1302,7 +1262,6 @@ func (a *ActivityLog) resetEntitiesInMemory(t *testing.T) {
} }
a.partialMonthClientTracker = make(map[string]*activity.EntityRecord) a.partialMonthClientTracker = make(map[string]*activity.EntityRecord)
a.globalPartialMonthClientTracker = make(map[string]*activity.EntityRecord)
} }
// TestActivityLog_loadCurrentClientSegment writes entity segments and calls loadCurrentClientSegment, then verifies // TestActivityLog_loadCurrentClientSegment writes entity segments and calls loadCurrentClientSegment, then verifies
@@ -4808,26 +4767,15 @@ func TestAddActivityToFragment(t *testing.T) {
numClientsBefore := len(a.fragment.Clients) numClientsBefore := len(a.fragment.Clients)
a.fragmentLock.RUnlock() a.fragmentLock.RUnlock()
a.globalFragmentLock.RLock()
globalClientsBefore := len(a.currentGlobalFragment.Clients)
a.globalFragmentLock.RUnlock()
a.AddActivityToFragment(tc.id, ns, 0, tc.activityType, mount) a.AddActivityToFragment(tc.id, ns, 0, tc.activityType, mount)
a.fragmentLock.RLock() a.fragmentLock.RLock()
defer a.fragmentLock.RUnlock() defer a.fragmentLock.RUnlock()
numClientsAfter := len(a.fragment.Clients) numClientsAfter := len(a.fragment.Clients)
a.globalFragmentLock.RLock()
defer a.globalFragmentLock.RUnlock()
globalClientsAfter := len(a.currentGlobalFragment.Clients)
if tc.isAdded { if tc.isAdded {
require.Equal(t, numClientsBefore+1, numClientsAfter) require.Equal(t, numClientsBefore+1, numClientsAfter)
if tc.activityType != nonEntityTokenActivityType {
require.Equal(t, globalClientsBefore+1, globalClientsAfter)
}
} else { } else {
require.Equal(t, numClientsBefore, numClientsAfter) require.Equal(t, numClientsBefore, numClientsAfter)
require.Equal(t, globalClientsBefore, globalClientsAfter)
} }
require.Contains(t, a.partialMonthClientTracker, tc.expectedID) require.Contains(t, a.partialMonthClientTracker, tc.expectedID)
@@ -4839,17 +4787,6 @@ func TestAddActivityToFragment(t *testing.T) {
MountAccessor: mount, MountAccessor: mount,
ClientType: tc.activityType, ClientType: tc.activityType,
}, a.partialMonthClientTracker[tc.expectedID])) }, a.partialMonthClientTracker[tc.expectedID]))
if tc.activityType != nonEntityTokenActivityType {
require.Contains(t, a.globalPartialMonthClientTracker, tc.expectedID)
require.True(t, proto.Equal(&activity.EntityRecord{
ClientID: tc.expectedID,
NamespaceID: ns,
Timestamp: 0,
NonEntity: tc.isNonEntity,
MountAccessor: mount,
ClientType: tc.activityType,
}, a.globalPartialMonthClientTracker[tc.expectedID]))
}
}) })
} }
} }

View File

@@ -63,11 +63,11 @@ func (c *Core) GetActiveClients() map[string]*activity.EntityRecord {
out := make(map[string]*activity.EntityRecord) out := make(map[string]*activity.EntityRecord)
c.stateLock.RLock() c.stateLock.RLock()
c.activityLog.globalFragmentLock.RLock() c.activityLog.fragmentLock.RLock()
for k, v := range c.activityLog.globalPartialMonthClientTracker { for k, v := range c.activityLog.partialMonthClientTracker {
out[k] = v out[k] = v
} }
c.activityLog.globalFragmentLock.RUnlock() c.activityLog.fragmentLock.RUnlock()
c.stateLock.RUnlock() c.stateLock.RUnlock()
return out return out
@@ -247,11 +247,3 @@ func (a *ActivityLog) GetEnabled() bool {
func (c *Core) GetActivityLog() *ActivityLog { func (c *Core) GetActivityLog() *ActivityLog {
return c.activityLog return c.activityLog
} }
func (c *Core) GetActiveGlobalFragment() *activity.LogFragment {
return c.activityLog.currentGlobalFragment
}
func (c *Core) GetSecondaryGlobalFragments() []*activity.LogFragment {
return c.activityLog.secondaryGlobalClientFragments
}