diff --git a/vault/activity_log.go b/vault/activity_log.go index fd3721adef..44f65c37b4 100644 --- a/vault/activity_log.go +++ b/vault/activity_log.go @@ -148,7 +148,11 @@ type ActivityLog struct { // standbyFragmentsReceived. fragmentLock sync.RWMutex - // globalFragmentLock protects enable secondaryGlobalClientFragments, currentGlobalFragment + // localFragmentLock protects partialMonthLocalClientTracker, localFragment, + // standbyLocalFragmentsReceived. + localFragmentLock sync.RWMutex + + // globalFragmentLock protects enable secondaryGlobalClientFragments, standbyGlobalFragmentsReceived, currentGlobalFragment // and globalPartialMonthClientTracker globalFragmentLock sync.RWMutex @@ -180,6 +184,9 @@ type ActivityLog struct { // so it's appropriate to start the timer. newFragmentCh chan struct{} + // current local log fragment (may be nil) + localFragment *activity.LogFragment + // Channel to signal a new global fragment has been created // so it's appropriate to start the timer. Once the timer finishes // the secondary will send currentGlobalFragment to the primary @@ -197,6 +204,12 @@ type ActivityLog struct { // Fragments received from performance standbys standbyFragmentsReceived []*activity.LogFragment + // Local fragments received from performance standbys + standbyLocalFragmentsReceived []*activity.LogFragment + + // Global fragments received from performance standbys + standbyGlobalFragmentsReceived []*activity.LogFragment + // Fragments of global clients received from performance secondaries secondaryGlobalClientFragments []*activity.LogFragment @@ -219,6 +232,9 @@ type ActivityLog struct { // partialMonthClientTracker tracks active clients this month. Protected by fragmentLock. partialMonthClientTracker map[string]*activity.EntityRecord + // partialMonthLocalClientTracker tracks active local clients this month. Protected by localFragmentLock. + partialMonthLocalClientTracker map[string]*activity.EntityRecord + // globalPartialMonthClientTracker tracks active clients this month. Protected by globalFragmentLock. globalPartialMonthClientTracker map[string]*activity.EntityRecord @@ -258,6 +274,10 @@ type ActivityLogCoreConfig struct { // GlobalFragmentSendInterval sets the interval to send global data from the secondary to the primary // This is only for testing purposes GlobalFragmentSendInterval time.Duration + + // PerfStandbyFragmentSendInterval sets the interval to send fragment data from the perf standby to the active + // This is only for testing purposes + PerfStandbyFragmentSendInterval time.Duration } // ActivityLogExportRecord is the output structure for activity export @@ -335,10 +355,11 @@ func NewActivityLog(core *Core, logger log.Logger, view *BarrierView, metrics me metrics: metrics, nodeID: hostname, newFragmentCh: make(chan struct{}, 1), - newGlobalClientFragmentCh: make(chan struct{}, 1), sendCh: make(chan struct{}, 1), // buffered so it can be triggered by fragment size doneCh: make(chan struct{}, 1), partialMonthClientTracker: make(map[string]*activity.EntityRecord), + partialMonthLocalClientTracker: make(map[string]*activity.EntityRecord), + newGlobalClientFragmentCh: make(chan struct{}, 1), globalPartialMonthClientTracker: make(map[string]*activity.EntityRecord), clock: clock, currentSegment: segmentInfo{ @@ -355,6 +376,8 @@ func NewActivityLog(core *Core, logger log.Logger, view *BarrierView, metrics me clientSequenceNumber: 0, }, standbyFragmentsReceived: make([]*activity.LogFragment, 0), + standbyLocalFragmentsReceived: make([]*activity.LogFragment, 0), + standbyGlobalFragmentsReceived: make([]*activity.LogFragment, 0), secondaryGlobalClientFragments: make([]*activity.LogFragment, 0), inprocessExport: atomic.NewBool(false), precomputedQueryWritten: make(chan struct{}), @@ -400,11 +423,10 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for defer a.metrics.MeasureSinceWithLabels([]string{"core", "activity", "segment_write"}, a.clock.Now(), []metricsutil.Label{}) - // Swap out the pending fragments + // Swap out the pending regular fragments a.fragmentLock.Lock() - localFragment := a.fragment + currentFragment := a.fragment a.fragment = nil - standbys := a.standbyFragmentsReceived a.standbyFragmentsReceived = make([]*activity.LogFragment, 0) a.fragmentLock.Unlock() @@ -443,14 +465,38 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for return nil } - // Measure the current fragment - if localFragment != nil { + // Measure the current regular fragment + if currentFragment != nil { a.metrics.IncrCounterWithLabels([]string{"core", "activity", "fragment_size"}, - float32(len(localFragment.Clients)), + float32(len(currentFragment.Clients)), []metricsutil.Label{ {"type", "entity"}, }) a.metrics.IncrCounterWithLabels([]string{"core", "activity", "fragment_size"}, + float32(len(currentFragment.NonEntityTokens)), + []metricsutil.Label{ + {"type", "direct_token"}, + }) + } + + // Swap out the pending local fragments + a.localFragmentLock.Lock() + localFragment := a.localFragment + a.localFragment = nil + + // standbyLocalFragments := a.standbyLocalFragmentsReceived + // a.standbyLocalFragmentsReceived = make([]*activity.LogFragment, 0) + + a.localFragmentLock.Unlock() + + // Measure the current local fragment + if localFragment != nil { + a.metrics.IncrCounterWithLabels([]string{"core", "activity", "local_fragment_size"}, + float32(len(localFragment.Clients)), + []metricsutil.Label{ + {"type", "entity"}, + }) + a.metrics.IncrCounterWithLabels([]string{"core", "activity", "local_fragment_size"}, float32(len(localFragment.NonEntityTokens)), []metricsutil.Label{ {"type", "direct_token"}, @@ -460,7 +506,7 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for // Collect new entities and new tokens. saveChanges := false newEntities := make(map[string]*activity.EntityRecord) - for _, f := range append(standbys, localFragment) { + for _, f := range append(standbys, currentFragment) { if f == nil { continue } @@ -470,7 +516,7 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for for _, e := range f.Clients { // We could sort by timestamp to see which is first. // We'll ignore that; the order of the append above means - // that we choose entries in localFragment over those + // that we choose entries in currentFragment over those // from standby nodes. newEntities[e.ClientID] = e } @@ -1549,6 +1595,12 @@ func (a *ActivityLog) perfStandbyFragmentWorker(ctx context.Context) { endOfMonth.Stop() } + sendInterval := activityFragmentSendInterval + // This changes the interval to a duration that was set for testing purposes + if a.configOverrides.PerfStandbyFragmentSendInterval.Microseconds() > 0 { + sendInterval = a.configOverrides.PerfStandbyFragmentSendInterval + } + sendFunc := func() { ctx, cancel := context.WithTimeout(ctx, activityFragmentSendTimeout) defer cancel() @@ -1576,7 +1628,7 @@ func (a *ActivityLog) perfStandbyFragmentWorker(ctx context.Context) { fragmentWaiting = true if !a.configOverrides.DisableTimers { a.logger.Trace("reset fragment timer") - timer.Reset(activityFragmentSendInterval) + timer.Reset(sendInterval) } } case <-timer.C: @@ -1613,6 +1665,11 @@ func (a *ActivityLog) perfStandbyFragmentWorker(ctx context.Context) { a.fragmentLock.Unlock() + // clear local active entity set + a.localFragmentLock.Lock() + a.partialMonthLocalClientTracker = make(map[string]*activity.EntityRecord) + a.localFragmentLock.Unlock() + // Set timer for next month. // The current segment *probably* hasn't been set yet (via invalidation), // so don't rely on it. @@ -1784,17 +1841,28 @@ func (c *Core) ResetActivityLog() []*activity.LogFragment { allFragments := make([]*activity.LogFragment, 1) a.fragmentLock.Lock() + allFragments[0] = a.fragment a.fragment = nil - allFragments = append(allFragments, a.standbyFragmentsReceived...) a.standbyFragmentsReceived = make([]*activity.LogFragment, 0) a.secondaryGlobalClientFragments = make([]*activity.LogFragment, 0) a.partialMonthClientTracker = make(map[string]*activity.EntityRecord) a.fragmentLock.Unlock() + // local fragments + a.localFragmentLock.Lock() + allFragments = append(allFragments, a.localFragment) + a.localFragment = nil + allFragments = append(allFragments, a.standbyLocalFragmentsReceived...) + a.standbyLocalFragmentsReceived = make([]*activity.LogFragment, 0) + a.partialMonthLocalClientTracker = make(map[string]*activity.EntityRecord) + a.localFragmentLock.Unlock() + + // global fragments a.globalFragmentLock.Lock() a.globalPartialMonthClientTracker = make(map[string]*activity.EntityRecord) + a.standbyGlobalFragmentsReceived = make([]*activity.LogFragment, 0) a.globalFragmentLock.Unlock() return allFragments } @@ -1833,11 +1901,16 @@ func (a *ActivityLog) AddActivityToFragment(clientID string, namespaceID string, a.fragmentLock.RLock() if a.enabled { - _, present = a.partialMonthClientTracker[clientID] + _, presentInRegularClientMap := a.partialMonthClientTracker[clientID] + _, presentInLocalClientmap := a.partialMonthLocalClientTracker[clientID] + if presentInRegularClientMap || presentInLocalClientmap { + present = true + } } else { present = true } a.fragmentLock.RUnlock() + if present { return } @@ -1846,17 +1919,24 @@ func (a *ActivityLog) AddActivityToFragment(clientID string, namespaceID string, a.fragmentLock.Lock() defer a.fragmentLock.Unlock() + a.localFragmentLock.Lock() + defer a.localFragmentLock.Unlock() + a.globalFragmentLock.Lock() defer a.globalFragmentLock.Unlock() // Re-check entity ID after re-acquiring lock - _, present = a.partialMonthClientTracker[clientID] + _, presentInRegularClientMap := a.partialMonthClientTracker[clientID] + _, presentInLocalClientmap := a.partialMonthLocalClientTracker[clientID] + if presentInRegularClientMap || presentInLocalClientmap { + present = true + } if present { return } + // create fragments if doesn't already exist a.createCurrentFragment() - a.createCurrentGlobalFragment() clientRecord := &activity.EntityRecord{ ClientID: clientID, @@ -1874,18 +1954,22 @@ func (a *ActivityLog) AddActivityToFragment(clientID string, namespaceID string, clientRecord.NonEntity = true } + // add the clients to the regular fragment a.fragment.Clients = append(a.fragment.Clients, clientRecord) + a.partialMonthClientTracker[clientRecord.ClientID] = clientRecord - // Check if the client is local - if local, _ := a.isClientLocal(clientRecord); !local { - // If the client is not local and has not already been seen, then add the client - // to the current global fragment + if local, _ := a.isClientLocal(clientRecord); local { + // If the client is local then add the client to the current local fragment + a.localFragment.Clients = append(a.localFragment.Clients, clientRecord) + a.partialMonthLocalClientTracker[clientRecord.ClientID] = clientRecord + } else { if _, ok := a.globalPartialMonthClientTracker[clientRecord.ClientID]; !ok { - a.globalPartialMonthClientTracker[clientRecord.ClientID] = clientRecord + // If the client is not local and has not already been seen, then add the client + // to the current global fragment a.currentGlobalFragment.Clients = append(a.currentGlobalFragment.Clients, clientRecord) + a.globalPartialMonthClientTracker[clientRecord.ClientID] = clientRecord } } - a.partialMonthClientTracker[clientRecord.ClientID] = clientRecord } // isClientLocal checks whether the given client is on a local mount. @@ -1908,25 +1992,25 @@ func (a *ActivityLog) isClientLocal(client *activity.EntityRecord) (bool, error) return false, nil } -// Create the current fragment if it doesn't already exist. -// Must be called with the lock held. +// Create the fragments (regular fragment, local fragment and global fragment) if it doesn't already exist. +// Must be called with the fragmentLock, localFragmentLock and globalFragmentLock held. func (a *ActivityLog) createCurrentFragment() { if a.fragment == nil { + // create regular fragment a.fragment = &activity.LogFragment{ OriginatingNode: a.nodeID, Clients: make([]*activity.EntityRecord, 0, 120), NonEntityTokens: make(map[string]uint64), } - // Signal that a new segment is available, start - // the timer to send it. - a.newFragmentCh <- struct{}{} - } -} -// Create the current fragment to track global clients seen -// on cluster. Must be called with the globalFragmentLock held -func (a *ActivityLog) createCurrentGlobalFragment() { - if a.currentGlobalFragment == nil { + // create local fragment + a.localFragment = &activity.LogFragment{ + OriginatingNode: a.nodeID, + Clients: make([]*activity.EntityRecord, 0, 120), + NonEntityTokens: make(map[string]uint64), + } + + // create global fragment a.currentGlobalFragment = &activity.LogFragment{ OriginatingCluster: a.core.ClusterID(), Clients: make([]*activity.EntityRecord, 0), @@ -1937,6 +2021,10 @@ func (a *ActivityLog) createCurrentGlobalFragment() { // the timer to send it a.newGlobalClientFragmentCh <- struct{}{} } + + // Signal that a new segment is available, start + // the timer to send it. + a.newFragmentCh <- struct{}{} } } @@ -1960,33 +2048,61 @@ func (a *ActivityLog) receivedGlobalClientFragments(fragment *activity.LogFragme func (a *ActivityLog) receivedFragment(fragment *activity.LogFragment) { a.logger.Trace("received fragment from standby", "node", fragment.OriginatingNode) - a.fragmentLock.Lock() - defer a.fragmentLock.Unlock() - + isLocalFragment := false if !a.enabled { return } - a.globalFragmentLock.Lock() - defer a.globalFragmentLock.Unlock() - a.createCurrentGlobalFragment() + a.fragmentLock.Lock() + defer a.fragmentLock.Unlock() + + // Check if the received fragment from standby is a local fragment. + // A fragment can have all local clients or all non-local clients except for regular fragment (which has both currently but will be modified to only hold non-local clients later). + // Check the first client to identify the type of fragment. + if len(fragment.Clients) > 0 { + client := fragment.Clients[0] + if local, _ := a.isClientLocal(client); local { + isLocalFragment = true + + a.localFragmentLock.Lock() + defer a.localFragmentLock.Unlock() + } else { + a.globalFragmentLock.Lock() + defer a.globalFragmentLock.Unlock() + } + } for _, e := range fragment.Clients { a.partialMonthClientTracker[e.ClientID] = e - - // If the client is global, then add to global maps and keep in a global fragment - if local, _ := a.isClientLocal(e); !local { + if isLocalFragment { + a.partialMonthLocalClientTracker[e.ClientID] = e + } else { a.globalPartialMonthClientTracker[e.ClientID] = e - a.currentGlobalFragment.Clients = append(a.currentGlobalFragment.Clients, e) } - } a.standbyFragmentsReceived = append(a.standbyFragmentsReceived, fragment) + if isLocalFragment { + a.standbyLocalFragmentsReceived = append(a.standbyLocalFragmentsReceived, fragment) + } else { + a.standbyGlobalFragmentsReceived = append(a.standbyGlobalFragmentsReceived, fragment) + } + // TODO: check if current segment is full and should be written } +// returns the active local and global clients for the current month +func (a *ActivityLog) GetAllPartialMonthClients() (map[string]*activity.EntityRecord, map[string]*activity.EntityRecord) { + a.localFragmentLock.Lock() + defer a.localFragmentLock.Unlock() + + a.globalFragmentLock.Lock() + defer a.globalFragmentLock.Unlock() + + return a.partialMonthLocalClientTracker, a.globalPartialMonthClientTracker +} + type ResponseCounts struct { EntityClients int `json:"entity_clients" mapstructure:"entity_clients"` NonEntityClients int `json:"non_entity_clients" mapstructure:"non_entity_clients"` diff --git a/vault/activity_log_test.go b/vault/activity_log_test.go index 59747d6953..c423d44a45 100644 --- a/vault/activity_log_test.go +++ b/vault/activity_log_test.go @@ -23,6 +23,7 @@ import ( "github.com/go-test/deep" "github.com/golang/protobuf/proto" "github.com/hashicorp/go-uuid" + "github.com/hashicorp/vault/builtin/credential/userpass" "github.com/hashicorp/vault/helper/constants" "github.com/hashicorp/vault/helper/namespace" "github.com/hashicorp/vault/helper/timeutil" @@ -34,7 +35,16 @@ import ( // TestActivityLog_Creation calls AddEntityToFragment and verifies that it appears correctly in a.fragment. func TestActivityLog_Creation(t *testing.T) { - core, _, _ := TestCoreUnsealed(t) + storage := &logical.InmemStorage{} + coreConfig := &CoreConfig{ + CredentialBackends: map[string]logical.Factory{ + "userpass": userpass.Factory, + }, + Physical: storage.Underlying(), + } + + cluster := NewTestCluster(t, coreConfig, nil) + core := cluster.Cores[0].Core a := core.activityLog a.SetEnable(true) @@ -49,6 +59,10 @@ func TestActivityLog_Creation(t *testing.T) { t.Fatal("activity log already has fragment") } + if a.localFragment != nil { + t.Fatal("activity log already has a local fragment") + } + const entity_id = "entity_id_75432" const namespace_id = "ns123" ts := time.Now() @@ -118,6 +132,49 @@ func TestActivityLog_Creation(t *testing.T) { if actual != 1 { t.Errorf("mismatched number of tokens, %v vs %v", actual, 1) } + + // test local fragment + localMe := &MountEntry{ + Table: credentialTableType, + Path: "userpass-local/", + Type: "userpass", + Local: true, + Accessor: "local_mount_accessor", + } + err := core.enableCredential(namespace.RootContext(nil), localMe) + require.NoError(t, err) + + const local_entity_id = "entity_id_75434" + local_ts := time.Now() + + a.AddClientToFragment(local_entity_id, "root", local_ts.Unix(), false, "local_mount_accessor") + + if a.localFragment.OriginatingNode != a.nodeID { + t.Errorf("mismatched node ID, %q vs %q", a.localFragment.OriginatingNode, a.nodeID) + } + + if a.localFragment.Clients == nil { + t.Fatal("no local fragment entity slice") + } + + if a.localFragment.NonEntityTokens == nil { + t.Fatal("no local fragment token map") + } + + if len(a.localFragment.Clients) != 1 { + t.Fatalf("wrong number of entities %v", len(a.localFragment.Clients)) + } + + er = a.localFragment.Clients[0] + if er.ClientID != local_entity_id { + t.Errorf("mimatched entity ID, %q vs %q", er.ClientID, local_entity_id) + } + if er.NamespaceID != "root" { + t.Errorf("mimatched namespace ID, %q vs %q", er.NamespaceID, "root") + } + if er.Timestamp != ts.Unix() { + t.Errorf("mimatched timestamp, %v vs %v", er.Timestamp, ts.Unix()) + } } // TestActivityLog_Creation_WrappingTokens calls HandleTokenUsage for two wrapping tokens, and verifies that this @@ -139,6 +196,13 @@ func TestActivityLog_Creation_WrappingTokens(t *testing.T) { t.Fatal("activity log already has fragment") } a.fragmentLock.Unlock() + + a.localFragmentLock.Lock() + if a.localFragment != nil { + t.Fatal("activity log already has local fragment") + } + a.localFragmentLock.Unlock() + const namespace_id = "ns123" te := &logical.TokenEntry{ @@ -349,6 +413,10 @@ func TestActivityLog_SaveTokensToStorage(t *testing.T) { t.Errorf("fragment was not reset after write to storage") } + if a.localFragment != nil { + t.Errorf("local fragment was not reset after write to storage") + } + out := &activity.TokenCount{} protoSegment := readSegmentFromStorage(t, core, path) err = proto.Unmarshal(protoSegment.Value, out) @@ -381,6 +449,10 @@ func TestActivityLog_SaveTokensToStorage(t *testing.T) { t.Errorf("fragment was not reset after write to storage") } + if a.localFragment != nil { + t.Errorf("local fragment was not reset after write to storage") + } + protoSegment = readSegmentFromStorage(t, core, path) out = &activity.TokenCount{} err = proto.Unmarshal(protoSegment.Value, out) @@ -450,6 +522,10 @@ func TestActivityLog_SaveTokensToStorageDoesNotUpdateTokenCount(t *testing.T) { t.Errorf("fragment was not reset after write to storage") } + if a.localFragment != nil { + t.Errorf("local fragment was not reset after write to storage") + } + // Assert that no tokens have been written to the fragment readSegmentFromStorageNil(t, core, tokenPath) @@ -513,6 +589,9 @@ func TestActivityLog_SaveEntitiesToStorage(t *testing.T) { t.Errorf("fragment was not reset after write to storage") } + if a.localFragment != nil { + t.Errorf("local fragment was not reset after write to storage") + } protoSegment := readSegmentFromStorage(t, core, path) out := &activity.EntityActivityLog{} err = proto.Unmarshal(protoSegment.Value, out) @@ -601,8 +680,8 @@ func TestModifyResponseMonthsNilAppend(t *testing.T) { } // TestActivityLog_ReceivedFragment calls receivedFragment with a fragment and verifies it gets added to -// standbyFragmentsReceived. Send the same fragment again and then verify that it doesn't change the entity map but does -// get added to standbyFragmentsReceived. +// standbyFragmentsReceived and standbyGlobalFragmentsReceived. Send the same fragment again and then verify that it doesn't change the entity map but does +// get added to standbyFragmentsReceived and standbyGlobalFragmentsReceived. func TestActivityLog_ReceivedFragment(t *testing.T) { core, _, _ := TestCoreUnsealed(t) a := core.activityLog @@ -644,6 +723,10 @@ func TestActivityLog_ReceivedFragment(t *testing.T) { t.Fatalf("fragment count is %v, expected 1", len(a.standbyFragmentsReceived)) } + if len(a.standbyGlobalFragmentsReceived) != 1 { + t.Fatalf("fragment count is %v, expected 1", len(a.standbyGlobalFragmentsReceived)) + } + // Send a duplicate, should be stored but not change entity map a.receivedFragment(fragment) @@ -652,6 +735,9 @@ func TestActivityLog_ReceivedFragment(t *testing.T) { if len(a.standbyFragmentsReceived) != 2 { t.Fatalf("fragment count is %v, expected 2", len(a.standbyFragmentsReceived)) } + if len(a.standbyGlobalFragmentsReceived) != 2 { + t.Fatalf("fragment count is %v, expected 2", len(a.standbyGlobalFragmentsReceived)) + } } // TestActivityLog_availableLogsEmptyDirectory verifies that availableLogs returns an empty slice when the log directory @@ -1288,10 +1374,16 @@ func (a *ActivityLog) resetEntitiesInMemory(t *testing.T) { a.l.Lock() defer a.l.Unlock() + a.fragmentLock.Lock() defer a.fragmentLock.Unlock() + + a.localFragmentLock.Lock() + defer a.localFragmentLock.Unlock() + a.globalFragmentLock.Lock() defer a.globalFragmentLock.Unlock() + a.currentSegment = segmentInfo{ startTimestamp: time.Time{}.Unix(), currentClients: &activity.EntityActivityLog{ @@ -1302,6 +1394,7 @@ func (a *ActivityLog) resetEntitiesInMemory(t *testing.T) { } a.partialMonthClientTracker = make(map[string]*activity.EntityRecord) + a.partialMonthLocalClientTracker = make(map[string]*activity.EntityRecord) a.globalPartialMonthClientTracker = make(map[string]*activity.EntityRecord) } @@ -1498,9 +1591,12 @@ func TestActivityLog_loadPriorEntitySegment(t *testing.T) { if tc.refresh { a.l.Lock() a.fragmentLock.Lock() + a.localFragmentLock.Lock() a.partialMonthClientTracker = make(map[string]*activity.EntityRecord) + a.partialMonthLocalClientTracker = make(map[string]*activity.EntityRecord) a.currentSegment.startTimestamp = tc.time a.fragmentLock.Unlock() + a.localFragmentLock.Unlock() a.l.Unlock() } @@ -4745,15 +4841,45 @@ func TestActivityLog_HandleEndOfMonth(t *testing.T) { // clients and verifies that they are added correctly to the tracking data // structures func TestAddActivityToFragment(t *testing.T) { - core, _, _ := TestCoreUnsealed(t) + storage := &logical.InmemStorage{} + coreConfig := &CoreConfig{ + CredentialBackends: map[string]logical.Factory{ + "userpass": userpass.Factory, + }, + Physical: storage.Underlying(), + } + + cluster := NewTestCluster(t, coreConfig, nil) + core := cluster.Cores[0].Core a := core.activityLog a.SetEnable(true) + require.Nil(t, a.fragment) + require.Nil(t, a.localFragment) + require.Nil(t, a.currentGlobalFragment) + mount := "mount" + localMount := "localMount" ns := "root" id := "id1" + + // keeps track of the number of clients added to localFragment + localCount := 0 + + // add a client to regular fragment a.AddActivityToFragment(id, ns, 0, entityActivityType, mount) + // create a local mount accessor for local clients + localMe := &MountEntry{ + Table: credentialTableType, + Path: "userpass-local/", + Type: "userpass", + Local: true, + Accessor: localMount, + } + err := core.enableCredential(namespace.RootContext(nil), localMe) + require.NoError(t, err) + testCases := []struct { name string id string @@ -4761,6 +4887,7 @@ func TestAddActivityToFragment(t *testing.T) { isAdded bool expectedID string isNonEntity bool + isLocal bool }{ { name: "duplicate", @@ -4768,6 +4895,7 @@ func TestAddActivityToFragment(t *testing.T) { activityType: entityActivityType, isAdded: false, expectedID: id, + isLocal: false, }, { name: "new entity", @@ -4775,6 +4903,7 @@ func TestAddActivityToFragment(t *testing.T) { activityType: entityActivityType, isAdded: true, expectedID: "new-id", + isLocal: false, }, { name: "new nonentity", @@ -4783,6 +4912,7 @@ func TestAddActivityToFragment(t *testing.T) { isAdded: true, expectedID: "new-nonentity", isNonEntity: true, + isLocal: true, }, { name: "new acme", @@ -4791,6 +4921,7 @@ func TestAddActivityToFragment(t *testing.T) { isAdded: true, expectedID: "pki-acme.new-acme", isNonEntity: true, + isLocal: false, }, { name: "new secret sync", @@ -4799,11 +4930,22 @@ func TestAddActivityToFragment(t *testing.T) { isAdded: true, expectedID: "new-secret-sync", isNonEntity: true, + isLocal: false, + }, + { + name: "new local entity", + id: "new-local-id", + activityType: entityActivityType, + isAdded: true, + expectedID: "new-local-id", + isNonEntity: false, + isLocal: true, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { + var mountAccessor string a.fragmentLock.RLock() numClientsBefore := len(a.fragment.Clients) a.fragmentLock.RUnlock() @@ -4812,7 +4954,25 @@ func TestAddActivityToFragment(t *testing.T) { globalClientsBefore := len(a.currentGlobalFragment.Clients) a.globalFragmentLock.RUnlock() - a.AddActivityToFragment(tc.id, ns, 0, tc.activityType, mount) + numLocalClientsBefore := 0 + + // add client to the fragment + if tc.isLocal { + // data already present in local fragment, get client count before adding activity to fragment + a.localFragmentLock.RLock() + numLocalClientsBefore = len(a.localFragment.Clients) + a.localFragmentLock.RUnlock() + + mountAccessor = localMount + a.AddActivityToFragment(tc.id, ns, 0, tc.activityType, localMount) + + require.NotNil(t, a.localFragment) + localCount++ + } else { + mountAccessor = mount + a.AddActivityToFragment(tc.id, ns, 0, tc.activityType, mount) + } + a.fragmentLock.RLock() defer a.fragmentLock.RUnlock() numClientsAfter := len(a.fragment.Clients) @@ -4820,14 +4980,36 @@ func TestAddActivityToFragment(t *testing.T) { defer a.globalFragmentLock.RUnlock() globalClientsAfter := len(a.currentGlobalFragment.Clients) - if tc.isAdded { - require.Equal(t, numClientsBefore+1, numClientsAfter) - if tc.activityType != nonEntityTokenActivityType { - require.Equal(t, globalClientsBefore+1, globalClientsAfter) + // if local client, verify if local fragment is updated + if tc.isLocal { + a.localFragmentLock.RLock() + defer a.localFragmentLock.RUnlock() + + numLocalClientsAfter := len(a.localFragment.Clients) + switch tc.isAdded { + case true: + require.Equal(t, numLocalClientsBefore+1, numLocalClientsAfter) + default: + require.Equal(t, numLocalClientsBefore, numLocalClientsAfter) } + } else { + // verify global clients + switch tc.isAdded { + case true: + if tc.activityType != nonEntityTokenActivityType { + require.Equal(t, globalClientsBefore+1, globalClientsAfter) + } + default: + require.Equal(t, globalClientsBefore, globalClientsAfter) + } + } + + // for now local clients are added to both regular fragment and local fragment. + // this will be modified in ticket vault-31234 + if tc.isAdded { + require.Equal(t, numClientsBefore+1, numClientsAfter) } else { require.Equal(t, numClientsBefore, numClientsAfter) - require.Equal(t, globalClientsBefore, globalClientsAfter) } require.Contains(t, a.partialMonthClientTracker, tc.expectedID) @@ -4836,10 +5018,21 @@ func TestAddActivityToFragment(t *testing.T) { NamespaceID: ns, Timestamp: 0, NonEntity: tc.isNonEntity, - MountAccessor: mount, + MountAccessor: mountAccessor, ClientType: tc.activityType, }, a.partialMonthClientTracker[tc.expectedID])) - if tc.activityType != nonEntityTokenActivityType { + + if tc.isLocal { + require.Contains(t, a.partialMonthLocalClientTracker, tc.expectedID) + require.True(t, proto.Equal(&activity.EntityRecord{ + ClientID: tc.expectedID, + NamespaceID: ns, + Timestamp: 0, + NonEntity: tc.isNonEntity, + MountAccessor: mountAccessor, + ClientType: tc.activityType, + }, a.partialMonthLocalClientTracker[tc.expectedID])) + } else { require.Contains(t, a.globalPartialMonthClientTracker, tc.expectedID) require.True(t, proto.Equal(&activity.EntityRecord{ ClientID: tc.expectedID, @@ -4854,6 +5047,81 @@ func TestAddActivityToFragment(t *testing.T) { } } +// TestGetAllPartialMonthClients adds activity for a local and regular clients and verifies that +// GetAllPartialMonthClients returns the right local and global clients +func TestGetAllPartialMonthClients(t *testing.T) { + storage := &logical.InmemStorage{} + coreConfig := &CoreConfig{ + CredentialBackends: map[string]logical.Factory{ + "userpass": userpass.Factory, + }, + Physical: storage.Underlying(), + } + + cluster := NewTestCluster(t, coreConfig, nil) + core := cluster.Cores[0].Core + a := core.activityLog + a.SetEnable(true) + + require.Nil(t, a.fragment) + require.Nil(t, a.localFragment) + require.Nil(t, a.currentGlobalFragment) + + ns := "root" + mount := "mount" + localMount := "localMount" + clientID := "id1" + localClientID := "new-local-id" + + // add a client to regular fragment, this should be added to globalPartialMonthClientTracker + a.AddActivityToFragment(clientID, ns, 0, entityActivityType, mount) + + require.NotNil(t, a.localFragment) + require.NotNil(t, a.fragment) + require.NotNil(t, a.currentGlobalFragment) + + // create a local mount accessor + localMe := &MountEntry{ + Table: credentialTableType, + Path: "userpass-local/", + Type: "userpass", + Local: true, + Accessor: localMount, + } + err := core.enableCredential(namespace.RootContext(nil), localMe) + require.NoError(t, err) + + // add client to local fragment, this should be added to partialMonthLocalClientTracker + a.AddActivityToFragment(localClientID, ns, 0, entityActivityType, localMount) + + require.NotNil(t, a.localFragment) + + // GetAllPartialMonthClients returns the partialMonthLocalClientTracker and globalPartialMonthClientTracker + localClients, globalClients := a.GetAllPartialMonthClients() + + // verify the returned localClients + require.Len(t, localClients, 1) + require.Contains(t, localClients, localClientID) + require.True(t, proto.Equal(&activity.EntityRecord{ + ClientID: localClientID, + NamespaceID: ns, + Timestamp: 0, + MountAccessor: localMount, + ClientType: entityActivityType, + }, localClients[localClientID])) + + // verify the returned globalClients + require.Len(t, globalClients, 1) + require.Contains(t, globalClients, clientID) + require.True(t, proto.Equal(&activity.EntityRecord{ + ClientID: clientID, + NamespaceID: ns, + Timestamp: 0, + MountAccessor: mount, + ClientType: entityActivityType, + }, globalClients[clientID])) +} + // TestActivityLog_reportPrecomputedQueryMetrics creates 3 clients per type and // calls reportPrecomputedQueryMetrics. The test verifies that the metric sink // gets metrics reported correctly, based on the segment time matching the diff --git a/vault/activity_log_testing_util.go b/vault/activity_log_testing_util.go index 42fd3ca7f8..a50be13e42 100644 --- a/vault/activity_log_testing_util.go +++ b/vault/activity_log_testing_util.go @@ -57,17 +57,27 @@ func (c *Core) InjectActivityLogDataThisMonth(t *testing.T) map[string]*activity return c.activityLog.partialMonthClientTracker } -// GetActiveClients returns the in-memory partialMonthClientTracker from an +// GetActiveClients returns the in-memory globalPartialMonthClientTracker and partialMonthLocalClientTracker from an // activity log. func (c *Core) GetActiveClients() map[string]*activity.EntityRecord { out := make(map[string]*activity.EntityRecord) c.stateLock.RLock() c.activityLog.globalFragmentLock.RLock() + c.activityLog.localFragmentLock.RLock() + + // add active global clients for k, v := range c.activityLog.globalPartialMonthClientTracker { out[k] = v } + + // add active local clients + for k, v := range c.activityLog.partialMonthLocalClientTracker { + out[k] = v + } + c.activityLog.globalFragmentLock.RUnlock() + c.activityLog.localFragmentLock.RUnlock() c.stateLock.RUnlock() return out @@ -168,6 +178,9 @@ func (a *ActivityLog) ExpectCurrentSegmentRefreshed(t *testing.T, expectedStart if a.partialMonthClientTracker == nil { t.Errorf("expected non-nil partialMonthClientTracker") } + if a.partialMonthLocalClientTracker == nil { + t.Errorf("expected non-nil partialMonthLocalClientTracker") + } if len(a.currentSegment.currentClients.Clients) > 0 { t.Errorf("expected no current entity segment to be loaded. got: %v", a.currentSegment.currentClients) } @@ -177,6 +190,9 @@ func (a *ActivityLog) ExpectCurrentSegmentRefreshed(t *testing.T, expectedStart if len(a.partialMonthClientTracker) > 0 { t.Errorf("expected no active entity segment to be loaded. got: %v", a.partialMonthClientTracker) } + if len(a.partialMonthLocalClientTracker) > 0 { + t.Errorf("expected no active entity segment to be loaded. got: %v", a.partialMonthLocalClientTracker) + } if verifyTimeNotZero { if a.currentSegment.startTimestamp == 0 { @@ -255,3 +271,11 @@ func (c *Core) GetActiveGlobalFragment() *activity.LogFragment { func (c *Core) GetSecondaryGlobalFragments() []*activity.LogFragment { return c.activityLog.secondaryGlobalClientFragments } + +func (c *Core) GetActiveLocalFragment() *activity.LogFragment { + return c.activityLog.localFragment +} + +func (c *Core) GetActiveFragment() *activity.LogFragment { + return c.activityLog.fragment +}