mirror of
https://github.com/optim-enterprises-bv/vault.git
synced 2025-10-30 02:02:43 +00:00
backport of commit 8bb9cbbeba (#21627)
Co-authored-by: Peter Wilson <peter.wilson@hashicorp.com>
This commit is contained in:
committed by
GitHub
parent
52f1c20ac6
commit
50c8e7f5c2
3
changelog/21623.txt
Normal file
3
changelog/21623.txt
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
```release-note:improvement
|
||||||
|
eventbus: updated go-eventlogger library to allow removal of nodes referenced by pipelines (used for subscriptions)
|
||||||
|
```
|
||||||
2
go.mod
2
go.mod
@@ -73,7 +73,7 @@ require (
|
|||||||
github.com/hashicorp/consul-template v0.32.0
|
github.com/hashicorp/consul-template v0.32.0
|
||||||
github.com/hashicorp/consul/api v1.20.0
|
github.com/hashicorp/consul/api v1.20.0
|
||||||
github.com/hashicorp/errwrap v1.1.0
|
github.com/hashicorp/errwrap v1.1.0
|
||||||
github.com/hashicorp/eventlogger v0.1.1
|
github.com/hashicorp/eventlogger v0.2.1
|
||||||
github.com/hashicorp/go-cleanhttp v0.5.2
|
github.com/hashicorp/go-cleanhttp v0.5.2
|
||||||
github.com/hashicorp/go-discover v0.0.0-20210818145131-c573d69da192
|
github.com/hashicorp/go-discover v0.0.0-20210818145131-c573d69da192
|
||||||
github.com/hashicorp/go-gcp-common v0.8.0
|
github.com/hashicorp/go-gcp-common v0.8.0
|
||||||
|
|||||||
4
go.sum
4
go.sum
@@ -1698,8 +1698,8 @@ github.com/hashicorp/errwrap v0.0.0-20141028054710-7554cd9344ce/go.mod h1:YH+1FK
|
|||||||
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
|
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
|
||||||
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
|
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
|
||||||
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
|
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
|
||||||
github.com/hashicorp/eventlogger v0.1.1 h1:zyCjxsy7KunFsMPZKU5PnwWEakSrp1zjj2vPFmrDaeo=
|
github.com/hashicorp/eventlogger v0.2.1 h1:sjAOKO62BDDBn10516Uo7QDf5KEqzhU0LkUnbBptVUU=
|
||||||
github.com/hashicorp/eventlogger v0.1.1/go.mod h1://CHt6/j+Q2lc0NlUB5af4aS2M0c0aVBg9/JfcpAyhM=
|
github.com/hashicorp/eventlogger v0.2.1/go.mod h1://CHt6/j+Q2lc0NlUB5af4aS2M0c0aVBg9/JfcpAyhM=
|
||||||
github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
|
github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
|
||||||
github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
|
github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
|
||||||
github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
|
github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ package eventbus
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"net/url"
|
"net/url"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
@@ -146,7 +147,10 @@ func init() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewEventBus(logger hclog.Logger) (*EventBus, error) {
|
func NewEventBus(logger hclog.Logger) (*EventBus, error) {
|
||||||
broker := eventlogger.NewBroker()
|
broker, err := eventlogger.NewBroker()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
formatterID, err := uuid.GenerateUUID()
|
formatterID, err := uuid.GenerateUUID()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -218,7 +222,8 @@ func (bus *EventBus) Subscribe(ctx context.Context, ns *namespace.Namespace, pat
|
|||||||
// add info needed to cancel the subscription
|
// add info needed to cancel the subscription
|
||||||
asyncNode.pipelineID = eventlogger.PipelineID(pipelineID)
|
asyncNode.pipelineID = eventlogger.PipelineID(pipelineID)
|
||||||
asyncNode.cancelFunc = cancel
|
asyncNode.cancelFunc = cancel
|
||||||
return asyncNode.ch, asyncNode.Close, nil
|
// Capture context in a closure for the cancel func
|
||||||
|
return asyncNode.ch, func() { asyncNode.Close(ctx) }, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetSendTimeout sets the timeout of sending events. If the events are not accepted by the
|
// SetSendTimeout sets the timeout of sending events. If the events are not accepted by the
|
||||||
@@ -257,13 +262,19 @@ func newAsyncNode(ctx context.Context, logger hclog.Logger) *asyncChanNode {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Close tells the bus to stop sending us events.
|
// Close tells the bus to stop sending us events.
|
||||||
func (node *asyncChanNode) Close() {
|
func (node *asyncChanNode) Close(ctx context.Context) {
|
||||||
node.closeOnce.Do(func() {
|
node.closeOnce.Do(func() {
|
||||||
defer node.cancelFunc()
|
defer node.cancelFunc()
|
||||||
if node.broker != nil {
|
if node.broker != nil {
|
||||||
err := node.broker.RemovePipeline(eventTypeAll, node.pipelineID)
|
isPipelineRemoved, err := node.broker.RemovePipelineAndNodes(ctx, eventTypeAll, node.pipelineID)
|
||||||
if err != nil {
|
|
||||||
node.logger.Warn("Error removing pipeline for closing node", "error", err)
|
switch {
|
||||||
|
case err != nil && isPipelineRemoved:
|
||||||
|
msg := fmt.Sprintf("Error removing nodes referenced by pipeline %q", node.pipelineID)
|
||||||
|
node.logger.Warn(msg, err)
|
||||||
|
case err != nil:
|
||||||
|
msg := fmt.Sprintf("Error removing pipeline %q", node.pipelineID)
|
||||||
|
node.logger.Warn(msg, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
addSubscriptions(-1)
|
addSubscriptions(-1)
|
||||||
@@ -283,7 +294,7 @@ func (node *asyncChanNode) Process(ctx context.Context, e *eventlogger.Event) (*
|
|||||||
}
|
}
|
||||||
if timeout {
|
if timeout {
|
||||||
node.logger.Info("Subscriber took too long to process event, closing", "ID", e.Payload.(*logical.EventReceived).Event.Id)
|
node.logger.Info("Subscriber took too long to process event, closing", "ID", e.Payload.(*logical.EventReceived).Event.Id)
|
||||||
node.Close()
|
node.Close(ctx)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
return e, nil
|
return e, nil
|
||||||
|
|||||||
Reference in New Issue
Block a user