mirror of
https://github.com/optim-enterprises-bv/vault.git
synced 2025-10-29 09:42:25 +00:00
VAULT-17772: bump go-eventlogger to v0.2.1 (#21623)
* go-eventlogger: moved to v0.2.1, allows removal of pipeline and nodes
This commit is contained in:
3
changelog/21623.txt
Normal file
3
changelog/21623.txt
Normal file
@@ -0,0 +1,3 @@
|
||||
```release-note:improvement
|
||||
eventbus: updated go-eventlogger library to allow removal of nodes referenced by pipelines (used for subscriptions)
|
||||
```
|
||||
2
go.mod
2
go.mod
@@ -74,7 +74,7 @@ require (
|
||||
github.com/hashicorp/consul-template v0.32.0
|
||||
github.com/hashicorp/consul/api v1.20.0
|
||||
github.com/hashicorp/errwrap v1.1.0
|
||||
github.com/hashicorp/eventlogger v0.1.1
|
||||
github.com/hashicorp/eventlogger v0.2.1
|
||||
github.com/hashicorp/go-cleanhttp v0.5.2
|
||||
github.com/hashicorp/go-discover v0.0.0-20210818145131-c573d69da192
|
||||
github.com/hashicorp/go-gcp-common v0.8.0
|
||||
|
||||
4
go.sum
4
go.sum
@@ -1713,8 +1713,8 @@ github.com/hashicorp/errwrap v0.0.0-20141028054710-7554cd9344ce/go.mod h1:YH+1FK
|
||||
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
|
||||
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
|
||||
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
|
||||
github.com/hashicorp/eventlogger v0.1.1 h1:zyCjxsy7KunFsMPZKU5PnwWEakSrp1zjj2vPFmrDaeo=
|
||||
github.com/hashicorp/eventlogger v0.1.1/go.mod h1://CHt6/j+Q2lc0NlUB5af4aS2M0c0aVBg9/JfcpAyhM=
|
||||
github.com/hashicorp/eventlogger v0.2.1 h1:sjAOKO62BDDBn10516Uo7QDf5KEqzhU0LkUnbBptVUU=
|
||||
github.com/hashicorp/eventlogger v0.2.1/go.mod h1://CHt6/j+Q2lc0NlUB5af4aS2M0c0aVBg9/JfcpAyhM=
|
||||
github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
|
||||
github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
|
||||
github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
|
||||
|
||||
@@ -6,6 +6,7 @@ package eventbus
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strings"
|
||||
"sync"
|
||||
@@ -146,7 +147,10 @@ func init() {
|
||||
}
|
||||
|
||||
func NewEventBus(logger hclog.Logger) (*EventBus, error) {
|
||||
broker := eventlogger.NewBroker()
|
||||
broker, err := eventlogger.NewBroker()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
formatterID, err := uuid.GenerateUUID()
|
||||
if err != nil {
|
||||
@@ -218,7 +222,8 @@ func (bus *EventBus) Subscribe(ctx context.Context, ns *namespace.Namespace, pat
|
||||
// add info needed to cancel the subscription
|
||||
asyncNode.pipelineID = eventlogger.PipelineID(pipelineID)
|
||||
asyncNode.cancelFunc = cancel
|
||||
return asyncNode.ch, asyncNode.Close, nil
|
||||
// Capture context in a closure for the cancel func
|
||||
return asyncNode.ch, func() { asyncNode.Close(ctx) }, nil
|
||||
}
|
||||
|
||||
// SetSendTimeout sets the timeout of sending events. If the events are not accepted by the
|
||||
@@ -257,13 +262,19 @@ func newAsyncNode(ctx context.Context, logger hclog.Logger) *asyncChanNode {
|
||||
}
|
||||
|
||||
// Close tells the bus to stop sending us events.
|
||||
func (node *asyncChanNode) Close() {
|
||||
func (node *asyncChanNode) Close(ctx context.Context) {
|
||||
node.closeOnce.Do(func() {
|
||||
defer node.cancelFunc()
|
||||
if node.broker != nil {
|
||||
err := node.broker.RemovePipeline(eventTypeAll, node.pipelineID)
|
||||
if err != nil {
|
||||
node.logger.Warn("Error removing pipeline for closing node", "error", err)
|
||||
isPipelineRemoved, err := node.broker.RemovePipelineAndNodes(ctx, eventTypeAll, node.pipelineID)
|
||||
|
||||
switch {
|
||||
case err != nil && isPipelineRemoved:
|
||||
msg := fmt.Sprintf("Error removing nodes referenced by pipeline %q", node.pipelineID)
|
||||
node.logger.Warn(msg, err)
|
||||
case err != nil:
|
||||
msg := fmt.Sprintf("Error removing pipeline %q", node.pipelineID)
|
||||
node.logger.Warn(msg, err)
|
||||
}
|
||||
}
|
||||
addSubscriptions(-1)
|
||||
@@ -283,7 +294,7 @@ func (node *asyncChanNode) Process(ctx context.Context, e *eventlogger.Event) (*
|
||||
}
|
||||
if timeout {
|
||||
node.logger.Info("Subscriber took too long to process event, closing", "ID", e.Payload.(*logical.EventReceived).Event.Id)
|
||||
node.Close()
|
||||
node.Close(ctx)
|
||||
}
|
||||
}()
|
||||
return e, nil
|
||||
|
||||
Reference in New Issue
Block a user