mirror of
https://github.com/optim-enterprises-bv/vault.git
synced 2025-10-29 01:32:33 +00:00
events: Now enterprise-only (#25640)
This removes the WebSockets endpoint for events (which will be moved to the Enterprise repo) and disables tests that rely on it unless they are running in Enterprise. It also updates documentation to document that events are only available in Vault Enterprise.
This commit is contained in:
committed by
GitHub
parent
ccc2e1b391
commit
ae9ec39d44
3
changelog/25640.txt
Normal file
3
changelog/25640.txt
Normal file
@@ -0,0 +1,3 @@
|
||||
```release-note:change
|
||||
events: Remove event noficiations websocket endpoint in non-Enterprise
|
||||
```
|
||||
348
http/events.go
348
http/events.go
@@ -1,348 +0,0 @@
|
||||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: BUSL-1.1
|
||||
|
||||
package http
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"path"
|
||||
"slices"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/vault/helper/namespace"
|
||||
"github.com/hashicorp/vault/sdk/logical"
|
||||
"github.com/hashicorp/vault/vault"
|
||||
"github.com/hashicorp/vault/vault/eventbus"
|
||||
"github.com/patrickmn/go-cache"
|
||||
"github.com/ryanuber/go-glob"
|
||||
"nhooyr.io/websocket"
|
||||
)
|
||||
|
||||
// webSocketRevalidationTime is how often we re-check access to the
|
||||
// events that the websocket requested access to.
|
||||
var webSocketRevalidationTime = 5 * time.Minute
|
||||
|
||||
type eventSubscriber struct {
|
||||
ctx context.Context
|
||||
cancelCtx context.CancelFunc
|
||||
clientToken string
|
||||
logger hclog.Logger
|
||||
events *eventbus.EventBus
|
||||
namespacePatterns []string
|
||||
pattern string
|
||||
bexprFilter string
|
||||
json bool
|
||||
checkCache *cache.Cache
|
||||
isRootToken bool
|
||||
core *vault.Core
|
||||
w http.ResponseWriter
|
||||
r *http.Request
|
||||
req *logical.Request
|
||||
}
|
||||
|
||||
// handleEventsSubscribeWebsocket subscribes to the events, accepts the websocket connection, and then runs forever,
|
||||
// serving events to the websocket connection.
|
||||
func (sub *eventSubscriber) handleEventsSubscribeWebsocket() {
|
||||
ctx := sub.ctx
|
||||
logger := sub.logger
|
||||
// subscribe before accept to avoid race conditions
|
||||
ch, cancel, err := sub.events.SubscribeMultipleNamespaces(ctx, sub.namespacePatterns, sub.pattern, sub.bexprFilter)
|
||||
if err != nil {
|
||||
logger.Info("Error subscribing", "error", err)
|
||||
sub.w.WriteHeader(400)
|
||||
sub.w.Write([]byte("Error subscribing"))
|
||||
return
|
||||
}
|
||||
defer cancel()
|
||||
logger.Debug("WebSocket is subscribed to messages", "namespaces", sub.namespacePatterns, "event_types", sub.pattern, "bexpr_filter", sub.bexprFilter)
|
||||
|
||||
conn, err := websocket.Accept(sub.w, sub.r, nil)
|
||||
if err != nil {
|
||||
logger.Info("Could not accept as websocket", "error", err)
|
||||
respondError(sub.w, http.StatusInternalServerError, fmt.Errorf("could not accept as websocket"))
|
||||
return
|
||||
}
|
||||
|
||||
// continually validate subscribe access while the websocket is running
|
||||
// this has to be done after accepting the websocket to avoid a race condition
|
||||
go sub.validateSubscribeAccessLoop()
|
||||
|
||||
// make sure to close the websocket
|
||||
closeStatus := websocket.StatusNormalClosure
|
||||
closeReason := ""
|
||||
var closeErr error = nil
|
||||
|
||||
defer func() {
|
||||
if closeErr != nil {
|
||||
closeStatus = websocket.CloseStatus(err)
|
||||
if closeStatus == -1 {
|
||||
closeStatus = websocket.StatusInternalError
|
||||
}
|
||||
closeReason = fmt.Sprintf("Internal error: %v", err)
|
||||
logger.Debug("Error from websocket handler", "error", err)
|
||||
}
|
||||
// Close() will panic if the reason is greater than this length
|
||||
if len(closeReason) > 123 {
|
||||
logger.Debug("Truncated close reason", "closeReason", closeReason)
|
||||
closeReason = closeReason[:123]
|
||||
}
|
||||
err = conn.Close(closeStatus, closeReason)
|
||||
if err != nil {
|
||||
logger.Debug("Error closing websocket", "error", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// we don't expect any incoming messages
|
||||
ctx = conn.CloseRead(ctx)
|
||||
// start the pinger
|
||||
go func() {
|
||||
for {
|
||||
time.Sleep(30 * time.Second) // not too aggressive, but keep the HTTP connection alive
|
||||
err := conn.Ping(ctx)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
logger.Info("Websocket context is done, closing the connection")
|
||||
return
|
||||
case message := <-ch:
|
||||
// Perform one last check that the message is allowed to be received.
|
||||
// For example, if a new namespace was created that matches the namespace patterns,
|
||||
// but the token doesn't have access to it, we don't want to accidentally send it to
|
||||
// the websocket.
|
||||
if !sub.allowMessageCached(message.Payload.(*logical.EventReceived)) {
|
||||
continue
|
||||
}
|
||||
|
||||
logger.Debug("Sending message to websocket", "message", message.Payload)
|
||||
var messageBytes []byte
|
||||
var messageType websocket.MessageType
|
||||
if sub.json {
|
||||
var ok bool
|
||||
messageBytes, ok = message.Format("cloudevents-json")
|
||||
if !ok {
|
||||
logger.Warn("Could not get cloudevents JSON format")
|
||||
closeErr = errors.New("could not get cloudevents JSON format")
|
||||
return
|
||||
}
|
||||
messageType = websocket.MessageText
|
||||
} else {
|
||||
messageBytes, err = proto.Marshal(message.Payload.(*logical.EventReceived))
|
||||
messageType = websocket.MessageBinary
|
||||
}
|
||||
if err != nil {
|
||||
logger.Warn("Could not serialize websocket event", "error", err)
|
||||
closeErr = err
|
||||
return
|
||||
}
|
||||
err = conn.Write(ctx, messageType, messageBytes)
|
||||
if err != nil {
|
||||
closeErr = err
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// allowMessageCached checks that the message is allowed to received by the websocket.
|
||||
// It caches results for specific namespaces, data paths, and event types.
|
||||
func (sub *eventSubscriber) allowMessageCached(message *logical.EventReceived) bool {
|
||||
if sub.isRootToken {
|
||||
// fast-path root tokens
|
||||
return true
|
||||
}
|
||||
|
||||
messageNs := strings.Trim(message.Namespace, "/")
|
||||
dataPath := ""
|
||||
if message.Event.Metadata != nil {
|
||||
dataPathField := message.Event.Metadata.GetFields()[logical.EventMetadataDataPath]
|
||||
if dataPathField != nil {
|
||||
dataPath = dataPathField.GetStringValue()
|
||||
}
|
||||
}
|
||||
if dataPath == "" {
|
||||
// Only allow root tokens to subscribe to events with no data path, for now.
|
||||
return false
|
||||
}
|
||||
cacheKey := fmt.Sprintf("%v!%v!%v", messageNs, dataPath, message.EventType)
|
||||
_, ok := sub.checkCache.Get(cacheKey)
|
||||
if ok {
|
||||
return true
|
||||
}
|
||||
|
||||
// perform the actual check and cache it if true
|
||||
ok = sub.allowMessage(messageNs, dataPath, message.EventType)
|
||||
if ok {
|
||||
err := sub.checkCache.Add(cacheKey, ok, webSocketRevalidationTime)
|
||||
if err != nil {
|
||||
sub.logger.Debug("Error adding to policy check cache for websocket", "error", err)
|
||||
// still return the right value, but we can't guarantee it was cached
|
||||
}
|
||||
}
|
||||
return ok
|
||||
}
|
||||
|
||||
// allowMessage checks that the message is allowed to received by the websocket
|
||||
func (sub *eventSubscriber) allowMessage(eventNs, dataPath, eventType string) bool {
|
||||
// does this even match the requested namespaces
|
||||
matchedNs := false
|
||||
for _, nsPattern := range sub.namespacePatterns {
|
||||
if glob.Glob(nsPattern, eventNs) {
|
||||
matchedNs = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !matchedNs {
|
||||
return false
|
||||
}
|
||||
|
||||
// next check for specific access to the namespace and event types
|
||||
nsDataPath := dataPath
|
||||
if eventNs != "" {
|
||||
nsDataPath = path.Join(eventNs, dataPath)
|
||||
}
|
||||
capabilities, allowedEventTypes, err := sub.core.CapabilitiesAndSubscribeEventTypes(sub.ctx, sub.clientToken, nsDataPath)
|
||||
if err != nil {
|
||||
sub.logger.Debug("Error checking capabilities and event types for token", "error", err, "namespace", eventNs)
|
||||
return false
|
||||
}
|
||||
if !(slices.Contains(capabilities, vault.RootCapability) || slices.Contains(capabilities, vault.SubscribeCapability)) {
|
||||
return false
|
||||
}
|
||||
for _, pattern := range allowedEventTypes {
|
||||
if glob.Glob(pattern, eventType) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
// no event types matched, so return false
|
||||
return false
|
||||
}
|
||||
|
||||
func handleEventsSubscribe(core *vault.Core, req *logical.Request) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
logger := core.Logger().Named("events-subscribe")
|
||||
logger.Debug("Got request to", "url", r.URL, "version", r.Proto)
|
||||
|
||||
ctx := r.Context()
|
||||
|
||||
// ACL check
|
||||
auth, entry, err := core.CheckToken(ctx, req, false)
|
||||
if err != nil {
|
||||
if errors.Is(err, logical.ErrPermissionDenied) {
|
||||
respondError(w, http.StatusForbidden, logical.ErrPermissionDenied)
|
||||
return
|
||||
}
|
||||
logger.Debug("Error validating token", "error", err)
|
||||
respondError(w, http.StatusInternalServerError, fmt.Errorf("error validating token"))
|
||||
return
|
||||
}
|
||||
|
||||
ns, err := namespace.FromContext(ctx)
|
||||
if err != nil {
|
||||
logger.Info("Could not find namespace", "error", err)
|
||||
respondError(w, http.StatusInternalServerError, fmt.Errorf("could not find namespace"))
|
||||
return
|
||||
}
|
||||
|
||||
prefix := "/v1/sys/events/subscribe/"
|
||||
if ns.ID != namespace.RootNamespaceID {
|
||||
prefix = fmt.Sprintf("/v1/%ssys/events/subscribe/", ns.Path)
|
||||
}
|
||||
pattern := strings.TrimSpace(strings.TrimPrefix(r.URL.Path, prefix))
|
||||
if pattern == "" {
|
||||
respondError(w, http.StatusBadRequest, fmt.Errorf("did not specify eventType to subscribe to"))
|
||||
return
|
||||
}
|
||||
|
||||
json := false
|
||||
jsonRaw := r.URL.Query().Get("json")
|
||||
if jsonRaw != "" {
|
||||
var err error
|
||||
json, err = strconv.ParseBool(jsonRaw)
|
||||
if err != nil {
|
||||
respondError(w, http.StatusBadRequest, fmt.Errorf("invalid parameter for JSON: %v", jsonRaw))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
bexprFilter := strings.TrimSpace(r.URL.Query().Get("filter"))
|
||||
namespacePatterns := r.URL.Query()["namespaces"]
|
||||
namespacePatterns = prependNamespacePatterns(namespacePatterns, ns)
|
||||
isRoot := entry.IsRoot()
|
||||
ctx, cancelCtx := context.WithCancel(ctx)
|
||||
defer cancelCtx()
|
||||
|
||||
sub := &eventSubscriber{
|
||||
ctx: ctx,
|
||||
cancelCtx: cancelCtx,
|
||||
logger: logger,
|
||||
events: core.Events(),
|
||||
namespacePatterns: namespacePatterns,
|
||||
pattern: pattern,
|
||||
bexprFilter: bexprFilter,
|
||||
json: json,
|
||||
checkCache: cache.New(webSocketRevalidationTime, webSocketRevalidationTime),
|
||||
clientToken: auth.ClientToken,
|
||||
isRootToken: isRoot,
|
||||
core: core,
|
||||
w: w,
|
||||
r: r,
|
||||
req: req,
|
||||
}
|
||||
sub.handleEventsSubscribeWebsocket()
|
||||
})
|
||||
}
|
||||
|
||||
// prependNamespacePatterns prepends the request namespace to the namespace patterns,
|
||||
// and also adds the request namespace to the list.
|
||||
func prependNamespacePatterns(patterns []string, requestNamespace *namespace.Namespace) []string {
|
||||
prepend := strings.Trim(requestNamespace.Path, "/")
|
||||
newPatterns := make([]string, 0, len(patterns)+1)
|
||||
newPatterns = append(newPatterns, prepend)
|
||||
for _, pattern := range patterns {
|
||||
if strings.Trim(pattern, "/") != "" {
|
||||
newPatterns = append(newPatterns, path.Join(prepend, pattern))
|
||||
}
|
||||
}
|
||||
return newPatterns
|
||||
}
|
||||
|
||||
// validateSubscribeAccessLoop continually checks if the request has access to the subscribe endpoint in
|
||||
// its namespace. If the access check ever fails, then the cancel function is called and the function returns.
|
||||
func (sub *eventSubscriber) validateSubscribeAccessLoop() {
|
||||
// if something breaks, default to canceling the websocket
|
||||
defer sub.cancelCtx()
|
||||
for {
|
||||
_, _, err := sub.core.CheckTokenWithLock(sub.ctx, sub.req, false)
|
||||
if err != nil {
|
||||
sub.core.Logger().Debug("Token does not have access to subscription path in its own namespace, terminating WebSocket subscription", "path", sub.req.Path, "error", err)
|
||||
return
|
||||
}
|
||||
// wait a while and try again, but quit the loop if the context finishes early
|
||||
finished := func() bool {
|
||||
ticker := time.NewTicker(webSocketRevalidationTime)
|
||||
defer ticker.Stop()
|
||||
select {
|
||||
case <-sub.ctx.Done():
|
||||
return true
|
||||
case <-ticker.C:
|
||||
return false
|
||||
}
|
||||
}()
|
||||
if finished {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
19
http/events_stubs_oss.go
Normal file
19
http/events_stubs_oss.go
Normal file
@@ -0,0 +1,19 @@
|
||||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: BUSL-1.1
|
||||
|
||||
//go:build !enterprise
|
||||
|
||||
package http
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/hashicorp/vault/sdk/logical"
|
||||
"github.com/hashicorp/vault/vault"
|
||||
)
|
||||
|
||||
//go:generate go run github.com/hashicorp/vault/tools/stubmaker
|
||||
|
||||
func entHandleEventsSubscribe(core *vault.Core, req *logical.Request) http.Handler {
|
||||
return nil
|
||||
}
|
||||
@@ -1,410 +0,0 @@
|
||||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: BUSL-1.1
|
||||
|
||||
package http
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-uuid"
|
||||
"github.com/hashicorp/vault/api"
|
||||
"github.com/hashicorp/vault/audit"
|
||||
"github.com/hashicorp/vault/helper/namespace"
|
||||
"github.com/hashicorp/vault/helper/testhelpers/corehelpers"
|
||||
"github.com/hashicorp/vault/sdk/helper/consts"
|
||||
"github.com/hashicorp/vault/sdk/logical"
|
||||
"github.com/hashicorp/vault/vault"
|
||||
"github.com/hashicorp/vault/vault/cluster"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"nhooyr.io/websocket"
|
||||
)
|
||||
|
||||
// TestEventsSubscribe tests the websocket endpoint for subscribing to events
|
||||
// by generating some events.
|
||||
func TestEventsSubscribe(t *testing.T) {
|
||||
core := vault.TestCoreWithConfig(t, &vault.CoreConfig{})
|
||||
ln, addr := TestServer(t, core)
|
||||
defer ln.Close()
|
||||
|
||||
// unseal the core
|
||||
keys, token := vault.TestCoreInit(t, core)
|
||||
for _, key := range keys {
|
||||
_, err := core.Unseal(key)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
const eventType = "abc"
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
// send some events
|
||||
sendEvents := func() error {
|
||||
id, err := uuid.GenerateUUID()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
pluginInfo := &logical.EventPluginInfo{
|
||||
MountPath: "secret",
|
||||
}
|
||||
err = core.Events().SendEventInternal(namespace.RootContext(ctx), namespace.RootNamespace, pluginInfo, logical.EventType(eventType), &logical.EventData{
|
||||
Id: id,
|
||||
Metadata: nil,
|
||||
EntityIds: nil,
|
||||
Note: "testing",
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
wsAddr := strings.Replace(addr, "http", "ws", 1)
|
||||
|
||||
testCases := []struct {
|
||||
json bool
|
||||
}{{true}, {false}}
|
||||
|
||||
for _, testCase := range testCases {
|
||||
location := fmt.Sprintf("%s/v1/sys/events/subscribe/%s?namespaces=ns1&namespaces=ns*&json=%v", wsAddr, eventType, testCase.json)
|
||||
conn, _, err := websocket.Dial(ctx, location, &websocket.DialOptions{
|
||||
HTTPHeader: http.Header{"x-vault-token": []string{token}},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
conn.Close(websocket.StatusNormalClosure, "")
|
||||
})
|
||||
|
||||
err = sendEvents()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
_, msg, err := conn.Read(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if testCase.json {
|
||||
event := map[string]interface{}{}
|
||||
err = json.Unmarshal(msg, &event)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log(string(msg))
|
||||
data := event["data"].(map[string]interface{})
|
||||
if actualType := data["event_type"].(string); actualType != eventType {
|
||||
t.Fatalf("Expeced event type %s, got %s", eventType, actualType)
|
||||
}
|
||||
pluginInfo, ok := data["plugin_info"].(map[string]interface{})
|
||||
if !ok || pluginInfo == nil {
|
||||
t.Fatalf("No plugin_info object: %v", data)
|
||||
}
|
||||
mountPath, ok := pluginInfo["mount_path"].(string)
|
||||
if !ok || mountPath != "secret" {
|
||||
t.Fatalf("Wrong mount_path: %v", data)
|
||||
}
|
||||
innerEvent := data["event"].(map[string]interface{})
|
||||
if innerEvent["id"].(string) != event["id"].(string) {
|
||||
t.Fatalf("IDs don't match, expected %s, got %s", innerEvent["id"].(string), event["id"].(string))
|
||||
}
|
||||
if innerEvent["note"].(string) != "testing" {
|
||||
t.Fatalf("Expected 'testing', got %s", innerEvent["note"].(string))
|
||||
}
|
||||
|
||||
checkRequiredCloudEventsFields(t, event)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestBexprFilters tests that go-bexpr filters are used to filter events.
|
||||
func TestBexprFilters(t *testing.T) {
|
||||
core := vault.TestCoreWithConfig(t, &vault.CoreConfig{})
|
||||
ln, addr := TestServer(t, core)
|
||||
defer ln.Close()
|
||||
|
||||
// unseal the core
|
||||
keys, token := vault.TestCoreInit(t, core)
|
||||
for _, key := range keys {
|
||||
_, err := core.Unseal(key)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
sendEvents := func(ctx context.Context, eventTypes ...string) error {
|
||||
for _, eventType := range eventTypes {
|
||||
pluginInfo := &logical.EventPluginInfo{
|
||||
MountPath: "secret",
|
||||
}
|
||||
ns := namespace.RootNamespace
|
||||
id := eventType
|
||||
err := core.Events().SendEventInternal(namespace.RootContext(ctx), ns, pluginInfo, logical.EventType(eventType), &logical.EventData{
|
||||
Id: id,
|
||||
Metadata: nil,
|
||||
EntityIds: nil,
|
||||
Note: "testing",
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
wsAddr := strings.Replace(addr, "http", "ws", 1)
|
||||
bexprFilter := url.QueryEscape("event_type == abc")
|
||||
|
||||
location := fmt.Sprintf("%s/v1/sys/events/subscribe/*?json=true&filter=%s", wsAddr, bexprFilter)
|
||||
conn, _, err := websocket.Dial(ctx, location, &websocket.DialOptions{
|
||||
HTTPHeader: http.Header{"x-vault-token": []string{token}},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer conn.Close(websocket.StatusNormalClosure, "")
|
||||
|
||||
err = sendEvents(ctx, "abc", "def", "xyz")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// read until we time out
|
||||
seen := map[string]bool{}
|
||||
done := false
|
||||
for !done {
|
||||
done = func() bool {
|
||||
readCtx, readCancel := context.WithTimeout(context.Background(), 1*time.Second)
|
||||
defer readCancel()
|
||||
_, msg, err := conn.Read(readCtx)
|
||||
if err != nil {
|
||||
return true
|
||||
}
|
||||
event := map[string]interface{}{}
|
||||
err = json.Unmarshal(msg, &event)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return true
|
||||
}
|
||||
seen[event["id"].(string)] = true
|
||||
return false
|
||||
}()
|
||||
}
|
||||
// we should only get the "abc" messages
|
||||
assert.Len(t, seen, 1)
|
||||
assert.Contains(t, seen, "abc")
|
||||
}
|
||||
|
||||
func TestNamespacePrepend(t *testing.T) {
|
||||
testCases := []struct {
|
||||
requestNs string
|
||||
patterns []string
|
||||
result []string
|
||||
}{
|
||||
{"", []string{"ns*"}, []string{"", "ns*"}},
|
||||
{"ns1", []string{"ns*"}, []string{"ns1", "ns1/ns*"}},
|
||||
{"ns1", []string{"ns1*"}, []string{"ns1", "ns1/ns1*"}},
|
||||
{"ns1", []string{"ns1/*"}, []string{"ns1", "ns1/ns1/*"}},
|
||||
{"", []string{"ns1/ns13", "ns1/other"}, []string{"", "ns1/ns13", "ns1/other"}},
|
||||
{"ns1", []string{"ns1/ns13", "ns1/other"}, []string{"ns1", "ns1/ns1/ns13", "ns1/ns1/other"}},
|
||||
{"", []string{""}, []string{""}},
|
||||
{"", nil, []string{""}},
|
||||
{"ns1", []string{""}, []string{"ns1"}},
|
||||
{"ns1", []string{"", ""}, []string{"ns1"}},
|
||||
{"ns1", []string{"ns1"}, []string{"ns1", "ns1/ns1"}},
|
||||
{"", []string{"*"}, []string{"", "*"}},
|
||||
{"ns1", []string{"*"}, []string{"ns1", "ns1/*"}},
|
||||
{"", []string{"ns1/ns13*", "ns2"}, []string{"", "ns1/ns13*", "ns2"}},
|
||||
{"ns1", []string{"ns1/ns13*", "ns2"}, []string{"ns1", "ns1/ns1/ns13*", "ns1/ns2"}},
|
||||
{"", []string{"ns*", "ns1"}, []string{"", "ns*", "ns1"}},
|
||||
{"ns1", []string{"ns*", "ns1"}, []string{"ns1", "ns1/ns*", "ns1/ns1"}},
|
||||
{"ns1", []string{"ns1*", "ns1"}, []string{"ns1", "ns1/ns1*", "ns1/ns1"}},
|
||||
{"ns1", []string{"ns1/*", "ns1"}, []string{"ns1", "ns1/ns1/*", "ns1/ns1"}},
|
||||
}
|
||||
for _, testCase := range testCases {
|
||||
t.Run(testCase.requestNs+" "+strings.Join(testCase.patterns, " "), func(t *testing.T) {
|
||||
result := prependNamespacePatterns(testCase.patterns, &namespace.Namespace{ID: testCase.requestNs, Path: testCase.requestNs})
|
||||
assert.Equal(t, testCase.result, result)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func checkRequiredCloudEventsFields(t *testing.T, event map[string]interface{}) {
|
||||
t.Helper()
|
||||
for _, attr := range []string{"id", "source", "specversion", "type"} {
|
||||
if v, ok := event[attr]; !ok {
|
||||
t.Errorf("Missing attribute %s", attr)
|
||||
} else if str, ok := v.(string); !ok {
|
||||
t.Errorf("Expected %s to be string but got %T", attr, v)
|
||||
} else if str == "" {
|
||||
t.Errorf("%s was empty string", attr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestEventsSubscribeAuth tests that unauthenticated and unauthorized subscriptions
|
||||
// fail correctly.
|
||||
func TestEventsSubscribeAuth(t *testing.T) {
|
||||
core := vault.TestCore(t)
|
||||
ln, addr := TestServer(t, core)
|
||||
defer ln.Close()
|
||||
|
||||
// unseal the core
|
||||
keys, root := vault.TestCoreInit(t, core)
|
||||
for _, key := range keys {
|
||||
_, err := core.Unseal(key)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
var nonPrivilegedToken string
|
||||
// Fetch a valid non privileged token.
|
||||
{
|
||||
config := api.DefaultConfig()
|
||||
config.Address = addr
|
||||
|
||||
client, err := api.NewClient(config)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
client.SetToken(root)
|
||||
|
||||
secret, err := client.Auth().Token().Create(&api.TokenCreateRequest{Policies: []string{"default"}})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if secret.Auth.ClientToken == "" {
|
||||
t.Fatal("Failed to fetch a non privileged token")
|
||||
}
|
||||
nonPrivilegedToken = secret.Auth.ClientToken
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
wsAddr := strings.Replace(addr, "http", "ws", 1)
|
||||
|
||||
// Get a 403 with no token.
|
||||
_, resp, err := websocket.Dial(ctx, wsAddr+"/v1/sys/events/subscribe/abc", nil)
|
||||
if err == nil {
|
||||
t.Error("Expected websocket error but got none")
|
||||
}
|
||||
if resp == nil || resp.StatusCode != http.StatusForbidden {
|
||||
t.Errorf("Expected 403 but got %+v", resp)
|
||||
}
|
||||
|
||||
// Get a 403 with a non privileged token.
|
||||
_, resp, err = websocket.Dial(ctx, wsAddr+"/v1/sys/events/subscribe/abc", &websocket.DialOptions{
|
||||
HTTPHeader: http.Header{"x-vault-token": []string{nonPrivilegedToken}},
|
||||
})
|
||||
if err == nil {
|
||||
t.Error("Expected websocket error but got none")
|
||||
}
|
||||
if resp == nil || resp.StatusCode != http.StatusForbidden {
|
||||
t.Errorf("Expected 403 but got %+v", resp)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCanForwardEventConnections(t *testing.T) {
|
||||
// Run again with in-memory network
|
||||
inmemCluster, err := cluster.NewInmemLayerCluster("inmem-cluster", 3, hclog.New(&hclog.LoggerOptions{
|
||||
Mutex: &sync.Mutex{},
|
||||
Level: hclog.Trace,
|
||||
Name: "inmem-cluster",
|
||||
}))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
testCluster := vault.NewTestCluster(t, &vault.CoreConfig{
|
||||
AuditBackends: map[string]audit.Factory{
|
||||
"nop": corehelpers.NoopAuditFactory(nil),
|
||||
},
|
||||
}, &vault.TestClusterOptions{
|
||||
ClusterLayers: inmemCluster,
|
||||
})
|
||||
cores := testCluster.Cores
|
||||
testCluster.Start()
|
||||
defer testCluster.Cleanup()
|
||||
|
||||
rootToken := testCluster.RootToken
|
||||
|
||||
// Wait for core to become active
|
||||
vault.TestWaitActiveForwardingReady(t, cores[0].Core)
|
||||
|
||||
// Test forwarding a request. Since we're going directly from core to core
|
||||
// with no fallback we know that if it worked, request handling is working
|
||||
c := cores[1]
|
||||
standby, err := c.Standby()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !standby {
|
||||
t.Fatal("expected core to be standby")
|
||||
}
|
||||
|
||||
// We need to call Leader as that refreshes the connection info
|
||||
isLeader, _, _, err := c.Leader()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if isLeader {
|
||||
t.Fatal("core should not be leader")
|
||||
}
|
||||
corehelpers.RetryUntil(t, 5*time.Second, func() error {
|
||||
state := c.ActiveNodeReplicationState()
|
||||
if state == 0 {
|
||||
return fmt.Errorf("heartbeats have not yet returned a valid active node replication state: %d", state)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
req, err := http.NewRequest("GET", "https://pushit.real.good:9281/v1/sys/events/subscribe/xyz?json=true", nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
req = req.WithContext(namespace.RootContext(req.Context()))
|
||||
req.Header.Add(consts.AuthHeaderName, rootToken)
|
||||
|
||||
resp := httptest.NewRecorder()
|
||||
forwardRequest(cores[1].Core, resp, req)
|
||||
|
||||
header := resp.Header()
|
||||
if header == nil {
|
||||
t.Fatal("err: expected at least a Location header")
|
||||
}
|
||||
if !strings.HasPrefix(header.Get("Location"), "wss://") {
|
||||
t.Fatalf("bad location: %s", header.Get("Location"))
|
||||
}
|
||||
|
||||
// test forwarding requests to each core
|
||||
handled := 0
|
||||
forwarded := 0
|
||||
for _, c := range cores {
|
||||
resp := httptest.NewRecorder()
|
||||
fakeHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
handled++
|
||||
})
|
||||
handleRequestForwarding(c.Core, fakeHandler).ServeHTTP(resp, req)
|
||||
header := resp.Header()
|
||||
if header == nil {
|
||||
continue
|
||||
}
|
||||
if strings.HasPrefix(header.Get("Location"), "wss://") {
|
||||
forwarded++
|
||||
}
|
||||
}
|
||||
if handled != 1 && forwarded != 2 {
|
||||
t.Fatalf("Expected 1 core to handle the request and 2 to forward")
|
||||
}
|
||||
}
|
||||
@@ -367,9 +367,11 @@ func handleLogicalInternal(core *vault.Core, injectDataIntoTopLevel bool, noForw
|
||||
nsPath = ""
|
||||
}
|
||||
if strings.HasPrefix(r.URL.Path, fmt.Sprintf("/v1/%ssys/events/subscribe/", nsPath)) {
|
||||
handler := handleEventsSubscribe(core, req)
|
||||
handler.ServeHTTP(w, r)
|
||||
return
|
||||
handler := entHandleEventsSubscribe(core, req)
|
||||
if handler != nil {
|
||||
handler.ServeHTTP(w, r)
|
||||
return
|
||||
}
|
||||
}
|
||||
handler := handleEntPaths(nsPath, core, r)
|
||||
if handler != nil {
|
||||
|
||||
@@ -7,6 +7,8 @@ description: |-
|
||||
|
||||
# events
|
||||
|
||||
<EnterpriseAlert product="vault" />
|
||||
|
||||
Use the `events` command to get a real-time display of
|
||||
[event notifications](/vault/docs/concepts/events) generated by Vault and to subscribe to Vault
|
||||
event notifications. Note that the `events subscribe` runs indefinitly and will not exit on
|
||||
@@ -55,6 +57,23 @@ flags](/vault/docs/commands) included on all commands.
|
||||
- `-timeout`: `(duration: "")` - close the WebSocket automatically after the
|
||||
specified duration.
|
||||
|
||||
- `-namespaces` `(string)` - Additional **child** namespaces for the
|
||||
subscription. Repeat the flag to add additional namespace patterns to the
|
||||
subscription request. Vault automatically prepends the issuing namespace for
|
||||
the request to the provided namespace. For example, if you include
|
||||
`-namespaces=ns2` on a request made in the `ns1` namespace, Vault will attempt
|
||||
to subscribe you to event notifications under the `ns1/ns2` and `ns1` namespaces. You can
|
||||
use the `*` character to include wildcards in the namespace pattern. By
|
||||
default, Vault will only subscribe to event notifications in the requesting namespace.
|
||||
|
||||
<Note>
|
||||
To subscribe to event notifications across multiple namespaces, you must provide a root
|
||||
token or a token associated with appropriate policies across all the targeted
|
||||
namespaces. Refer to
|
||||
the <a href="/vault/tutorials/enterprise/namespaces">Secure multi-tenancy with
|
||||
namespaces</a>tutorial for configuring your Vault instance appropriately.
|
||||
</Note>
|
||||
|
||||
- `-filter` `(string: "")` - Filter expression used to select event notifications to be sent
|
||||
through the WebSocket.
|
||||
|
||||
@@ -81,25 +100,3 @@ list of filtering options and an explanation on how Vault evaluates filter expre
|
||||
```
|
||||
data_path == secret/data/foo and operation != write
|
||||
```
|
||||
|
||||
|
||||
### Enterprise options
|
||||
|
||||
<EnterpriseAlert product="vault" />
|
||||
|
||||
- `-namespaces` `(string)` - Additional **child** namespaces for the
|
||||
subscription. Repeat the flag to add additional namespace patterns to the
|
||||
subscription request. Vault automatically prepends the issuing namespace for
|
||||
the request to the provided namespace. For example, if you include
|
||||
`-namespaces=ns2` on a request made in the `ns1` namespace, Vault will attempt
|
||||
to subscribe you to event notifications under the `ns1/ns2` and `ns1` namespaces. You can
|
||||
use the `*` character to include wildcards in the namespace pattern. By
|
||||
default, Vault will only subscribe to event notifications in the requesting namespace.
|
||||
|
||||
<Note>
|
||||
To subscribe to event notifications across multiple namespaces, you must provide a root
|
||||
token or a token associated with appropriate policies across all the targeted
|
||||
namespaces. Refer to
|
||||
the <a href="/vault/tutorials/enterprise/namespaces">Secure multi-tenancy with
|
||||
namespaces</a>tutorial for configuring your Vault instance appropriately.
|
||||
</Note>
|
||||
|
||||
@@ -8,6 +8,8 @@ description: >-
|
||||
|
||||
# Event Notifications
|
||||
|
||||
<EnterpriseAlert product="vault" />
|
||||
|
||||
Event notifications are arbitrary, **non-secret** data that can be exchanged between producers (Vault and plugins)
|
||||
and subscribers (Vault components and external users via the API).
|
||||
|
||||
|
||||
@@ -281,7 +281,12 @@
|
||||
},
|
||||
{
|
||||
"title": "Events",
|
||||
"path": "concepts/events"
|
||||
"path": "concepts/events",
|
||||
"badge": {
|
||||
"text": "ENTERPRISE",
|
||||
"type": "outlined",
|
||||
"color": "highlight"
|
||||
}
|
||||
},
|
||||
{
|
||||
"title": "Filtering",
|
||||
|
||||
Reference in New Issue
Block a user