backport of commit 60f6c40202 (#23537)

Co-authored-by: Christopher Swenson <christopher.swenson@hashicorp.com>
This commit is contained in:
hc-github-team-secure-vault-core
2023-10-05 17:21:56 -04:00
committed by GitHub
parent a58a6f19ea
commit 20efd6df84
3 changed files with 49 additions and 2 deletions

3
changelog/23500.txt Normal file
View File

@@ -0,0 +1,3 @@
```release-note:bug
events: Ignore sending context to give more time for events to send
```

View File

@@ -114,7 +114,9 @@ func patchMountPath(data *logical.EventData, pluginInfo *logical.EventPluginInfo
// This function is meant to be used by trusted internal code, so it can specify details like the namespace
// and plugin info. Events from plugins should be routed through WithPlugin(), which will populate
// the namespace and plugin info automatically.
func (bus *EventBus) SendEventInternal(ctx context.Context, ns *namespace.Namespace, pluginInfo *logical.EventPluginInfo, eventType logical.EventType, data *logical.EventData) error {
// The context passed in is currently ignored to ensure that the event is sent if the context is short-lived,
// such as with an HTTP request context.
func (bus *EventBus) SendEventInternal(_ context.Context, ns *namespace.Namespace, pluginInfo *logical.EventPluginInfo, eventType logical.EventType, data *logical.EventData) error {
if ns == nil {
return namespace.ErrNoNamespace
}
@@ -130,7 +132,7 @@ func (bus *EventBus) SendEventInternal(ctx context.Context, ns *namespace.Namesp
// We can't easily know when the SendEvent is complete, so we can't call the cancel function.
// But, it is called automatically after bus.timeout, so there won't be any leak as long as bus.timeout is not too long.
ctx, _ = context.WithTimeout(ctx, bus.timeout)
ctx, _ := context.WithTimeout(context.Background(), bus.timeout)
_, err := bus.broker.Send(ctx, eventTypeAll, eventReceived)
if err != nil {
// if no listeners for this event type are registered, that's okay, the event
@@ -155,6 +157,7 @@ func (bus *EventBus) WithPlugin(ns *namespace.Namespace, eventPluginInfo *logica
// SendEvent sends an event to the event bus and routes it to all relevant subscribers.
// This function does *not* wait for all subscribers to acknowledge before returning.
// The context passed in is currently ignored.
func (bus *pluginEventBus) SendEvent(ctx context.Context, eventType logical.EventType, data *logical.EventData) error {
return bus.bus.SendEventInternal(ctx, bus.namespace, bus.pluginInfo, eventType, data)
}

View File

@@ -75,6 +75,47 @@ func TestBusBasics(t *testing.T) {
}
}
// TestBusIgnoresSendContext tests that the context is ignored when sending to an event,
// so that we do not give up too quickly.
func TestBusIgnoresSendContext(t *testing.T) {
bus, err := NewEventBus(nil)
if err != nil {
t.Fatal(err)
}
eventType := logical.EventType("someType")
event, err := logical.NewEvent()
if err != nil {
t.Fatal(err)
}
bus.Start()
ch, subCancel, err := bus.Subscribe(context.Background(), namespace.RootNamespace, string(eventType), "")
if err != nil {
t.Fatal(err)
}
defer subCancel()
ctx, cancel := context.WithCancel(context.Background())
cancel() // cancel immediately
err = bus.SendEventInternal(ctx, namespace.RootNamespace, nil, eventType, event)
if err != nil {
t.Errorf("Expected no error sending: %v", err)
}
timeout := time.After(1 * time.Second)
select {
case message := <-ch:
if message.Payload.(*logical.EventReceived).Event.Id != event.Id {
t.Errorf("Got unexpected message: %+v", message)
}
case <-timeout:
t.Error("Timeout waiting for message")
}
}
// TestSubscribeNonRootNamespace verifies that events for non-root namespaces
// aren't filtered out by the bus.
func TestSubscribeNonRootNamespace(t *testing.T) {