From 86d23d614f01059d69e44405ff7c5553a2fe72c7 Mon Sep 17 00:00:00 2001 From: Theron Voran Date: Thu, 13 Feb 2025 13:41:20 -0800 Subject: [PATCH] CE changes for VAULT-33452 (#29618) --- changelog/29618.txt | 3 ++ vault/eventbus/bus.go | 48 +++++++++---------- vault/eventbus/bus_test.go | 32 +++++++------ vault/eventbus/filter.go | 86 +++++++++++++++++------------------ vault/eventbus/filter_test.go | 5 +- 5 files changed, 91 insertions(+), 83 deletions(-) create mode 100644 changelog/29618.txt diff --git a/changelog/29618.txt b/changelog/29618.txt new file mode 100644 index 0000000000..9d09292da4 --- /dev/null +++ b/changelog/29618.txt @@ -0,0 +1,3 @@ +```release-note:improvement +events (enterprise): Send events downstream to a performance standby node only when there is a subscriber on the standby node with a filter matching the events. +``` diff --git a/vault/eventbus/bus.go b/vault/eventbus/bus.go index a55f3d4aed..2eb112e52b 100644 --- a/vault/eventbus/bus.go +++ b/vault/eventbus/bus.go @@ -220,10 +220,10 @@ func (bus *EventBus) SubscribeMultipleNamespaces(ctx context.Context, namespaceP return bus.subscribeInternal(ctx, namespacePathPatterns, pattern, bexprFilter, nil) } -// subscribeInternal creates the pipeline and connects it to the event bus to receive events. -// if the cluster is specified, then the namespacePathPatterns, pattern, and bexprFilter are ignored, and instead this -// subscription will be tied to the given cluster's filter. -func (bus *EventBus) subscribeInternal(ctx context.Context, namespacePathPatterns []string, pattern string, bexprFilter string, cluster *string) (<-chan *eventlogger.Event, context.CancelFunc, error) { +// subscribeInternal creates the pipeline and connects it to the event bus to receive events. If the +// clusterNode is specified, then the namespacePathPatterns, pattern, and bexprFilter are ignored, +// and instead this subscription will be tied to the given cluster node's filter. +func (bus *EventBus) subscribeInternal(ctx context.Context, namespacePathPatterns []string, pattern string, bexprFilter string, clusterNode *string) (<-chan *eventlogger.Event, context.CancelFunc, error) { // subscriptions are still stored even if the bus has not been started pipelineID, err := uuid.GenerateUUID() if err != nil { @@ -241,8 +241,8 @@ func (bus *EventBus) subscribeInternal(ctx context.Context, namespacePathPattern } var filterNode *eventlogger.Filter - if cluster != nil { - filterNode, err = newClusterFilterNode(bus.filters, clusterID(*cluster)) + if clusterNode != nil { + filterNode, err = newClusterNodeFilterNode(bus.filters, clusterNodeID(*clusterNode)) if err != nil { return nil, nil, err } @@ -265,7 +265,7 @@ func (bus *EventBus) subscribeInternal(ctx context.Context, namespacePathPattern ctx, cancel := context.WithCancel(ctx) asyncNode := newAsyncNode(ctx, bus.logger, bus.broker, func() { - if cluster == nil { + if clusterNode == nil { bus.filters.removePattern(bus.filters.self, namespacePathPatterns, pattern) } }) @@ -307,9 +307,9 @@ func (bus *EventBus) GlobalMatch(ns *namespace.Namespace, eventType logical.Even return bus.filters.globalMatch(ns, eventType) } -// ApplyClusterFilterChanges applies the given filter changes to the cluster's filters. -func (bus *EventBus) ApplyClusterFilterChanges(c string, changes []FilterChange) { - bus.filters.applyChanges(clusterID(c), changes) +// ApplyClusterNodeFilterChanges applies the given filter changes to the cluster node's filters. +func (bus *EventBus) ApplyClusterNodeFilterChanges(c string, changes []FilterChange) { + bus.filters.applyChanges(clusterNodeID(c), changes) } // ApplyGlobalFilterChanges applies the given filter changes to the global filters. @@ -322,9 +322,9 @@ func (bus *EventBus) ClearGlobalFilter() { bus.filters.clearGlobalPatterns() } -// ClearClusterFilter removes all entries from the given cluster's filter. -func (bus *EventBus) ClearClusterFilter(id string) { - bus.filters.clearClusterPatterns(clusterID(id)) +// ClearClusterNodeFilter removes all entries from the given cluster node's filter. +func (bus *EventBus) ClearClusterNodeFilter(id string) { + bus.filters.clearClusterNodePatterns(clusterNodeID(id)) } // NotifyOnGlobalFilterChanges returns a channel that receives changes to the global filter. @@ -332,14 +332,14 @@ func (bus *EventBus) NotifyOnGlobalFilterChanges(ctx context.Context) (<-chan [] return bus.filters.watch(ctx, globalCluster) } -// NotifyOnLocalFilterChanges returns a channel that receives changes to the filter for the current cluster. +// NotifyOnLocalFilterChanges returns a channel that receives changes to the filter for the current cluster node. func (bus *EventBus) NotifyOnLocalFilterChanges(ctx context.Context) (<-chan []FilterChange, context.CancelFunc, error) { - return bus.NotifyOnClusterFilterChanges(ctx, string(bus.filters.self)) + return bus.NotifyOnClusterNodeFilterChanges(ctx, string(bus.filters.self)) } -// NotifyOnClusterFilterChanges returns a channel that receives changes to the filter for the given cluster. -func (bus *EventBus) NotifyOnClusterFilterChanges(ctx context.Context, cluster string) (<-chan []FilterChange, context.CancelFunc, error) { - return bus.filters.watch(ctx, clusterID(cluster)) +// NotifyOnClusterNodeFilterChanges returns a channel that receives changes to the filter for the given cluster node. +func (bus *EventBus) NotifyOnClusterNodeFilterChanges(ctx context.Context, clusterNode string) (<-chan []FilterChange, context.CancelFunc, error) { + return bus.filters.watch(ctx, clusterNodeID(clusterNode)) } // NewAllEventsSubscription creates a new subscription to all events. @@ -353,18 +353,18 @@ func (bus *EventBus) NewGlobalSubscription(ctx context.Context) (<-chan *eventlo return bus.subscribeInternal(ctx, nil, "", "", &g) } -// NewClusterSubscription creates a new subscription to all events that match the given cluster's filter. -func (bus *EventBus) NewClusterSubscription(ctx context.Context, cluster string) (<-chan *eventlogger.Event, context.CancelFunc, error) { - return bus.subscribeInternal(ctx, nil, "", "", &cluster) +// NewClusterNodeSubscription creates a new subscription to all events that match the given cluster node's filter. +func (bus *EventBus) NewClusterNodeSubscription(ctx context.Context, clusterNode string) (<-chan *eventlogger.Event, context.CancelFunc, error) { + return bus.subscribeInternal(ctx, nil, "", "", &clusterNode) } -// creates a new filter node that is tied to the filter for a given cluster -func newClusterFilterNode(filters *Filters, c clusterID) (*eventlogger.Filter, error) { +// creates a new filter node that is tied to the filter for a given cluster node +func newClusterNodeFilterNode(filters *Filters, c clusterNodeID) (*eventlogger.Filter, error) { return &eventlogger.Filter{ Predicate: func(e *eventlogger.Event) (bool, error) { eventRecv := e.Payload.(*logical.EventReceived) eventNs := strings.Trim(eventRecv.Namespace, "/") - if filters.clusterMatch(c, &namespace.Namespace{ + if filters.clusterNodeMatch(c, &namespace.Namespace{ Path: eventNs, }, logical.EventType(eventRecv.EventType)) { return true, nil diff --git a/vault/eventbus/bus_test.go b/vault/eventbus/bus_test.go index 7072cb09a1..40bbc6618d 100644 --- a/vault/eventbus/bus_test.go +++ b/vault/eventbus/bus_test.go @@ -818,8 +818,9 @@ func TestSubscribeGlobal_WithApply(t *testing.T) { } } -// TestSubscribeCluster tests that the cluster filter subscription mechanism works. -func TestSubscribeCluster(t *testing.T) { +// TestSubscribeClusterNode tests that the cluster node filter subscription +// mechanism works. +func TestSubscribeClusterNode(t *testing.T) { bus, err := NewEventBus("", nil) if err != nil { t.Fatal(err) @@ -830,7 +831,7 @@ func TestSubscribeCluster(t *testing.T) { bus.filters.addPattern("somecluster", []string{""}, "abc*") ctx, cancelFunc := context.WithCancel(context.Background()) t.Cleanup(cancelFunc) - ch, cancel2, err := bus.NewClusterSubscription(ctx, "somecluster") + ch, cancel2, err := bus.NewClusterNodeSubscription(ctx, "somecluster") if err != nil { t.Fatal(err) } @@ -854,8 +855,9 @@ func TestSubscribeCluster(t *testing.T) { } } -// TestSubscribeCluster_WithApply tests that the cluster filter subscription mechanism works when using ApplyClusterFilterChanges. -func TestSubscribeCluster_WithApply(t *testing.T) { +// TestSubscribeClusterNode_WithApply tests that the cluster node filter +// subscription mechanism works when using ApplyClusterNodeFilterChanges. +func TestSubscribeClusterNode_WithApply(t *testing.T) { bus, err := NewEventBus("", nil) if err != nil { t.Fatal(err) @@ -865,14 +867,14 @@ func TestSubscribeCluster_WithApply(t *testing.T) { ctx, cancelFunc := context.WithCancel(context.Background()) t.Cleanup(cancelFunc) - bus.ApplyClusterFilterChanges("somecluster", []FilterChange{ + bus.ApplyClusterNodeFilterChanges("somecluster", []FilterChange{ { Operation: FilterChangeAdd, NamespacePatterns: []string{""}, EventTypePattern: "abc*", }, }) - ch, cancel2, err := bus.NewClusterSubscription(ctx, "somecluster") + ch, cancel2, err := bus.NewClusterNodeSubscription(ctx, "somecluster") if err != nil { t.Fatal(err) } @@ -937,8 +939,9 @@ func TestClearGlobalFilter(t *testing.T) { } } -// TestClearClusterFilter tests that clearing a cluster filter means no messages get through. -func TestClearClusterFilter(t *testing.T) { +// TestClearClusterNodeFilter tests that clearing a cluster node filter means no +// messages get through. +func TestClearClusterNodeFilter(t *testing.T) { bus, err := NewEventBus("", nil) if err != nil { t.Fatal(err) @@ -948,15 +951,15 @@ func TestClearClusterFilter(t *testing.T) { ctx, cancelFunc := context.WithCancel(context.Background()) t.Cleanup(cancelFunc) - bus.ApplyClusterFilterChanges("somecluster", []FilterChange{ + bus.ApplyClusterNodeFilterChanges("somecluster", []FilterChange{ { Operation: FilterChangeAdd, NamespacePatterns: []string{""}, EventTypePattern: "abc*", }, }) - bus.ClearClusterFilter("somecluster") - ch, cancel2, err := bus.NewClusterSubscription(ctx, "somecluster") + bus.ClearClusterNodeFilter("somecluster") + ch, cancel2, err := bus.NewClusterNodeSubscription(ctx, "somecluster") if err != nil { t.Fatal(err) } @@ -1020,7 +1023,8 @@ func TestNotifyOnGlobalFilterChanges(t *testing.T) { } } -// TestNotifyOnLocalFilterChanges tests that notifications on local cluster filter changes are sent. +// TestNotifyOnLocalFilterChanges tests that notifications on local cluster node +// filter changes are sent. func TestNotifyOnLocalFilterChanges(t *testing.T) { bus, err := NewEventBus("somecluster", nil) if err != nil { @@ -1037,7 +1041,7 @@ func TestNotifyOnLocalFilterChanges(t *testing.T) { t.Fatal(err) } t.Cleanup(cancel2) - bus.ApplyClusterFilterChanges("somecluster", []FilterChange{ + bus.ApplyClusterNodeFilterChanges("somecluster", []FilterChange{ { Operation: FilterChangeAdd, NamespacePatterns: []string{""}, diff --git a/vault/eventbus/filter.go b/vault/eventbus/filter.go index dad27bbf36..95dda44b1c 100644 --- a/vault/eventbus/filter.go +++ b/vault/eventbus/filter.go @@ -19,18 +19,18 @@ import ( const globalCluster = "" -// Filters keeps track of all the event patterns that each cluster is interested in. +// Filters keeps track of all the event patterns that each cluster node is interested in. type Filters struct { lock sync.RWMutex - self clusterID - filters map[clusterID]*ClusterFilter + self clusterNodeID + filters map[clusterNodeID]*ClusterNodeFilter // notifyChanges is used to notify about changes to filters. The condition variables are tied to single lock above. - notifyChanges map[clusterID]*sync.Cond + notifyChanges map[clusterNodeID]*sync.Cond } -// clusterID is used to syntactically indicate that the string is a cluster's identifier. -type clusterID string +// clusterNodeID is used to syntactically indicate that the string is a cluster nodes's identifier. +type clusterNodeID string // pattern is used to represent one or more combinations of patterns type pattern struct { @@ -46,14 +46,14 @@ func (p pattern) isEmpty() bool { return p.namespacePatterns == "" && p.eventTypePattern == "" } -// ClusterFilter keeps track of all patterns that a particular cluster is interested in. -type ClusterFilter struct { +// ClusterNodeFilter keeps track of all patterns that a particular cluster node is interested in. +type ClusterNodeFilter struct { patterns sets.Set[pattern] } -// match checks if the given ns and eventType matches any pattern in the cluster's filter. +// match checks if the given ns and eventType matches any pattern in the cluster node's filter. // Must be called while holding a (read) lock for the filter. -func (nf *ClusterFilter) match(ns *namespace.Namespace, eventType logical.EventType) bool { +func (nf *ClusterNodeFilter) match(ns *namespace.Namespace, eventType logical.EventType) bool { if nf == nil { return false } @@ -69,14 +69,14 @@ func (nf *ClusterFilter) match(ns *namespace.Namespace, eventType logical.EventT return false } -// NewFilters creates an empty set of filters to keep track of each cluster's pattern interests. +// NewFilters creates an empty set of filters to keep track of each cluster node's pattern interests. func NewFilters(self string) *Filters { f := &Filters{ - self: clusterID(self), - filters: map[clusterID]*ClusterFilter{}, - notifyChanges: map[clusterID]*sync.Cond{}, + self: clusterNodeID(self), + filters: map[clusterNodeID]*ClusterNodeFilter{}, + notifyChanges: map[clusterNodeID]*sync.Cond{}, } - f.notifyChanges[clusterID(self)] = sync.NewCond(&f.lock) + f.notifyChanges[clusterNodeID(self)] = sync.NewCond(&f.lock) f.notifyChanges[globalCluster] = sync.NewCond(&f.lock) return f } @@ -89,7 +89,7 @@ func (f *Filters) String() string { return x } -func (nf *ClusterFilter) String() string { +func (nf *ClusterNodeFilter) String() string { var x []string l := nf.patterns.UnsortedList() for _, v := range l { @@ -113,7 +113,7 @@ func (f *Filters) clearGlobalPatterns() { delete(f.filters, globalCluster) } -func (f *Filters) getOrCreateNotify(c clusterID) *sync.Cond { +func (f *Filters) getOrCreateNotify(c clusterNodeID) *sync.Cond { // fast check when we don't need to create the Cond f.lock.RLock() n, ok := f.notifyChanges[c] @@ -133,7 +133,7 @@ func (f *Filters) getOrCreateNotify(c clusterID) *sync.Cond { return n } -func (f *Filters) notify(c clusterID) { +func (f *Filters) notify(c clusterNodeID) { f.lock.RLock() defer f.lock.RUnlock() if notifier, ok := f.notifyChanges[c]; ok { @@ -141,16 +141,16 @@ func (f *Filters) notify(c clusterID) { } } -func (f *Filters) clearClusterPatterns(c clusterID) { +func (f *Filters) clearClusterNodePatterns(c clusterNodeID) { defer f.notify(c) f.lock.Lock() defer f.lock.Unlock() delete(f.filters, c) } -// copyPatternWithLock gets a copy of a cluster's filters -func (f *Filters) copyPatternWithLock(c clusterID) *ClusterFilter { - filters := &ClusterFilter{} +// copyPatternWithLock gets a copy of a cluster node's filters +func (f *Filters) copyPatternWithLock(c clusterNodeID) *ClusterNodeFilter { + filters := &ClusterNodeFilter{} if got, ok := f.filters[c]; ok { filters.patterns = got.patterns.Clone() } else { @@ -160,7 +160,7 @@ func (f *Filters) copyPatternWithLock(c clusterID) *ClusterFilter { } // applyChanges applies the changes in the given list, atomically. -func (f *Filters) applyChanges(c clusterID, changes []FilterChange) { +func (f *Filters) applyChanges(c clusterNodeID, changes []FilterChange) { defer f.notify(c) f.lock.Lock() defer f.lock.Unlock() @@ -173,7 +173,7 @@ func (f *Filters) applyChanges(c clusterID, changes []FilterChange) { for _, change := range changes { applyChange(newPatterns, &change) } - f.filters[c] = &ClusterFilter{patterns: newPatterns} + f.filters[c] = &ClusterNodeFilter{patterns: newPatterns} } // applyChange applies a single filter change to the given set. @@ -204,13 +204,13 @@ func cleanJoinNamespaces(nsPatterns []string) string { return strings.Join(trimmed, " ") } -// addPattern adds a pattern to a node's list. -func (f *Filters) addPattern(c clusterID, namespacePatterns []string, eventTypePattern string) { +// addPattern adds a pattern to a cluster node's list. +func (f *Filters) addPattern(c clusterNodeID, namespacePatterns []string, eventTypePattern string) { defer f.notify(c) f.lock.Lock() defer f.lock.Unlock() if _, ok := f.filters[c]; !ok { - f.filters[c] = &ClusterFilter{ + f.filters[c] = &ClusterNodeFilter{ patterns: sets.New[pattern](), } } @@ -220,8 +220,8 @@ func (f *Filters) addPattern(c clusterID, namespacePatterns []string, eventTypeP f.filters[c].patterns.Insert(p) } -// removePattern removes a pattern from a cluster's list. -func (f *Filters) removePattern(c clusterID, namespacePatterns []string, eventTypePattern string) { +// removePattern removes a pattern from a cluster node's list. +func (f *Filters) removePattern(c clusterNodeID, namespacePatterns []string, eventTypePattern string) { defer f.notify(c) nsPatterns := slices.Clone(namespacePatterns) sort.Strings(nsPatterns) @@ -235,7 +235,7 @@ func (f *Filters) removePattern(c clusterID, namespacePatterns []string, eventTy filters.patterns.Delete(check) } -// anyMatch returns true if any cluster's pattern list matches the arguments. +// anyMatch returns true if any cluster node's pattern list matches the arguments. func (f *Filters) anyMatch(ns *namespace.Namespace, eventType logical.EventType) bool { f.lock.RLock() defer f.lock.RUnlock() @@ -249,24 +249,24 @@ func (f *Filters) anyMatch(ns *namespace.Namespace, eventType logical.EventType) // globalMatch returns true if the global cluster's pattern list matches the arguments. func (f *Filters) globalMatch(ns *namespace.Namespace, eventType logical.EventType) bool { - return f.clusterMatch(globalCluster, ns, eventType) + return f.clusterNodeMatch(globalCluster, ns, eventType) } -// clusterMatch returns true if the given cluster's pattern list matches the arguments. -func (f *Filters) clusterMatch(c clusterID, ns *namespace.Namespace, eventType logical.EventType) bool { +// clusterNodeMatch returns true if the given cluster node's pattern list matches the arguments. +func (f *Filters) clusterNodeMatch(c clusterNodeID, ns *namespace.Namespace, eventType logical.EventType) bool { f.lock.RLock() defer f.lock.RUnlock() return f.filters[c].match(ns, eventType) } -// localMatch returns true if the local cluster's pattern list matches the arguments. +// localMatch returns true if the local cluster node's pattern list matches the arguments. func (f *Filters) localMatch(ns *namespace.Namespace, eventType logical.EventType) bool { - return f.clusterMatch(f.self, ns, eventType) + return f.clusterNodeMatch(f.self, ns, eventType) } -// watch creates a notification channel that receives changes for the given cluster. -func (f *Filters) watch(ctx context.Context, cluster clusterID) (<-chan []FilterChange, context.CancelFunc, error) { - notify := f.getOrCreateNotify(cluster) +// watch creates a notification channel that receives changes for the given cluster node. +func (f *Filters) watch(ctx context.Context, clusterNode clusterNodeID) (<-chan []FilterChange, context.CancelFunc, error) { + notify := f.getOrCreateNotify(clusterNode) ctx, cancelFunc := context.WithCancel(ctx) doneCh := ctx.Done() ch := make(chan []FilterChange) @@ -279,7 +279,7 @@ func (f *Filters) watch(ctx context.Context, cluster clusterID) (<-chan []Filter } }() - sendToNotify := make(chan *ClusterFilter) + sendToNotify := make(chan *ClusterNodeFilter) // goroutine for polling the condition variable. // it's necessary to hold the lock the entire time to ensure there are no race conditions. @@ -298,7 +298,7 @@ func (f *Filters) watch(ctx context.Context, cluster clusterID) (<-chan []Filter return default: } - next := f.copyPatternWithLock(cluster) + next := f.copyPatternWithLock(clusterNode) senders.Add(1) // don't block to send since we hold the lock go func() { @@ -312,7 +312,7 @@ func (f *Filters) watch(ctx context.Context, cluster clusterID) (<-chan []Filter // calculate changes and forward to notification channel go func() { defer close(ch) - var current *ClusterFilter + var current *ClusterNodeFilter for { next, ok := <-sendToNotify if !ok { @@ -333,7 +333,7 @@ func (f *Filters) watch(ctx context.Context, cluster clusterID) (<-chan []Filter return ch, cancelFunc, nil } -// FilterChange represents a change to a cluster's filters. +// FilterChange represents a change to a cluster node's filters. type FilterChange struct { Operation int NamespacePatterns []string @@ -347,7 +347,7 @@ const ( ) // calculateChanges calculates a set of changes necessary to transform from into to. -func calculateChanges(from *ClusterFilter, to *ClusterFilter) []FilterChange { +func calculateChanges(from *ClusterNodeFilter, to *ClusterNodeFilter) []FilterChange { var changes []FilterChange if to == nil { changes = append(changes, FilterChange{ diff --git a/vault/eventbus/filter_test.go b/vault/eventbus/filter_test.go index 4fb661f557..cc1790f654 100644 --- a/vault/eventbus/filter_test.go +++ b/vault/eventbus/filter_test.go @@ -45,7 +45,8 @@ func TestFilters_AddRemoveMatchLocal(t *testing.T) { assert.False(t, f.anyMatch(ns, "abc")) } -// TestFilters_Watch checks that adding a watch for a cluster will send a notification when the patterns are modified. +// TestFilters_Watch checks that adding a watch for a cluster node will send a +// notification when the patterns are modified. func TestFilters_Watch(t *testing.T) { ctx, cancelFunc := context.WithCancel(context.Background()) t.Cleanup(cancelFunc) @@ -135,7 +136,7 @@ func TestFilters_AddRemoveClear(t *testing.T) { f.removePattern("somecluster", []string{"ns1"}, "abc") assert.Equal(t, "", f.filters["somecluster"].String()) f.addPattern("somecluster", []string{"ns1"}, "abc") - f.clearClusterPatterns("somecluster") + f.clearClusterNodePatterns("somecluster") assert.NotContains(t, f.filters, "somecluster") f.addGlobalPattern([]string{"ns1"}, "abc")