diff --git a/changelog/25640.txt b/changelog/25640.txt new file mode 100644 index 0000000000..8a213a1d87 --- /dev/null +++ b/changelog/25640.txt @@ -0,0 +1,3 @@ +```release-note:change +events: Remove event noficiations websocket endpoint in non-Enterprise +``` diff --git a/http/events.go b/http/events.go deleted file mode 100644 index fe64593ce8..0000000000 --- a/http/events.go +++ /dev/null @@ -1,348 +0,0 @@ -// Copyright (c) HashiCorp, Inc. -// SPDX-License-Identifier: BUSL-1.1 - -package http - -import ( - "context" - "errors" - "fmt" - "net/http" - "path" - "slices" - "strconv" - "strings" - "time" - - "github.com/golang/protobuf/proto" - "github.com/hashicorp/go-hclog" - "github.com/hashicorp/vault/helper/namespace" - "github.com/hashicorp/vault/sdk/logical" - "github.com/hashicorp/vault/vault" - "github.com/hashicorp/vault/vault/eventbus" - "github.com/patrickmn/go-cache" - "github.com/ryanuber/go-glob" - "nhooyr.io/websocket" -) - -// webSocketRevalidationTime is how often we re-check access to the -// events that the websocket requested access to. -var webSocketRevalidationTime = 5 * time.Minute - -type eventSubscriber struct { - ctx context.Context - cancelCtx context.CancelFunc - clientToken string - logger hclog.Logger - events *eventbus.EventBus - namespacePatterns []string - pattern string - bexprFilter string - json bool - checkCache *cache.Cache - isRootToken bool - core *vault.Core - w http.ResponseWriter - r *http.Request - req *logical.Request -} - -// handleEventsSubscribeWebsocket subscribes to the events, accepts the websocket connection, and then runs forever, -// serving events to the websocket connection. -func (sub *eventSubscriber) handleEventsSubscribeWebsocket() { - ctx := sub.ctx - logger := sub.logger - // subscribe before accept to avoid race conditions - ch, cancel, err := sub.events.SubscribeMultipleNamespaces(ctx, sub.namespacePatterns, sub.pattern, sub.bexprFilter) - if err != nil { - logger.Info("Error subscribing", "error", err) - sub.w.WriteHeader(400) - sub.w.Write([]byte("Error subscribing")) - return - } - defer cancel() - logger.Debug("WebSocket is subscribed to messages", "namespaces", sub.namespacePatterns, "event_types", sub.pattern, "bexpr_filter", sub.bexprFilter) - - conn, err := websocket.Accept(sub.w, sub.r, nil) - if err != nil { - logger.Info("Could not accept as websocket", "error", err) - respondError(sub.w, http.StatusInternalServerError, fmt.Errorf("could not accept as websocket")) - return - } - - // continually validate subscribe access while the websocket is running - // this has to be done after accepting the websocket to avoid a race condition - go sub.validateSubscribeAccessLoop() - - // make sure to close the websocket - closeStatus := websocket.StatusNormalClosure - closeReason := "" - var closeErr error = nil - - defer func() { - if closeErr != nil { - closeStatus = websocket.CloseStatus(err) - if closeStatus == -1 { - closeStatus = websocket.StatusInternalError - } - closeReason = fmt.Sprintf("Internal error: %v", err) - logger.Debug("Error from websocket handler", "error", err) - } - // Close() will panic if the reason is greater than this length - if len(closeReason) > 123 { - logger.Debug("Truncated close reason", "closeReason", closeReason) - closeReason = closeReason[:123] - } - err = conn.Close(closeStatus, closeReason) - if err != nil { - logger.Debug("Error closing websocket", "error", err) - } - }() - - // we don't expect any incoming messages - ctx = conn.CloseRead(ctx) - // start the pinger - go func() { - for { - time.Sleep(30 * time.Second) // not too aggressive, but keep the HTTP connection alive - err := conn.Ping(ctx) - if err != nil { - return - } - } - }() - - for { - select { - case <-ctx.Done(): - logger.Info("Websocket context is done, closing the connection") - return - case message := <-ch: - // Perform one last check that the message is allowed to be received. - // For example, if a new namespace was created that matches the namespace patterns, - // but the token doesn't have access to it, we don't want to accidentally send it to - // the websocket. - if !sub.allowMessageCached(message.Payload.(*logical.EventReceived)) { - continue - } - - logger.Debug("Sending message to websocket", "message", message.Payload) - var messageBytes []byte - var messageType websocket.MessageType - if sub.json { - var ok bool - messageBytes, ok = message.Format("cloudevents-json") - if !ok { - logger.Warn("Could not get cloudevents JSON format") - closeErr = errors.New("could not get cloudevents JSON format") - return - } - messageType = websocket.MessageText - } else { - messageBytes, err = proto.Marshal(message.Payload.(*logical.EventReceived)) - messageType = websocket.MessageBinary - } - if err != nil { - logger.Warn("Could not serialize websocket event", "error", err) - closeErr = err - return - } - err = conn.Write(ctx, messageType, messageBytes) - if err != nil { - closeErr = err - return - } - } - } -} - -// allowMessageCached checks that the message is allowed to received by the websocket. -// It caches results for specific namespaces, data paths, and event types. -func (sub *eventSubscriber) allowMessageCached(message *logical.EventReceived) bool { - if sub.isRootToken { - // fast-path root tokens - return true - } - - messageNs := strings.Trim(message.Namespace, "/") - dataPath := "" - if message.Event.Metadata != nil { - dataPathField := message.Event.Metadata.GetFields()[logical.EventMetadataDataPath] - if dataPathField != nil { - dataPath = dataPathField.GetStringValue() - } - } - if dataPath == "" { - // Only allow root tokens to subscribe to events with no data path, for now. - return false - } - cacheKey := fmt.Sprintf("%v!%v!%v", messageNs, dataPath, message.EventType) - _, ok := sub.checkCache.Get(cacheKey) - if ok { - return true - } - - // perform the actual check and cache it if true - ok = sub.allowMessage(messageNs, dataPath, message.EventType) - if ok { - err := sub.checkCache.Add(cacheKey, ok, webSocketRevalidationTime) - if err != nil { - sub.logger.Debug("Error adding to policy check cache for websocket", "error", err) - // still return the right value, but we can't guarantee it was cached - } - } - return ok -} - -// allowMessage checks that the message is allowed to received by the websocket -func (sub *eventSubscriber) allowMessage(eventNs, dataPath, eventType string) bool { - // does this even match the requested namespaces - matchedNs := false - for _, nsPattern := range sub.namespacePatterns { - if glob.Glob(nsPattern, eventNs) { - matchedNs = true - break - } - } - if !matchedNs { - return false - } - - // next check for specific access to the namespace and event types - nsDataPath := dataPath - if eventNs != "" { - nsDataPath = path.Join(eventNs, dataPath) - } - capabilities, allowedEventTypes, err := sub.core.CapabilitiesAndSubscribeEventTypes(sub.ctx, sub.clientToken, nsDataPath) - if err != nil { - sub.logger.Debug("Error checking capabilities and event types for token", "error", err, "namespace", eventNs) - return false - } - if !(slices.Contains(capabilities, vault.RootCapability) || slices.Contains(capabilities, vault.SubscribeCapability)) { - return false - } - for _, pattern := range allowedEventTypes { - if glob.Glob(pattern, eventType) { - return true - } - } - // no event types matched, so return false - return false -} - -func handleEventsSubscribe(core *vault.Core, req *logical.Request) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - logger := core.Logger().Named("events-subscribe") - logger.Debug("Got request to", "url", r.URL, "version", r.Proto) - - ctx := r.Context() - - // ACL check - auth, entry, err := core.CheckToken(ctx, req, false) - if err != nil { - if errors.Is(err, logical.ErrPermissionDenied) { - respondError(w, http.StatusForbidden, logical.ErrPermissionDenied) - return - } - logger.Debug("Error validating token", "error", err) - respondError(w, http.StatusInternalServerError, fmt.Errorf("error validating token")) - return - } - - ns, err := namespace.FromContext(ctx) - if err != nil { - logger.Info("Could not find namespace", "error", err) - respondError(w, http.StatusInternalServerError, fmt.Errorf("could not find namespace")) - return - } - - prefix := "/v1/sys/events/subscribe/" - if ns.ID != namespace.RootNamespaceID { - prefix = fmt.Sprintf("/v1/%ssys/events/subscribe/", ns.Path) - } - pattern := strings.TrimSpace(strings.TrimPrefix(r.URL.Path, prefix)) - if pattern == "" { - respondError(w, http.StatusBadRequest, fmt.Errorf("did not specify eventType to subscribe to")) - return - } - - json := false - jsonRaw := r.URL.Query().Get("json") - if jsonRaw != "" { - var err error - json, err = strconv.ParseBool(jsonRaw) - if err != nil { - respondError(w, http.StatusBadRequest, fmt.Errorf("invalid parameter for JSON: %v", jsonRaw)) - return - } - } - - bexprFilter := strings.TrimSpace(r.URL.Query().Get("filter")) - namespacePatterns := r.URL.Query()["namespaces"] - namespacePatterns = prependNamespacePatterns(namespacePatterns, ns) - isRoot := entry.IsRoot() - ctx, cancelCtx := context.WithCancel(ctx) - defer cancelCtx() - - sub := &eventSubscriber{ - ctx: ctx, - cancelCtx: cancelCtx, - logger: logger, - events: core.Events(), - namespacePatterns: namespacePatterns, - pattern: pattern, - bexprFilter: bexprFilter, - json: json, - checkCache: cache.New(webSocketRevalidationTime, webSocketRevalidationTime), - clientToken: auth.ClientToken, - isRootToken: isRoot, - core: core, - w: w, - r: r, - req: req, - } - sub.handleEventsSubscribeWebsocket() - }) -} - -// prependNamespacePatterns prepends the request namespace to the namespace patterns, -// and also adds the request namespace to the list. -func prependNamespacePatterns(patterns []string, requestNamespace *namespace.Namespace) []string { - prepend := strings.Trim(requestNamespace.Path, "/") - newPatterns := make([]string, 0, len(patterns)+1) - newPatterns = append(newPatterns, prepend) - for _, pattern := range patterns { - if strings.Trim(pattern, "/") != "" { - newPatterns = append(newPatterns, path.Join(prepend, pattern)) - } - } - return newPatterns -} - -// validateSubscribeAccessLoop continually checks if the request has access to the subscribe endpoint in -// its namespace. If the access check ever fails, then the cancel function is called and the function returns. -func (sub *eventSubscriber) validateSubscribeAccessLoop() { - // if something breaks, default to canceling the websocket - defer sub.cancelCtx() - for { - _, _, err := sub.core.CheckTokenWithLock(sub.ctx, sub.req, false) - if err != nil { - sub.core.Logger().Debug("Token does not have access to subscription path in its own namespace, terminating WebSocket subscription", "path", sub.req.Path, "error", err) - return - } - // wait a while and try again, but quit the loop if the context finishes early - finished := func() bool { - ticker := time.NewTicker(webSocketRevalidationTime) - defer ticker.Stop() - select { - case <-sub.ctx.Done(): - return true - case <-ticker.C: - return false - } - }() - if finished { - return - } - } -} diff --git a/http/events_stubs_oss.go b/http/events_stubs_oss.go new file mode 100644 index 0000000000..c1a4a67359 --- /dev/null +++ b/http/events_stubs_oss.go @@ -0,0 +1,19 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +//go:build !enterprise + +package http + +import ( + "net/http" + + "github.com/hashicorp/vault/sdk/logical" + "github.com/hashicorp/vault/vault" +) + +//go:generate go run github.com/hashicorp/vault/tools/stubmaker + +func entHandleEventsSubscribe(core *vault.Core, req *logical.Request) http.Handler { + return nil +} diff --git a/http/events_test.go b/http/events_test.go deleted file mode 100644 index 1ae014364b..0000000000 --- a/http/events_test.go +++ /dev/null @@ -1,410 +0,0 @@ -// Copyright (c) HashiCorp, Inc. -// SPDX-License-Identifier: BUSL-1.1 - -package http - -import ( - "context" - "encoding/json" - "fmt" - "net/http" - "net/http/httptest" - "net/url" - "strings" - "sync" - "testing" - "time" - - "github.com/hashicorp/go-hclog" - "github.com/hashicorp/go-uuid" - "github.com/hashicorp/vault/api" - "github.com/hashicorp/vault/audit" - "github.com/hashicorp/vault/helper/namespace" - "github.com/hashicorp/vault/helper/testhelpers/corehelpers" - "github.com/hashicorp/vault/sdk/helper/consts" - "github.com/hashicorp/vault/sdk/logical" - "github.com/hashicorp/vault/vault" - "github.com/hashicorp/vault/vault/cluster" - "github.com/stretchr/testify/assert" - "nhooyr.io/websocket" -) - -// TestEventsSubscribe tests the websocket endpoint for subscribing to events -// by generating some events. -func TestEventsSubscribe(t *testing.T) { - core := vault.TestCoreWithConfig(t, &vault.CoreConfig{}) - ln, addr := TestServer(t, core) - defer ln.Close() - - // unseal the core - keys, token := vault.TestCoreInit(t, core) - for _, key := range keys { - _, err := core.Unseal(key) - if err != nil { - t.Fatal(err) - } - } - - const eventType = "abc" - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - // send some events - sendEvents := func() error { - id, err := uuid.GenerateUUID() - if err != nil { - return err - } - pluginInfo := &logical.EventPluginInfo{ - MountPath: "secret", - } - err = core.Events().SendEventInternal(namespace.RootContext(ctx), namespace.RootNamespace, pluginInfo, logical.EventType(eventType), &logical.EventData{ - Id: id, - Metadata: nil, - EntityIds: nil, - Note: "testing", - }) - if err != nil { - return err - } - return nil - } - - wsAddr := strings.Replace(addr, "http", "ws", 1) - - testCases := []struct { - json bool - }{{true}, {false}} - - for _, testCase := range testCases { - location := fmt.Sprintf("%s/v1/sys/events/subscribe/%s?namespaces=ns1&namespaces=ns*&json=%v", wsAddr, eventType, testCase.json) - conn, _, err := websocket.Dial(ctx, location, &websocket.DialOptions{ - HTTPHeader: http.Header{"x-vault-token": []string{token}}, - }) - if err != nil { - t.Fatal(err) - } - t.Cleanup(func() { - conn.Close(websocket.StatusNormalClosure, "") - }) - - err = sendEvents() - if err != nil { - t.Fatal(err) - } - _, msg, err := conn.Read(ctx) - if err != nil { - t.Fatal(err) - } - if testCase.json { - event := map[string]interface{}{} - err = json.Unmarshal(msg, &event) - if err != nil { - t.Fatal(err) - } - t.Log(string(msg)) - data := event["data"].(map[string]interface{}) - if actualType := data["event_type"].(string); actualType != eventType { - t.Fatalf("Expeced event type %s, got %s", eventType, actualType) - } - pluginInfo, ok := data["plugin_info"].(map[string]interface{}) - if !ok || pluginInfo == nil { - t.Fatalf("No plugin_info object: %v", data) - } - mountPath, ok := pluginInfo["mount_path"].(string) - if !ok || mountPath != "secret" { - t.Fatalf("Wrong mount_path: %v", data) - } - innerEvent := data["event"].(map[string]interface{}) - if innerEvent["id"].(string) != event["id"].(string) { - t.Fatalf("IDs don't match, expected %s, got %s", innerEvent["id"].(string), event["id"].(string)) - } - if innerEvent["note"].(string) != "testing" { - t.Fatalf("Expected 'testing', got %s", innerEvent["note"].(string)) - } - - checkRequiredCloudEventsFields(t, event) - } - } -} - -// TestBexprFilters tests that go-bexpr filters are used to filter events. -func TestBexprFilters(t *testing.T) { - core := vault.TestCoreWithConfig(t, &vault.CoreConfig{}) - ln, addr := TestServer(t, core) - defer ln.Close() - - // unseal the core - keys, token := vault.TestCoreInit(t, core) - for _, key := range keys { - _, err := core.Unseal(key) - if err != nil { - t.Fatal(err) - } - } - sendEvents := func(ctx context.Context, eventTypes ...string) error { - for _, eventType := range eventTypes { - pluginInfo := &logical.EventPluginInfo{ - MountPath: "secret", - } - ns := namespace.RootNamespace - id := eventType - err := core.Events().SendEventInternal(namespace.RootContext(ctx), ns, pluginInfo, logical.EventType(eventType), &logical.EventData{ - Id: id, - Metadata: nil, - EntityIds: nil, - Note: "testing", - }) - if err != nil { - return err - } - } - return nil - } - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - wsAddr := strings.Replace(addr, "http", "ws", 1) - bexprFilter := url.QueryEscape("event_type == abc") - - location := fmt.Sprintf("%s/v1/sys/events/subscribe/*?json=true&filter=%s", wsAddr, bexprFilter) - conn, _, err := websocket.Dial(ctx, location, &websocket.DialOptions{ - HTTPHeader: http.Header{"x-vault-token": []string{token}}, - }) - if err != nil { - t.Fatal(err) - } - defer conn.Close(websocket.StatusNormalClosure, "") - - err = sendEvents(ctx, "abc", "def", "xyz") - if err != nil { - t.Fatal(err) - } - // read until we time out - seen := map[string]bool{} - done := false - for !done { - done = func() bool { - readCtx, readCancel := context.WithTimeout(context.Background(), 1*time.Second) - defer readCancel() - _, msg, err := conn.Read(readCtx) - if err != nil { - return true - } - event := map[string]interface{}{} - err = json.Unmarshal(msg, &event) - if err != nil { - t.Error(err) - return true - } - seen[event["id"].(string)] = true - return false - }() - } - // we should only get the "abc" messages - assert.Len(t, seen, 1) - assert.Contains(t, seen, "abc") -} - -func TestNamespacePrepend(t *testing.T) { - testCases := []struct { - requestNs string - patterns []string - result []string - }{ - {"", []string{"ns*"}, []string{"", "ns*"}}, - {"ns1", []string{"ns*"}, []string{"ns1", "ns1/ns*"}}, - {"ns1", []string{"ns1*"}, []string{"ns1", "ns1/ns1*"}}, - {"ns1", []string{"ns1/*"}, []string{"ns1", "ns1/ns1/*"}}, - {"", []string{"ns1/ns13", "ns1/other"}, []string{"", "ns1/ns13", "ns1/other"}}, - {"ns1", []string{"ns1/ns13", "ns1/other"}, []string{"ns1", "ns1/ns1/ns13", "ns1/ns1/other"}}, - {"", []string{""}, []string{""}}, - {"", nil, []string{""}}, - {"ns1", []string{""}, []string{"ns1"}}, - {"ns1", []string{"", ""}, []string{"ns1"}}, - {"ns1", []string{"ns1"}, []string{"ns1", "ns1/ns1"}}, - {"", []string{"*"}, []string{"", "*"}}, - {"ns1", []string{"*"}, []string{"ns1", "ns1/*"}}, - {"", []string{"ns1/ns13*", "ns2"}, []string{"", "ns1/ns13*", "ns2"}}, - {"ns1", []string{"ns1/ns13*", "ns2"}, []string{"ns1", "ns1/ns1/ns13*", "ns1/ns2"}}, - {"", []string{"ns*", "ns1"}, []string{"", "ns*", "ns1"}}, - {"ns1", []string{"ns*", "ns1"}, []string{"ns1", "ns1/ns*", "ns1/ns1"}}, - {"ns1", []string{"ns1*", "ns1"}, []string{"ns1", "ns1/ns1*", "ns1/ns1"}}, - {"ns1", []string{"ns1/*", "ns1"}, []string{"ns1", "ns1/ns1/*", "ns1/ns1"}}, - } - for _, testCase := range testCases { - t.Run(testCase.requestNs+" "+strings.Join(testCase.patterns, " "), func(t *testing.T) { - result := prependNamespacePatterns(testCase.patterns, &namespace.Namespace{ID: testCase.requestNs, Path: testCase.requestNs}) - assert.Equal(t, testCase.result, result) - }) - } -} - -func checkRequiredCloudEventsFields(t *testing.T, event map[string]interface{}) { - t.Helper() - for _, attr := range []string{"id", "source", "specversion", "type"} { - if v, ok := event[attr]; !ok { - t.Errorf("Missing attribute %s", attr) - } else if str, ok := v.(string); !ok { - t.Errorf("Expected %s to be string but got %T", attr, v) - } else if str == "" { - t.Errorf("%s was empty string", attr) - } - } -} - -// TestEventsSubscribeAuth tests that unauthenticated and unauthorized subscriptions -// fail correctly. -func TestEventsSubscribeAuth(t *testing.T) { - core := vault.TestCore(t) - ln, addr := TestServer(t, core) - defer ln.Close() - - // unseal the core - keys, root := vault.TestCoreInit(t, core) - for _, key := range keys { - _, err := core.Unseal(key) - if err != nil { - t.Fatal(err) - } - } - - var nonPrivilegedToken string - // Fetch a valid non privileged token. - { - config := api.DefaultConfig() - config.Address = addr - - client, err := api.NewClient(config) - if err != nil { - t.Fatal(err) - } - client.SetToken(root) - - secret, err := client.Auth().Token().Create(&api.TokenCreateRequest{Policies: []string{"default"}}) - if err != nil { - t.Fatal(err) - } - if secret.Auth.ClientToken == "" { - t.Fatal("Failed to fetch a non privileged token") - } - nonPrivilegedToken = secret.Auth.ClientToken - } - - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - wsAddr := strings.Replace(addr, "http", "ws", 1) - - // Get a 403 with no token. - _, resp, err := websocket.Dial(ctx, wsAddr+"/v1/sys/events/subscribe/abc", nil) - if err == nil { - t.Error("Expected websocket error but got none") - } - if resp == nil || resp.StatusCode != http.StatusForbidden { - t.Errorf("Expected 403 but got %+v", resp) - } - - // Get a 403 with a non privileged token. - _, resp, err = websocket.Dial(ctx, wsAddr+"/v1/sys/events/subscribe/abc", &websocket.DialOptions{ - HTTPHeader: http.Header{"x-vault-token": []string{nonPrivilegedToken}}, - }) - if err == nil { - t.Error("Expected websocket error but got none") - } - if resp == nil || resp.StatusCode != http.StatusForbidden { - t.Errorf("Expected 403 but got %+v", resp) - } -} - -func TestCanForwardEventConnections(t *testing.T) { - // Run again with in-memory network - inmemCluster, err := cluster.NewInmemLayerCluster("inmem-cluster", 3, hclog.New(&hclog.LoggerOptions{ - Mutex: &sync.Mutex{}, - Level: hclog.Trace, - Name: "inmem-cluster", - })) - if err != nil { - t.Fatal(err) - } - testCluster := vault.NewTestCluster(t, &vault.CoreConfig{ - AuditBackends: map[string]audit.Factory{ - "nop": corehelpers.NoopAuditFactory(nil), - }, - }, &vault.TestClusterOptions{ - ClusterLayers: inmemCluster, - }) - cores := testCluster.Cores - testCluster.Start() - defer testCluster.Cleanup() - - rootToken := testCluster.RootToken - - // Wait for core to become active - vault.TestWaitActiveForwardingReady(t, cores[0].Core) - - // Test forwarding a request. Since we're going directly from core to core - // with no fallback we know that if it worked, request handling is working - c := cores[1] - standby, err := c.Standby() - if err != nil { - t.Fatal(err) - } - if !standby { - t.Fatal("expected core to be standby") - } - - // We need to call Leader as that refreshes the connection info - isLeader, _, _, err := c.Leader() - if err != nil { - t.Fatal(err) - } - if isLeader { - t.Fatal("core should not be leader") - } - corehelpers.RetryUntil(t, 5*time.Second, func() error { - state := c.ActiveNodeReplicationState() - if state == 0 { - return fmt.Errorf("heartbeats have not yet returned a valid active node replication state: %d", state) - } - return nil - }) - - req, err := http.NewRequest("GET", "https://pushit.real.good:9281/v1/sys/events/subscribe/xyz?json=true", nil) - if err != nil { - t.Fatal(err) - } - req = req.WithContext(namespace.RootContext(req.Context())) - req.Header.Add(consts.AuthHeaderName, rootToken) - - resp := httptest.NewRecorder() - forwardRequest(cores[1].Core, resp, req) - - header := resp.Header() - if header == nil { - t.Fatal("err: expected at least a Location header") - } - if !strings.HasPrefix(header.Get("Location"), "wss://") { - t.Fatalf("bad location: %s", header.Get("Location")) - } - - // test forwarding requests to each core - handled := 0 - forwarded := 0 - for _, c := range cores { - resp := httptest.NewRecorder() - fakeHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - handled++ - }) - handleRequestForwarding(c.Core, fakeHandler).ServeHTTP(resp, req) - header := resp.Header() - if header == nil { - continue - } - if strings.HasPrefix(header.Get("Location"), "wss://") { - forwarded++ - } - } - if handled != 1 && forwarded != 2 { - t.Fatalf("Expected 1 core to handle the request and 2 to forward") - } -} diff --git a/http/logical.go b/http/logical.go index f11cb959af..cf80df2b0f 100644 --- a/http/logical.go +++ b/http/logical.go @@ -367,9 +367,11 @@ func handleLogicalInternal(core *vault.Core, injectDataIntoTopLevel bool, noForw nsPath = "" } if strings.HasPrefix(r.URL.Path, fmt.Sprintf("/v1/%ssys/events/subscribe/", nsPath)) { - handler := handleEventsSubscribe(core, req) - handler.ServeHTTP(w, r) - return + handler := entHandleEventsSubscribe(core, req) + if handler != nil { + handler.ServeHTTP(w, r) + return + } } handler := handleEntPaths(nsPath, core, r) if handler != nil { diff --git a/website/content/docs/commands/events.mdx b/website/content/docs/commands/events.mdx index 95f6913e5d..51a91c81be 100644 --- a/website/content/docs/commands/events.mdx +++ b/website/content/docs/commands/events.mdx @@ -7,6 +7,8 @@ description: |- # events + + Use the `events` command to get a real-time display of [event notifications](/vault/docs/concepts/events) generated by Vault and to subscribe to Vault event notifications. Note that the `events subscribe` runs indefinitly and will not exit on @@ -55,6 +57,23 @@ flags](/vault/docs/commands) included on all commands. - `-timeout`: `(duration: "")` - close the WebSocket automatically after the specified duration. +- `-namespaces` `(string)` - Additional **child** namespaces for the + subscription. Repeat the flag to add additional namespace patterns to the + subscription request. Vault automatically prepends the issuing namespace for + the request to the provided namespace. For example, if you include + `-namespaces=ns2` on a request made in the `ns1` namespace, Vault will attempt + to subscribe you to event notifications under the `ns1/ns2` and `ns1` namespaces. You can + use the `*` character to include wildcards in the namespace pattern. By + default, Vault will only subscribe to event notifications in the requesting namespace. + + + To subscribe to event notifications across multiple namespaces, you must provide a root + token or a token associated with appropriate policies across all the targeted + namespaces. Refer to + the Secure multi-tenancy with + namespacestutorial for configuring your Vault instance appropriately. + + - `-filter` `(string: "")` - Filter expression used to select event notifications to be sent through the WebSocket. @@ -81,25 +100,3 @@ list of filtering options and an explanation on how Vault evaluates filter expre ``` data_path == secret/data/foo and operation != write ``` - - -### Enterprise options - - - -- `-namespaces` `(string)` - Additional **child** namespaces for the - subscription. Repeat the flag to add additional namespace patterns to the - subscription request. Vault automatically prepends the issuing namespace for - the request to the provided namespace. For example, if you include - `-namespaces=ns2` on a request made in the `ns1` namespace, Vault will attempt - to subscribe you to event notifications under the `ns1/ns2` and `ns1` namespaces. You can - use the `*` character to include wildcards in the namespace pattern. By - default, Vault will only subscribe to event notifications in the requesting namespace. - - - To subscribe to event notifications across multiple namespaces, you must provide a root - token or a token associated with appropriate policies across all the targeted - namespaces. Refer to - the Secure multi-tenancy with - namespacestutorial for configuring your Vault instance appropriately. - diff --git a/website/content/docs/concepts/events.mdx b/website/content/docs/concepts/events.mdx index 1103c5786d..9a3d0d7af2 100644 --- a/website/content/docs/concepts/events.mdx +++ b/website/content/docs/concepts/events.mdx @@ -8,6 +8,8 @@ description: >- # Event Notifications + + Event notifications are arbitrary, **non-secret** data that can be exchanged between producers (Vault and plugins) and subscribers (Vault components and external users via the API). diff --git a/website/data/docs-nav-data.json b/website/data/docs-nav-data.json index 8c36ee89aa..c68e59e700 100644 --- a/website/data/docs-nav-data.json +++ b/website/data/docs-nav-data.json @@ -281,7 +281,12 @@ }, { "title": "Events", - "path": "concepts/events" + "path": "concepts/events", + "badge": { + "text": "ENTERPRISE", + "type": "outlined", + "color": "highlight" + } }, { "title": "Filtering",