diff --git a/changelog/22540.txt b/changelog/22540.txt new file mode 100644 index 0000000000..191342bd29 --- /dev/null +++ b/changelog/22540.txt @@ -0,0 +1,3 @@ +```release-note:improvement +events: Allow subscriptions to multiple namespaces +``` diff --git a/command/events.go b/command/events.go index e918200913..684c40c5a5 100644 --- a/command/events.go +++ b/command/events.go @@ -23,6 +23,8 @@ var ( type EventsSubscribeCommands struct { *BaseCommand + + namespaces []string } func (c *EventsSubscribeCommands) Synopsis() string { @@ -31,10 +33,11 @@ func (c *EventsSubscribeCommands) Synopsis() string { func (c *EventsSubscribeCommands) Help() string { helpText := ` -Usage: vault events subscribe [-format=json] [-timeout=XYZs] eventType +Usage: vault events subscribe [-namespaces=ns1] [-timeout=XYZs] eventType - Subscribe to events of the given event type (topic). The events will be - output to standard out. + Subscribe to events of the given event type (topic), which may be a glob + pattern (with "*"" treated as a wildcard). The events will be sent to + standard out. The output will be a JSON object serialized using the default protobuf JSON serialization format, with one line per event received. @@ -44,7 +47,19 @@ Usage: vault events subscribe [-format=json] [-timeout=XYZs] eventType func (c *EventsSubscribeCommands) Flags() *FlagSets { set := c.flagSet(FlagSetHTTP) - + f := set.NewFlagSet("Subscribe Options") + f.StringSliceVar(&StringSliceVar{ + Name: "namespaces", + Usage: `Specifies one or more patterns of additional child namespaces + to subscribe to. The namespace of the request is automatically + prepended, so specifying 'ns2' when the request is in the 'ns1' + namespace will result in subscribing to 'ns1/ns2', in addition to + 'ns1'. Patterns can include "*" characters to indicate + wildcards. The default is to subscribe only to the request's + namespace.`, + Default: []string{}, + Target: &c.namespaces, + }) return set } @@ -88,6 +103,22 @@ func (c *EventsSubscribeCommands) Run(args []string) int { return 0 } +// cleanNamespace removes leading and trailing space and /'s from the namespace path. +func cleanNamespace(ns string) string { + ns = strings.TrimSpace(ns) + ns = strings.Trim(ns, "/") + return ns +} + +// cleanNamespaces removes leading and trailing space and /'s from the namespace paths. +func cleanNamespaces(namespaces []string) []string { + cleaned := make([]string, len(namespaces)) + for i, ns := range namespaces { + cleaned[i] = cleanNamespace(ns) + } + return cleaned +} + func (c *EventsSubscribeCommands) subscribeRequest(client *api.Client, path string) error { r := client.NewRequest("GET", "/v1/"+path) u := r.URL @@ -98,9 +129,12 @@ func (c *EventsSubscribeCommands) subscribeRequest(client *api.Client, path stri } q := u.Query() q.Set("json", "true") + if len(c.namespaces) > 0 { + q["namespaces"] = cleanNamespaces(c.namespaces) + } u.RawQuery = q.Encode() client.AddHeader("X-Vault-Token", client.Token()) - client.AddHeader("X-Vault-Namesapce", client.Namespace()) + client.AddHeader("X-Vault-Namespace", client.Namespace()) ctx := context.Background() // Follow redirects in case our request if our request is forwarded to the leader. diff --git a/http/events.go b/http/events.go index 16bca15571..1958fdb6c1 100644 --- a/http/events.go +++ b/http/events.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "net/http" + "path" "strconv" "strings" "time" @@ -22,13 +23,13 @@ import ( ) type eventSubscribeArgs struct { - ctx context.Context - logger hclog.Logger - events *eventbus.EventBus - ns *namespace.Namespace - pattern string - conn *websocket.Conn - json bool + ctx context.Context + logger hclog.Logger + events *eventbus.EventBus + namespacePatterns []string + pattern string + conn *websocket.Conn + json bool } // handleEventsSubscribeWebsocket runs forever, returning a websocket error code and reason @@ -36,7 +37,7 @@ type eventSubscribeArgs struct { func handleEventsSubscribeWebsocket(args eventSubscribeArgs) (websocket.StatusCode, string, error) { ctx := args.ctx logger := args.logger - ch, cancel, err := args.events.Subscribe(ctx, args.ns, args.pattern) + ch, cancel, err := args.events.SubscribeMultipleNamespaces(ctx, args.namespacePatterns, args.pattern) if err != nil { logger.Info("Error subscribing", "error", err) return websocket.StatusUnsupportedData, "Error subscribing", nil @@ -123,6 +124,8 @@ func handleEventsSubscribe(core *vault.Core, req *logical.Request) http.Handler } } + namespacePatterns := r.URL.Query()["namespaces"] + namespacePatterns = prependNamespacePatterns(namespacePatterns, ns) conn, err := websocket.Accept(w, r, nil) if err != nil { logger.Info("Could not accept as websocket", "error", err) @@ -143,7 +146,7 @@ func handleEventsSubscribe(core *vault.Core, req *logical.Request) http.Handler } }() - closeStatus, closeReason, err := handleEventsSubscribeWebsocket(eventSubscribeArgs{ctx, logger, core.Events(), ns, pattern, conn, json}) + closeStatus, closeReason, err := handleEventsSubscribeWebsocket(eventSubscribeArgs{ctx, logger, core.Events(), namespacePatterns, pattern, conn, json}) if err != nil { closeStatus = websocket.CloseStatus(err) if closeStatus == -1 { @@ -163,3 +166,18 @@ func handleEventsSubscribe(core *vault.Core, req *logical.Request) http.Handler } }) } + +// prependNamespacePatterns prepends the request namespace to the namespace patterns, +// and also adds the request namespace to the list. +func prependNamespacePatterns(patterns []string, requestNamespace *namespace.Namespace) []string { + prepend := strings.Trim(requestNamespace.Path, "/") + newPatterns := make([]string, 0, len(patterns)+1) + newPatterns = append(newPatterns, prepend) + for _, pattern := range patterns { + if strings.Trim(strings.TrimSpace(pattern), "/") == "" { + continue + } + newPatterns = append(newPatterns, path.Join(prepend, pattern, "/")) + } + return newPatterns +} diff --git a/http/events_test.go b/http/events_test.go index c1d961b5d4..e666ade972 100644 --- a/http/events_test.go +++ b/http/events_test.go @@ -26,6 +26,7 @@ import ( "github.com/hashicorp/vault/sdk/logical" "github.com/hashicorp/vault/vault" "github.com/hashicorp/vault/vault/cluster" + "github.com/stretchr/testify/assert" "nhooyr.io/websocket" ) @@ -134,6 +135,161 @@ func TestEventsSubscribe(t *testing.T) { } } +// TestEventsSubscribeNamespaces tests the websocket endpoint for subscribing to events in multiple namespaces. +func TestEventsSubscribeNamespaces(t *testing.T) { + core := vault.TestCoreWithConfig(t, &vault.CoreConfig{ + Experiments: []string{experiments.VaultExperimentEventsAlpha1}, + }) + + ln, addr := TestServer(t, core) + defer ln.Close() + + // unseal the core + keys, token := vault.TestCoreInit(t, core) + for _, key := range keys { + _, err := core.Unseal(key) + if err != nil { + t.Fatal(err) + } + } + + stop := atomic.Bool{} + + const eventType = "abc" + + namespaces := []string{ + "", + "ns1", + "ns2", + "ns1/ns13", + "ns1/ns13/ns134", + } + + // send some events with the specified namespaces + sendEvents := func() { + pluginInfo := &logical.EventPluginInfo{ + MountPath: "secret", + } + for i := range namespaces { + var ns *namespace.Namespace + if namespaces[i] == "" { + ns = namespace.RootNamespace + } else { + ns = &namespace.Namespace{ + ID: namespaces[i], + Path: namespaces[i], + CustomMetadata: nil, + } + } + id, err := uuid.GenerateUUID() + if err != nil { + core.Logger().Info("Error generating UUID, exiting sender", "error", err) + } + err = core.Events().SendEventInternal(namespace.RootContext(context.Background()), ns, pluginInfo, eventType, &logical.EventData{ + Id: id, + Metadata: nil, + EntityIds: nil, + Note: "testing", + }) + if err != nil { + core.Logger().Info("Error sending event, exiting sender", "error", err) + } + } + } + + t.Cleanup(func() { + stop.Store(true) + }) + + ctx := context.Background() + wsAddr := strings.Replace(addr, "http", "ws", 1) + + testCases := []struct { + name string + namespaces []string + expectedEvents int + }{ + {"invalid", []string{"something"}, 1}, + {"simple wildcard", []string{"ns*"}, 5}, + {"two namespaces", []string{"ns1/ns13", "ns1/other"}, 2}, + {"no namespace", []string{""}, 1}, + {"all wildcard", []string{"*"}, 5}, + {"mixed wildcard", []string{"ns1/ns13*", "ns2"}, 4}, + {"overlapping wildcard", []string{"ns*", "ns1"}, 5}, + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + extra := "" + for _, ns := range testCase.namespaces { + extra += "&namespaces=" + ns + } + url := fmt.Sprintf("%s/v1/sys/events/subscribe/%s?json=true%v", wsAddr, eventType, extra) + conn, _, err := websocket.Dial(ctx, url, &websocket.DialOptions{ + HTTPHeader: http.Header{"x-vault-token": []string{token}}, + }) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { + conn.Close(websocket.StatusNormalClosure, "") + }) + sendEvents() + ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond) + defer cancel() + gotEvents := 0 + for { + _, msg, err := conn.Read(ctx) + if err != nil { + break + } + event := map[string]interface{}{} + err = json.Unmarshal(msg, &event) + if err != nil { + t.Fatal(err) + } + t.Log(string(msg)) + gotEvents += 1 + } + assert.Equal(t, testCase.expectedEvents, gotEvents) + }) + } +} + +func TestNamespacePrepend(t *testing.T) { + testCases := []struct { + requestNs string + patterns []string + result []string + }{ + {"", []string{"ns*"}, []string{"", "ns*"}}, + {"ns1", []string{"ns*"}, []string{"ns1", "ns1/ns*"}}, + {"ns1", []string{"ns1*"}, []string{"ns1", "ns1/ns1*"}}, + {"ns1", []string{"ns1/*"}, []string{"ns1", "ns1/ns1/*"}}, + {"", []string{"ns1/ns13", "ns1/other"}, []string{"", "ns1/ns13", "ns1/other"}}, + {"ns1", []string{"ns1/ns13", "ns1/other"}, []string{"ns1", "ns1/ns1/ns13", "ns1/ns1/other"}}, + {"", []string{""}, []string{""}}, + {"", nil, []string{""}}, + {"ns1", []string{""}, []string{"ns1"}}, + {"ns1", []string{"", ""}, []string{"ns1"}}, + {"ns1", []string{"ns1"}, []string{"ns1", "ns1/ns1"}}, + {"", []string{"*"}, []string{"", "*"}}, + {"ns1", []string{"*"}, []string{"ns1", "ns1/*"}}, + {"", []string{"ns1/ns13*", "ns2"}, []string{"", "ns1/ns13*", "ns2"}}, + {"ns1", []string{"ns1/ns13*", "ns2"}, []string{"ns1", "ns1/ns1/ns13*", "ns1/ns2"}}, + {"", []string{"ns*", "ns1"}, []string{"", "ns*", "ns1"}}, + {"ns1", []string{"ns*", "ns1"}, []string{"ns1", "ns1/ns*", "ns1/ns1"}}, + {"ns1", []string{"ns1*", "ns1"}, []string{"ns1", "ns1/ns1*", "ns1/ns1"}}, + {"ns1", []string{"ns1/*", "ns1"}, []string{"ns1", "ns1/ns1/*", "ns1/ns1"}}, + } + for _, testCase := range testCases { + t.Run(testCase.requestNs+" "+strings.Join(testCase.patterns, " "), func(t *testing.T) { + result := prependNamespacePatterns(testCase.patterns, &namespace.Namespace{ID: testCase.requestNs, Path: testCase.requestNs}) + assert.Equal(t, testCase.result, result) + }) + } +} + func checkRequiredCloudEventsFields(t *testing.T, event map[string]interface{}) { t.Helper() for _, attr := range []string{"id", "source", "specversion", "type"} { diff --git a/vault/eventbus/bus.go b/vault/eventbus/bus.go index 75d4691cd4..87fb3a0589 100644 --- a/vault/eventbus/bus.go +++ b/vault/eventbus/bus.go @@ -202,6 +202,10 @@ func NewEventBus(logger hclog.Logger) (*EventBus, error) { } func (bus *EventBus) Subscribe(ctx context.Context, ns *namespace.Namespace, pattern string) (<-chan *eventlogger.Event, context.CancelFunc, error) { + return bus.SubscribeMultipleNamespaces(ctx, []string{ns.Path}, pattern) +} + +func (bus *EventBus) SubscribeMultipleNamespaces(ctx context.Context, namespacePathPatterns []string, pattern string) (<-chan *eventlogger.Event, context.CancelFunc, error) { // subscriptions are still stored even if the bus has not been started pipelineID, err := uuid.GenerateUUID() if err != nil { @@ -213,7 +217,7 @@ func (bus *EventBus) Subscribe(ctx context.Context, ns *namespace.Namespace, pat return nil, nil, err } - filterNode := newFilterNode(ns, pattern) + filterNode := newFilterNode(namespacePathPatterns, pattern) err = bus.broker.RegisterNode(eventlogger.NodeID(filterNodeID), filterNode) if err != nil { return nil, nil, err @@ -259,15 +263,23 @@ func (bus *EventBus) SetSendTimeout(timeout time.Duration) { bus.timeout = timeout } -func newFilterNode(ns *namespace.Namespace, pattern string) *eventlogger.Filter { +func newFilterNode(namespacePatterns []string, pattern string) *eventlogger.Filter { return &eventlogger.Filter{ Predicate: func(e *eventlogger.Event) (bool, error) { eventRecv := e.Payload.(*logical.EventReceived) - - // Drop if event is not in our namespace. - // TODO: add wildcard/child namespace processing here in some cases? - if eventRecv.Namespace != ns.Path { - return false, nil + eventNs := strings.Trim(eventRecv.Namespace, "/") + // Drop if event is not in namespace patterns namespace. + if len(namespacePatterns) > 0 { + allow := false + for _, nsPattern := range namespacePatterns { + if glob.Glob(nsPattern, eventNs) { + allow = true + break + } + } + if !allow { + return false, nil + } } // Filter for correct event type, including wildcards.