CE changes for VAULT-33452 (#29618)

This commit is contained in:
Theron Voran
2025-02-13 13:41:20 -08:00
committed by GitHub
parent 4b05b590f5
commit 86d23d614f
5 changed files with 91 additions and 83 deletions

3
changelog/29618.txt Normal file
View File

@@ -0,0 +1,3 @@
```release-note:improvement
events (enterprise): Send events downstream to a performance standby node only when there is a subscriber on the standby node with a filter matching the events.
```

View File

@@ -220,10 +220,10 @@ func (bus *EventBus) SubscribeMultipleNamespaces(ctx context.Context, namespaceP
return bus.subscribeInternal(ctx, namespacePathPatterns, pattern, bexprFilter, nil)
}
// subscribeInternal creates the pipeline and connects it to the event bus to receive events.
// if the cluster is specified, then the namespacePathPatterns, pattern, and bexprFilter are ignored, and instead this
// subscription will be tied to the given cluster's filter.
func (bus *EventBus) subscribeInternal(ctx context.Context, namespacePathPatterns []string, pattern string, bexprFilter string, cluster *string) (<-chan *eventlogger.Event, context.CancelFunc, error) {
// subscribeInternal creates the pipeline and connects it to the event bus to receive events. If the
// clusterNode is specified, then the namespacePathPatterns, pattern, and bexprFilter are ignored,
// and instead this subscription will be tied to the given cluster node's filter.
func (bus *EventBus) subscribeInternal(ctx context.Context, namespacePathPatterns []string, pattern string, bexprFilter string, clusterNode *string) (<-chan *eventlogger.Event, context.CancelFunc, error) {
// subscriptions are still stored even if the bus has not been started
pipelineID, err := uuid.GenerateUUID()
if err != nil {
@@ -241,8 +241,8 @@ func (bus *EventBus) subscribeInternal(ctx context.Context, namespacePathPattern
}
var filterNode *eventlogger.Filter
if cluster != nil {
filterNode, err = newClusterFilterNode(bus.filters, clusterID(*cluster))
if clusterNode != nil {
filterNode, err = newClusterNodeFilterNode(bus.filters, clusterNodeID(*clusterNode))
if err != nil {
return nil, nil, err
}
@@ -265,7 +265,7 @@ func (bus *EventBus) subscribeInternal(ctx context.Context, namespacePathPattern
ctx, cancel := context.WithCancel(ctx)
asyncNode := newAsyncNode(ctx, bus.logger, bus.broker, func() {
if cluster == nil {
if clusterNode == nil {
bus.filters.removePattern(bus.filters.self, namespacePathPatterns, pattern)
}
})
@@ -307,9 +307,9 @@ func (bus *EventBus) GlobalMatch(ns *namespace.Namespace, eventType logical.Even
return bus.filters.globalMatch(ns, eventType)
}
// ApplyClusterFilterChanges applies the given filter changes to the cluster's filters.
func (bus *EventBus) ApplyClusterFilterChanges(c string, changes []FilterChange) {
bus.filters.applyChanges(clusterID(c), changes)
// ApplyClusterNodeFilterChanges applies the given filter changes to the cluster node's filters.
func (bus *EventBus) ApplyClusterNodeFilterChanges(c string, changes []FilterChange) {
bus.filters.applyChanges(clusterNodeID(c), changes)
}
// ApplyGlobalFilterChanges applies the given filter changes to the global filters.
@@ -322,9 +322,9 @@ func (bus *EventBus) ClearGlobalFilter() {
bus.filters.clearGlobalPatterns()
}
// ClearClusterFilter removes all entries from the given cluster's filter.
func (bus *EventBus) ClearClusterFilter(id string) {
bus.filters.clearClusterPatterns(clusterID(id))
// ClearClusterNodeFilter removes all entries from the given cluster node's filter.
func (bus *EventBus) ClearClusterNodeFilter(id string) {
bus.filters.clearClusterNodePatterns(clusterNodeID(id))
}
// NotifyOnGlobalFilterChanges returns a channel that receives changes to the global filter.
@@ -332,14 +332,14 @@ func (bus *EventBus) NotifyOnGlobalFilterChanges(ctx context.Context) (<-chan []
return bus.filters.watch(ctx, globalCluster)
}
// NotifyOnLocalFilterChanges returns a channel that receives changes to the filter for the current cluster.
// NotifyOnLocalFilterChanges returns a channel that receives changes to the filter for the current cluster node.
func (bus *EventBus) NotifyOnLocalFilterChanges(ctx context.Context) (<-chan []FilterChange, context.CancelFunc, error) {
return bus.NotifyOnClusterFilterChanges(ctx, string(bus.filters.self))
return bus.NotifyOnClusterNodeFilterChanges(ctx, string(bus.filters.self))
}
// NotifyOnClusterFilterChanges returns a channel that receives changes to the filter for the given cluster.
func (bus *EventBus) NotifyOnClusterFilterChanges(ctx context.Context, cluster string) (<-chan []FilterChange, context.CancelFunc, error) {
return bus.filters.watch(ctx, clusterID(cluster))
// NotifyOnClusterNodeFilterChanges returns a channel that receives changes to the filter for the given cluster node.
func (bus *EventBus) NotifyOnClusterNodeFilterChanges(ctx context.Context, clusterNode string) (<-chan []FilterChange, context.CancelFunc, error) {
return bus.filters.watch(ctx, clusterNodeID(clusterNode))
}
// NewAllEventsSubscription creates a new subscription to all events.
@@ -353,18 +353,18 @@ func (bus *EventBus) NewGlobalSubscription(ctx context.Context) (<-chan *eventlo
return bus.subscribeInternal(ctx, nil, "", "", &g)
}
// NewClusterSubscription creates a new subscription to all events that match the given cluster's filter.
func (bus *EventBus) NewClusterSubscription(ctx context.Context, cluster string) (<-chan *eventlogger.Event, context.CancelFunc, error) {
return bus.subscribeInternal(ctx, nil, "", "", &cluster)
// NewClusterNodeSubscription creates a new subscription to all events that match the given cluster node's filter.
func (bus *EventBus) NewClusterNodeSubscription(ctx context.Context, clusterNode string) (<-chan *eventlogger.Event, context.CancelFunc, error) {
return bus.subscribeInternal(ctx, nil, "", "", &clusterNode)
}
// creates a new filter node that is tied to the filter for a given cluster
func newClusterFilterNode(filters *Filters, c clusterID) (*eventlogger.Filter, error) {
// creates a new filter node that is tied to the filter for a given cluster node
func newClusterNodeFilterNode(filters *Filters, c clusterNodeID) (*eventlogger.Filter, error) {
return &eventlogger.Filter{
Predicate: func(e *eventlogger.Event) (bool, error) {
eventRecv := e.Payload.(*logical.EventReceived)
eventNs := strings.Trim(eventRecv.Namespace, "/")
if filters.clusterMatch(c, &namespace.Namespace{
if filters.clusterNodeMatch(c, &namespace.Namespace{
Path: eventNs,
}, logical.EventType(eventRecv.EventType)) {
return true, nil

View File

@@ -818,8 +818,9 @@ func TestSubscribeGlobal_WithApply(t *testing.T) {
}
}
// TestSubscribeCluster tests that the cluster filter subscription mechanism works.
func TestSubscribeCluster(t *testing.T) {
// TestSubscribeClusterNode tests that the cluster node filter subscription
// mechanism works.
func TestSubscribeClusterNode(t *testing.T) {
bus, err := NewEventBus("", nil)
if err != nil {
t.Fatal(err)
@@ -830,7 +831,7 @@ func TestSubscribeCluster(t *testing.T) {
bus.filters.addPattern("somecluster", []string{""}, "abc*")
ctx, cancelFunc := context.WithCancel(context.Background())
t.Cleanup(cancelFunc)
ch, cancel2, err := bus.NewClusterSubscription(ctx, "somecluster")
ch, cancel2, err := bus.NewClusterNodeSubscription(ctx, "somecluster")
if err != nil {
t.Fatal(err)
}
@@ -854,8 +855,9 @@ func TestSubscribeCluster(t *testing.T) {
}
}
// TestSubscribeCluster_WithApply tests that the cluster filter subscription mechanism works when using ApplyClusterFilterChanges.
func TestSubscribeCluster_WithApply(t *testing.T) {
// TestSubscribeClusterNode_WithApply tests that the cluster node filter
// subscription mechanism works when using ApplyClusterNodeFilterChanges.
func TestSubscribeClusterNode_WithApply(t *testing.T) {
bus, err := NewEventBus("", nil)
if err != nil {
t.Fatal(err)
@@ -865,14 +867,14 @@ func TestSubscribeCluster_WithApply(t *testing.T) {
ctx, cancelFunc := context.WithCancel(context.Background())
t.Cleanup(cancelFunc)
bus.ApplyClusterFilterChanges("somecluster", []FilterChange{
bus.ApplyClusterNodeFilterChanges("somecluster", []FilterChange{
{
Operation: FilterChangeAdd,
NamespacePatterns: []string{""},
EventTypePattern: "abc*",
},
})
ch, cancel2, err := bus.NewClusterSubscription(ctx, "somecluster")
ch, cancel2, err := bus.NewClusterNodeSubscription(ctx, "somecluster")
if err != nil {
t.Fatal(err)
}
@@ -937,8 +939,9 @@ func TestClearGlobalFilter(t *testing.T) {
}
}
// TestClearClusterFilter tests that clearing a cluster filter means no messages get through.
func TestClearClusterFilter(t *testing.T) {
// TestClearClusterNodeFilter tests that clearing a cluster node filter means no
// messages get through.
func TestClearClusterNodeFilter(t *testing.T) {
bus, err := NewEventBus("", nil)
if err != nil {
t.Fatal(err)
@@ -948,15 +951,15 @@ func TestClearClusterFilter(t *testing.T) {
ctx, cancelFunc := context.WithCancel(context.Background())
t.Cleanup(cancelFunc)
bus.ApplyClusterFilterChanges("somecluster", []FilterChange{
bus.ApplyClusterNodeFilterChanges("somecluster", []FilterChange{
{
Operation: FilterChangeAdd,
NamespacePatterns: []string{""},
EventTypePattern: "abc*",
},
})
bus.ClearClusterFilter("somecluster")
ch, cancel2, err := bus.NewClusterSubscription(ctx, "somecluster")
bus.ClearClusterNodeFilter("somecluster")
ch, cancel2, err := bus.NewClusterNodeSubscription(ctx, "somecluster")
if err != nil {
t.Fatal(err)
}
@@ -1020,7 +1023,8 @@ func TestNotifyOnGlobalFilterChanges(t *testing.T) {
}
}
// TestNotifyOnLocalFilterChanges tests that notifications on local cluster filter changes are sent.
// TestNotifyOnLocalFilterChanges tests that notifications on local cluster node
// filter changes are sent.
func TestNotifyOnLocalFilterChanges(t *testing.T) {
bus, err := NewEventBus("somecluster", nil)
if err != nil {
@@ -1037,7 +1041,7 @@ func TestNotifyOnLocalFilterChanges(t *testing.T) {
t.Fatal(err)
}
t.Cleanup(cancel2)
bus.ApplyClusterFilterChanges("somecluster", []FilterChange{
bus.ApplyClusterNodeFilterChanges("somecluster", []FilterChange{
{
Operation: FilterChangeAdd,
NamespacePatterns: []string{""},

View File

@@ -19,18 +19,18 @@ import (
const globalCluster = ""
// Filters keeps track of all the event patterns that each cluster is interested in.
// Filters keeps track of all the event patterns that each cluster node is interested in.
type Filters struct {
lock sync.RWMutex
self clusterID
filters map[clusterID]*ClusterFilter
self clusterNodeID
filters map[clusterNodeID]*ClusterNodeFilter
// notifyChanges is used to notify about changes to filters. The condition variables are tied to single lock above.
notifyChanges map[clusterID]*sync.Cond
notifyChanges map[clusterNodeID]*sync.Cond
}
// clusterID is used to syntactically indicate that the string is a cluster's identifier.
type clusterID string
// clusterNodeID is used to syntactically indicate that the string is a cluster nodes's identifier.
type clusterNodeID string
// pattern is used to represent one or more combinations of patterns
type pattern struct {
@@ -46,14 +46,14 @@ func (p pattern) isEmpty() bool {
return p.namespacePatterns == "" && p.eventTypePattern == ""
}
// ClusterFilter keeps track of all patterns that a particular cluster is interested in.
type ClusterFilter struct {
// ClusterNodeFilter keeps track of all patterns that a particular cluster node is interested in.
type ClusterNodeFilter struct {
patterns sets.Set[pattern]
}
// match checks if the given ns and eventType matches any pattern in the cluster's filter.
// match checks if the given ns and eventType matches any pattern in the cluster node's filter.
// Must be called while holding a (read) lock for the filter.
func (nf *ClusterFilter) match(ns *namespace.Namespace, eventType logical.EventType) bool {
func (nf *ClusterNodeFilter) match(ns *namespace.Namespace, eventType logical.EventType) bool {
if nf == nil {
return false
}
@@ -69,14 +69,14 @@ func (nf *ClusterFilter) match(ns *namespace.Namespace, eventType logical.EventT
return false
}
// NewFilters creates an empty set of filters to keep track of each cluster's pattern interests.
// NewFilters creates an empty set of filters to keep track of each cluster node's pattern interests.
func NewFilters(self string) *Filters {
f := &Filters{
self: clusterID(self),
filters: map[clusterID]*ClusterFilter{},
notifyChanges: map[clusterID]*sync.Cond{},
self: clusterNodeID(self),
filters: map[clusterNodeID]*ClusterNodeFilter{},
notifyChanges: map[clusterNodeID]*sync.Cond{},
}
f.notifyChanges[clusterID(self)] = sync.NewCond(&f.lock)
f.notifyChanges[clusterNodeID(self)] = sync.NewCond(&f.lock)
f.notifyChanges[globalCluster] = sync.NewCond(&f.lock)
return f
}
@@ -89,7 +89,7 @@ func (f *Filters) String() string {
return x
}
func (nf *ClusterFilter) String() string {
func (nf *ClusterNodeFilter) String() string {
var x []string
l := nf.patterns.UnsortedList()
for _, v := range l {
@@ -113,7 +113,7 @@ func (f *Filters) clearGlobalPatterns() {
delete(f.filters, globalCluster)
}
func (f *Filters) getOrCreateNotify(c clusterID) *sync.Cond {
func (f *Filters) getOrCreateNotify(c clusterNodeID) *sync.Cond {
// fast check when we don't need to create the Cond
f.lock.RLock()
n, ok := f.notifyChanges[c]
@@ -133,7 +133,7 @@ func (f *Filters) getOrCreateNotify(c clusterID) *sync.Cond {
return n
}
func (f *Filters) notify(c clusterID) {
func (f *Filters) notify(c clusterNodeID) {
f.lock.RLock()
defer f.lock.RUnlock()
if notifier, ok := f.notifyChanges[c]; ok {
@@ -141,16 +141,16 @@ func (f *Filters) notify(c clusterID) {
}
}
func (f *Filters) clearClusterPatterns(c clusterID) {
func (f *Filters) clearClusterNodePatterns(c clusterNodeID) {
defer f.notify(c)
f.lock.Lock()
defer f.lock.Unlock()
delete(f.filters, c)
}
// copyPatternWithLock gets a copy of a cluster's filters
func (f *Filters) copyPatternWithLock(c clusterID) *ClusterFilter {
filters := &ClusterFilter{}
// copyPatternWithLock gets a copy of a cluster node's filters
func (f *Filters) copyPatternWithLock(c clusterNodeID) *ClusterNodeFilter {
filters := &ClusterNodeFilter{}
if got, ok := f.filters[c]; ok {
filters.patterns = got.patterns.Clone()
} else {
@@ -160,7 +160,7 @@ func (f *Filters) copyPatternWithLock(c clusterID) *ClusterFilter {
}
// applyChanges applies the changes in the given list, atomically.
func (f *Filters) applyChanges(c clusterID, changes []FilterChange) {
func (f *Filters) applyChanges(c clusterNodeID, changes []FilterChange) {
defer f.notify(c)
f.lock.Lock()
defer f.lock.Unlock()
@@ -173,7 +173,7 @@ func (f *Filters) applyChanges(c clusterID, changes []FilterChange) {
for _, change := range changes {
applyChange(newPatterns, &change)
}
f.filters[c] = &ClusterFilter{patterns: newPatterns}
f.filters[c] = &ClusterNodeFilter{patterns: newPatterns}
}
// applyChange applies a single filter change to the given set.
@@ -204,13 +204,13 @@ func cleanJoinNamespaces(nsPatterns []string) string {
return strings.Join(trimmed, " ")
}
// addPattern adds a pattern to a node's list.
func (f *Filters) addPattern(c clusterID, namespacePatterns []string, eventTypePattern string) {
// addPattern adds a pattern to a cluster node's list.
func (f *Filters) addPattern(c clusterNodeID, namespacePatterns []string, eventTypePattern string) {
defer f.notify(c)
f.lock.Lock()
defer f.lock.Unlock()
if _, ok := f.filters[c]; !ok {
f.filters[c] = &ClusterFilter{
f.filters[c] = &ClusterNodeFilter{
patterns: sets.New[pattern](),
}
}
@@ -220,8 +220,8 @@ func (f *Filters) addPattern(c clusterID, namespacePatterns []string, eventTypeP
f.filters[c].patterns.Insert(p)
}
// removePattern removes a pattern from a cluster's list.
func (f *Filters) removePattern(c clusterID, namespacePatterns []string, eventTypePattern string) {
// removePattern removes a pattern from a cluster node's list.
func (f *Filters) removePattern(c clusterNodeID, namespacePatterns []string, eventTypePattern string) {
defer f.notify(c)
nsPatterns := slices.Clone(namespacePatterns)
sort.Strings(nsPatterns)
@@ -235,7 +235,7 @@ func (f *Filters) removePattern(c clusterID, namespacePatterns []string, eventTy
filters.patterns.Delete(check)
}
// anyMatch returns true if any cluster's pattern list matches the arguments.
// anyMatch returns true if any cluster node's pattern list matches the arguments.
func (f *Filters) anyMatch(ns *namespace.Namespace, eventType logical.EventType) bool {
f.lock.RLock()
defer f.lock.RUnlock()
@@ -249,24 +249,24 @@ func (f *Filters) anyMatch(ns *namespace.Namespace, eventType logical.EventType)
// globalMatch returns true if the global cluster's pattern list matches the arguments.
func (f *Filters) globalMatch(ns *namespace.Namespace, eventType logical.EventType) bool {
return f.clusterMatch(globalCluster, ns, eventType)
return f.clusterNodeMatch(globalCluster, ns, eventType)
}
// clusterMatch returns true if the given cluster's pattern list matches the arguments.
func (f *Filters) clusterMatch(c clusterID, ns *namespace.Namespace, eventType logical.EventType) bool {
// clusterNodeMatch returns true if the given cluster node's pattern list matches the arguments.
func (f *Filters) clusterNodeMatch(c clusterNodeID, ns *namespace.Namespace, eventType logical.EventType) bool {
f.lock.RLock()
defer f.lock.RUnlock()
return f.filters[c].match(ns, eventType)
}
// localMatch returns true if the local cluster's pattern list matches the arguments.
// localMatch returns true if the local cluster node's pattern list matches the arguments.
func (f *Filters) localMatch(ns *namespace.Namespace, eventType logical.EventType) bool {
return f.clusterMatch(f.self, ns, eventType)
return f.clusterNodeMatch(f.self, ns, eventType)
}
// watch creates a notification channel that receives changes for the given cluster.
func (f *Filters) watch(ctx context.Context, cluster clusterID) (<-chan []FilterChange, context.CancelFunc, error) {
notify := f.getOrCreateNotify(cluster)
// watch creates a notification channel that receives changes for the given cluster node.
func (f *Filters) watch(ctx context.Context, clusterNode clusterNodeID) (<-chan []FilterChange, context.CancelFunc, error) {
notify := f.getOrCreateNotify(clusterNode)
ctx, cancelFunc := context.WithCancel(ctx)
doneCh := ctx.Done()
ch := make(chan []FilterChange)
@@ -279,7 +279,7 @@ func (f *Filters) watch(ctx context.Context, cluster clusterID) (<-chan []Filter
}
}()
sendToNotify := make(chan *ClusterFilter)
sendToNotify := make(chan *ClusterNodeFilter)
// goroutine for polling the condition variable.
// it's necessary to hold the lock the entire time to ensure there are no race conditions.
@@ -298,7 +298,7 @@ func (f *Filters) watch(ctx context.Context, cluster clusterID) (<-chan []Filter
return
default:
}
next := f.copyPatternWithLock(cluster)
next := f.copyPatternWithLock(clusterNode)
senders.Add(1)
// don't block to send since we hold the lock
go func() {
@@ -312,7 +312,7 @@ func (f *Filters) watch(ctx context.Context, cluster clusterID) (<-chan []Filter
// calculate changes and forward to notification channel
go func() {
defer close(ch)
var current *ClusterFilter
var current *ClusterNodeFilter
for {
next, ok := <-sendToNotify
if !ok {
@@ -333,7 +333,7 @@ func (f *Filters) watch(ctx context.Context, cluster clusterID) (<-chan []Filter
return ch, cancelFunc, nil
}
// FilterChange represents a change to a cluster's filters.
// FilterChange represents a change to a cluster node's filters.
type FilterChange struct {
Operation int
NamespacePatterns []string
@@ -347,7 +347,7 @@ const (
)
// calculateChanges calculates a set of changes necessary to transform from into to.
func calculateChanges(from *ClusterFilter, to *ClusterFilter) []FilterChange {
func calculateChanges(from *ClusterNodeFilter, to *ClusterNodeFilter) []FilterChange {
var changes []FilterChange
if to == nil {
changes = append(changes, FilterChange{

View File

@@ -45,7 +45,8 @@ func TestFilters_AddRemoveMatchLocal(t *testing.T) {
assert.False(t, f.anyMatch(ns, "abc"))
}
// TestFilters_Watch checks that adding a watch for a cluster will send a notification when the patterns are modified.
// TestFilters_Watch checks that adding a watch for a cluster node will send a
// notification when the patterns are modified.
func TestFilters_Watch(t *testing.T) {
ctx, cancelFunc := context.WithCancel(context.Background())
t.Cleanup(cancelFunc)
@@ -135,7 +136,7 @@ func TestFilters_AddRemoveClear(t *testing.T) {
f.removePattern("somecluster", []string{"ns1"}, "abc")
assert.Equal(t, "", f.filters["somecluster"].String())
f.addPattern("somecluster", []string{"ns1"}, "abc")
f.clearClusterPatterns("somecluster")
f.clearClusterNodePatterns("somecluster")
assert.NotContains(t, f.filters, "somecluster")
f.addGlobalPattern([]string{"ns1"}, "abc")