mirror of
https://github.com/optim-enterprises-bv/vault.git
synced 2025-10-30 02:02:43 +00:00
Add basic event bus broker stub (#18640)
Creates a new `eventbus` package under `vault` with an implementation of the `go-eventlogger` broker. Also creates a stub of a common broker that will be accessible in the core, and creates a simple event sending interface.
This commit is contained in:
committed by
GitHub
parent
11f6aad2b2
commit
3f329fe2d4
1
go.mod
1
go.mod
@@ -64,6 +64,7 @@ require (
|
||||
github.com/hashicorp/consul-template v0.29.5
|
||||
github.com/hashicorp/consul/api v1.15.2
|
||||
github.com/hashicorp/errwrap v1.1.0
|
||||
github.com/hashicorp/eventlogger v0.1.0
|
||||
github.com/hashicorp/go-cleanhttp v0.5.2
|
||||
github.com/hashicorp/go-discover v0.0.0-20210818145131-c573d69da192
|
||||
github.com/hashicorp/go-gcp-common v0.8.0
|
||||
|
||||
7
go.sum
7
go.sum
@@ -745,6 +745,7 @@ github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk=
|
||||
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
|
||||
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
|
||||
github.com/go-test/deep v1.0.2/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA=
|
||||
github.com/go-test/deep v1.0.4/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA=
|
||||
github.com/go-test/deep v1.0.7/go.mod h1:QV8Hv/iy04NyLBxAdO9njL0iVPN1S4d/A3NVv1V36o8=
|
||||
github.com/go-test/deep v1.0.8 h1:TDsG77qcSprGbC6vTN8OuXp5g+J+b5Pcguhf7Zt61VM=
|
||||
github.com/go-test/deep v1.0.8/go.mod h1:5C2ZWiW0ErCdrYzpqxLbTX7MG14M9iiw8DgHncVwcsE=
|
||||
@@ -968,6 +969,8 @@ github.com/hashicorp/errwrap v0.0.0-20141028054710-7554cd9344ce/go.mod h1:YH+1FK
|
||||
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
|
||||
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
|
||||
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
|
||||
github.com/hashicorp/eventlogger v0.1.0 h1:S6xc4gZVzewuDUP4R4Ngko419h/CGDuV/b4ADL3XLik=
|
||||
github.com/hashicorp/eventlogger v0.1.0/go.mod h1:a3IXf1aEJfpCPzseTOrwKj4fVW/Qn3oEmpQeaIznzH0=
|
||||
github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
|
||||
github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
|
||||
github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
|
||||
@@ -1665,6 +1668,7 @@ github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR
|
||||
github.com/rogpeppe/go-internal v1.2.2/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
|
||||
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
|
||||
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
|
||||
github.com/rogpeppe/go-internal v1.6.2/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
|
||||
github.com/rogpeppe/go-internal v1.8.1 h1:geMPLpDpQOgVyCg5z5GoRwLHepNdb71NXb67XFkP+Eg=
|
||||
github.com/rogpeppe/go-internal v1.8.1/go.mod h1:JeRgkft04UBgHMgCIwADu4Pn6Mtm5d4nPKWu0nJ5d+o=
|
||||
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
|
||||
@@ -1895,6 +1899,7 @@ go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
|
||||
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
|
||||
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
|
||||
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
|
||||
go.uber.org/goleak v1.0.0/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
|
||||
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
|
||||
go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
|
||||
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
|
||||
@@ -2351,6 +2356,7 @@ golang.org/x/tools v0.0.0-20201110124207-079ba7bd75cd/go.mod h1:emZCQorbCU4vsT4f
|
||||
golang.org/x/tools v0.0.0-20201201161351-ac6f37ff4c2a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
|
||||
golang.org/x/tools v0.0.0-20201208233053-a543418bbed2/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
|
||||
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
|
||||
golang.org/x/tools v0.0.0-20210101214203-2dba1e4ea05c/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
|
||||
golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
|
||||
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
|
||||
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
|
||||
@@ -2696,6 +2702,7 @@ k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed h1:jAne/RjBTyawwAy0utX5eqigAwz/l
|
||||
k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
|
||||
layeh.com/radius v0.0.0-20190322222518-890bc1058917 h1:BDXFaFzUt5EIqe/4wrTc4AcYZWP6iC6Ult+jQWLh5eU=
|
||||
layeh.com/radius v0.0.0-20190322222518-890bc1058917/go.mod h1:fywZKyu//X7iRzaxLgPWsvc0L26IUpVvE/aeIL2JtIQ=
|
||||
mvdan.cc/gofumpt v0.1.1/go.mod h1:yXG1r1WqZVKWbVRtBWKWX9+CxGYfA51nSomhM0woR48=
|
||||
mvdan.cc/gofumpt v0.3.1 h1:avhhrOmv0IuvQVK7fvwV91oFSGAk5/6Po8GXTzICeu8=
|
||||
mvdan.cc/gofumpt v0.3.1/go.mod h1:w3ymliuxvzVx8DAutBnVyDqYb1Niy/yCJt/lk821YCE=
|
||||
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
|
||||
|
||||
11
sdk/logical/events.go
Normal file
11
sdk/logical/events.go
Normal file
@@ -0,0 +1,11 @@
|
||||
package logical
|
||||
|
||||
import "context"
|
||||
|
||||
// EventType represents a topic, and is a wrapper around eventlogger.EventType.
|
||||
type EventType string
|
||||
|
||||
// EventSender sends events to the common event bus.
|
||||
type EventSender interface {
|
||||
Send(ctx context.Context, eventType EventType, event any) error
|
||||
}
|
||||
@@ -53,6 +53,7 @@ import (
|
||||
sr "github.com/hashicorp/vault/serviceregistration"
|
||||
"github.com/hashicorp/vault/shamir"
|
||||
"github.com/hashicorp/vault/vault/cluster"
|
||||
"github.com/hashicorp/vault/vault/eventbus"
|
||||
"github.com/hashicorp/vault/vault/quotas"
|
||||
vaultseal "github.com/hashicorp/vault/vault/seal"
|
||||
"github.com/hashicorp/vault/version"
|
||||
@@ -677,6 +678,8 @@ type Core struct {
|
||||
|
||||
pendingRemovalMountsAllowed bool
|
||||
expirationRevokeRetryBase time.Duration
|
||||
|
||||
events *eventbus.EventBus
|
||||
}
|
||||
|
||||
func (c *Core) HAState() consts.HAState {
|
||||
@@ -3935,3 +3938,7 @@ func (c *Core) GetRaftAutopilotState(ctx context.Context) (*raft.AutopilotState,
|
||||
|
||||
return raftBackend.GetAutopilotServerState(ctx)
|
||||
}
|
||||
|
||||
func (c *Core) Events() *eventbus.EventBus {
|
||||
return c.events
|
||||
}
|
||||
|
||||
163
vault/eventbus/bus.go
Normal file
163
vault/eventbus/bus.go
Normal file
@@ -0,0 +1,163 @@
|
||||
package eventbus
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net/url"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/hashicorp/eventlogger"
|
||||
"github.com/hashicorp/eventlogger/formatter_filters/cloudevents"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-uuid"
|
||||
"github.com/hashicorp/vault/sdk/logical"
|
||||
)
|
||||
|
||||
var ErrNotStarted = errors.New("event broker has not been started")
|
||||
|
||||
var cloudEventsFormatterFilter *cloudevents.FormatterFilter
|
||||
|
||||
// EventBus contains the main logic of running an event broker for Vault.
|
||||
// Start() must be called before the EventBus will accept events for sending.
|
||||
type EventBus struct {
|
||||
logger hclog.Logger
|
||||
broker *eventlogger.Broker
|
||||
started atomic.Bool
|
||||
formatterNodeID eventlogger.NodeID
|
||||
}
|
||||
|
||||
type asyncChanNode struct {
|
||||
// TODO: add bounded deque buffer of *any
|
||||
ch chan any
|
||||
}
|
||||
|
||||
var _ eventlogger.Node = &asyncChanNode{}
|
||||
|
||||
func init() {
|
||||
// TODO: maybe this should relate to the Vault core somehow?
|
||||
sourceUrl, err := url.Parse("https://vaultproject.io/")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
cloudEventsFormatterFilter = &cloudevents.FormatterFilter{
|
||||
Source: sourceUrl,
|
||||
Predicate: func(_ context.Context, e interface{}) (bool, error) {
|
||||
return true, nil
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func NewEventBus() (*EventBus, error) {
|
||||
broker := eventlogger.NewBroker()
|
||||
|
||||
formatterID, err := uuid.GenerateUUID()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
formatterNodeID := eventlogger.NodeID(formatterID)
|
||||
err = broker.RegisterNode(formatterNodeID, cloudEventsFormatterFilter)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &EventBus{
|
||||
logger: hclog.Default().Named("eventbus"),
|
||||
broker: broker,
|
||||
formatterNodeID: formatterNodeID,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Start starts the event bus, allowing events to be written.
|
||||
// It is not possible to stop or restart the event bus.
|
||||
// It is safe to call Start() multiple times.
|
||||
func (bus *EventBus) Start() {
|
||||
bus.started.Store(true)
|
||||
}
|
||||
|
||||
var _ logical.EventSender = (*EventBus)(nil)
|
||||
|
||||
// Send sends an event to the event bus and routes it to all relevant subscribers.
|
||||
// This function does *not* wait for all subscribers to acknowledge before returning.
|
||||
// TODO: use schema once it is defined
|
||||
func (bus *EventBus) Send(ctx context.Context, eventType logical.EventType, s any) error {
|
||||
if !bus.started.Load() {
|
||||
return ErrNotStarted
|
||||
}
|
||||
bus.logger.Info("Sending event", "event", s)
|
||||
_, err := bus.broker.Send(ctx, eventlogger.EventType(eventType), s)
|
||||
if err != nil {
|
||||
// if no listeners for this event type are registered, that's okay, the event
|
||||
// will just not be sent anywhere
|
||||
if strings.Contains(strings.ToLower(err.Error()), "no graph for eventtype") {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (bus *EventBus) Subscribe(_ context.Context, eventType logical.EventType) (chan any, error) {
|
||||
// subscriptions are still stored even if the bus has not been started
|
||||
pipelineID, err := uuid.GenerateUUID()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
nodeID, err := uuid.GenerateUUID()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// TODO: should we have just one node, and handle all the routing ourselves?
|
||||
asyncNode := newAsyncNode()
|
||||
err = bus.broker.RegisterNode(eventlogger.NodeID(nodeID), asyncNode)
|
||||
if err != nil {
|
||||
defer asyncNode.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
nodes := []eventlogger.NodeID{bus.formatterNodeID, eventlogger.NodeID(nodeID)}
|
||||
|
||||
pipeline := eventlogger.Pipeline{
|
||||
PipelineID: eventlogger.PipelineID(pipelineID),
|
||||
EventType: eventlogger.EventType(eventType),
|
||||
NodeIDs: nodes,
|
||||
}
|
||||
err = bus.broker.RegisterPipeline(pipeline)
|
||||
if err != nil {
|
||||
defer asyncNode.Close()
|
||||
return nil, err
|
||||
}
|
||||
return asyncNode.ch, nil
|
||||
}
|
||||
|
||||
func newAsyncNode() *asyncChanNode {
|
||||
return &asyncChanNode{
|
||||
ch: make(chan any),
|
||||
}
|
||||
}
|
||||
|
||||
func (node *asyncChanNode) Close() error {
|
||||
close(node.ch)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (node *asyncChanNode) Process(ctx context.Context, e *eventlogger.Event) (*eventlogger.Event, error) {
|
||||
// TODO: add timeout on sending to node.ch
|
||||
// sends to the channel async in another goroutine
|
||||
go func() {
|
||||
select {
|
||||
case node.ch <- e.Payload:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}()
|
||||
return e, nil
|
||||
}
|
||||
|
||||
func (node *asyncChanNode) Reopen() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (node *asyncChanNode) Type() eventlogger.NodeType {
|
||||
return eventlogger.NodeTypeSink
|
||||
}
|
||||
100
vault/eventbus/bus_test.go
Normal file
100
vault/eventbus/bus_test.go
Normal file
@@ -0,0 +1,100 @@
|
||||
package eventbus
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/vault/sdk/logical"
|
||||
)
|
||||
|
||||
func TestBusBasics(t *testing.T) {
|
||||
bus, err := NewEventBus()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
ctx := context.Background()
|
||||
|
||||
eventType := logical.EventType("someType")
|
||||
|
||||
err = bus.Send(ctx, eventType, "message")
|
||||
if err != ErrNotStarted {
|
||||
t.Errorf("Expected not started error but got: %v", err)
|
||||
}
|
||||
|
||||
bus.Start()
|
||||
|
||||
err = bus.Send(ctx, eventType, "sent but never received")
|
||||
if err != nil {
|
||||
t.Errorf("Expected no error sending: %v", err)
|
||||
}
|
||||
|
||||
ch, err := bus.Subscribe(ctx, eventType)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = bus.Send(ctx, eventType, "message2")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
timeout := time.After(1 * time.Second)
|
||||
select {
|
||||
case message := <-ch:
|
||||
if message != "message2" {
|
||||
t.Errorf("Got unexpected message: %v", message)
|
||||
}
|
||||
case <-timeout:
|
||||
t.Error("Timeout waiting for message")
|
||||
}
|
||||
}
|
||||
|
||||
func TestBus2Subscriptions(t *testing.T) {
|
||||
bus, err := NewEventBus()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
ctx := context.Background()
|
||||
|
||||
eventType1 := logical.EventType("someType1")
|
||||
eventType2 := logical.EventType("someType2")
|
||||
bus.Start()
|
||||
|
||||
ch1, err := bus.Subscribe(ctx, eventType1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
ch2, err := bus.Subscribe(ctx, eventType2)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = bus.Send(ctx, eventType2, "message2")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
err = bus.Send(ctx, eventType1, "message1")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
timeout := time.After(1 * time.Second)
|
||||
select {
|
||||
case message := <-ch1:
|
||||
if message != "message1" {
|
||||
t.Errorf("Got unexpected message: %v", message)
|
||||
}
|
||||
case <-timeout:
|
||||
t.Error("Timeout waiting for message1")
|
||||
}
|
||||
select {
|
||||
case message := <-ch2:
|
||||
if message != "message2" {
|
||||
t.Errorf("Got unexpected message: %v", message)
|
||||
}
|
||||
case <-timeout:
|
||||
t.Error("Timeout waiting for message2")
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user