events: Add support for multi-namespace subscriptions (#22540)

Events from multiple namespaces can be subscribed to via
glob patterns passed to the subscription.

This does not do policy enforcement yet -- that will come in PR soon.

I tested this manually as well by pulling it into Vault Enterprise
so I could create namespaces and check that subscriptions work as
expected.

Co-authored-by: Tom Proctor <tomhjp@users.noreply.github.com>
This commit is contained in:
Christopher Swenson
2023-08-25 14:04:45 -07:00
committed by GitHub
parent f143f6a070
commit 3e900fdda1
5 changed files with 244 additions and 21 deletions

3
changelog/22540.txt Normal file
View File

@@ -0,0 +1,3 @@
```release-note:improvement
events: Allow subscriptions to multiple namespaces
```

View File

@@ -23,6 +23,8 @@ var (
type EventsSubscribeCommands struct {
*BaseCommand
namespaces []string
}
func (c *EventsSubscribeCommands) Synopsis() string {
@@ -31,10 +33,11 @@ func (c *EventsSubscribeCommands) Synopsis() string {
func (c *EventsSubscribeCommands) Help() string {
helpText := `
Usage: vault events subscribe [-format=json] [-timeout=XYZs] eventType
Usage: vault events subscribe [-namespaces=ns1] [-timeout=XYZs] eventType
Subscribe to events of the given event type (topic). The events will be
output to standard out.
Subscribe to events of the given event type (topic), which may be a glob
pattern (with "*"" treated as a wildcard). The events will be sent to
standard out.
The output will be a JSON object serialized using the default protobuf
JSON serialization format, with one line per event received.
@@ -44,7 +47,19 @@ Usage: vault events subscribe [-format=json] [-timeout=XYZs] eventType
func (c *EventsSubscribeCommands) Flags() *FlagSets {
set := c.flagSet(FlagSetHTTP)
f := set.NewFlagSet("Subscribe Options")
f.StringSliceVar(&StringSliceVar{
Name: "namespaces",
Usage: `Specifies one or more patterns of additional child namespaces
to subscribe to. The namespace of the request is automatically
prepended, so specifying 'ns2' when the request is in the 'ns1'
namespace will result in subscribing to 'ns1/ns2', in addition to
'ns1'. Patterns can include "*" characters to indicate
wildcards. The default is to subscribe only to the request's
namespace.`,
Default: []string{},
Target: &c.namespaces,
})
return set
}
@@ -88,6 +103,22 @@ func (c *EventsSubscribeCommands) Run(args []string) int {
return 0
}
// cleanNamespace removes leading and trailing space and /'s from the namespace path.
func cleanNamespace(ns string) string {
ns = strings.TrimSpace(ns)
ns = strings.Trim(ns, "/")
return ns
}
// cleanNamespaces removes leading and trailing space and /'s from the namespace paths.
func cleanNamespaces(namespaces []string) []string {
cleaned := make([]string, len(namespaces))
for i, ns := range namespaces {
cleaned[i] = cleanNamespace(ns)
}
return cleaned
}
func (c *EventsSubscribeCommands) subscribeRequest(client *api.Client, path string) error {
r := client.NewRequest("GET", "/v1/"+path)
u := r.URL
@@ -98,9 +129,12 @@ func (c *EventsSubscribeCommands) subscribeRequest(client *api.Client, path stri
}
q := u.Query()
q.Set("json", "true")
if len(c.namespaces) > 0 {
q["namespaces"] = cleanNamespaces(c.namespaces)
}
u.RawQuery = q.Encode()
client.AddHeader("X-Vault-Token", client.Token())
client.AddHeader("X-Vault-Namesapce", client.Namespace())
client.AddHeader("X-Vault-Namespace", client.Namespace())
ctx := context.Background()
// Follow redirects in case our request if our request is forwarded to the leader.

View File

@@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"net/http"
"path"
"strconv"
"strings"
"time"
@@ -22,13 +23,13 @@ import (
)
type eventSubscribeArgs struct {
ctx context.Context
logger hclog.Logger
events *eventbus.EventBus
ns *namespace.Namespace
pattern string
conn *websocket.Conn
json bool
ctx context.Context
logger hclog.Logger
events *eventbus.EventBus
namespacePatterns []string
pattern string
conn *websocket.Conn
json bool
}
// handleEventsSubscribeWebsocket runs forever, returning a websocket error code and reason
@@ -36,7 +37,7 @@ type eventSubscribeArgs struct {
func handleEventsSubscribeWebsocket(args eventSubscribeArgs) (websocket.StatusCode, string, error) {
ctx := args.ctx
logger := args.logger
ch, cancel, err := args.events.Subscribe(ctx, args.ns, args.pattern)
ch, cancel, err := args.events.SubscribeMultipleNamespaces(ctx, args.namespacePatterns, args.pattern)
if err != nil {
logger.Info("Error subscribing", "error", err)
return websocket.StatusUnsupportedData, "Error subscribing", nil
@@ -123,6 +124,8 @@ func handleEventsSubscribe(core *vault.Core, req *logical.Request) http.Handler
}
}
namespacePatterns := r.URL.Query()["namespaces"]
namespacePatterns = prependNamespacePatterns(namespacePatterns, ns)
conn, err := websocket.Accept(w, r, nil)
if err != nil {
logger.Info("Could not accept as websocket", "error", err)
@@ -143,7 +146,7 @@ func handleEventsSubscribe(core *vault.Core, req *logical.Request) http.Handler
}
}()
closeStatus, closeReason, err := handleEventsSubscribeWebsocket(eventSubscribeArgs{ctx, logger, core.Events(), ns, pattern, conn, json})
closeStatus, closeReason, err := handleEventsSubscribeWebsocket(eventSubscribeArgs{ctx, logger, core.Events(), namespacePatterns, pattern, conn, json})
if err != nil {
closeStatus = websocket.CloseStatus(err)
if closeStatus == -1 {
@@ -163,3 +166,18 @@ func handleEventsSubscribe(core *vault.Core, req *logical.Request) http.Handler
}
})
}
// prependNamespacePatterns prepends the request namespace to the namespace patterns,
// and also adds the request namespace to the list.
func prependNamespacePatterns(patterns []string, requestNamespace *namespace.Namespace) []string {
prepend := strings.Trim(requestNamespace.Path, "/")
newPatterns := make([]string, 0, len(patterns)+1)
newPatterns = append(newPatterns, prepend)
for _, pattern := range patterns {
if strings.Trim(strings.TrimSpace(pattern), "/") == "" {
continue
}
newPatterns = append(newPatterns, path.Join(prepend, pattern, "/"))
}
return newPatterns
}

View File

@@ -26,6 +26,7 @@ import (
"github.com/hashicorp/vault/sdk/logical"
"github.com/hashicorp/vault/vault"
"github.com/hashicorp/vault/vault/cluster"
"github.com/stretchr/testify/assert"
"nhooyr.io/websocket"
)
@@ -134,6 +135,161 @@ func TestEventsSubscribe(t *testing.T) {
}
}
// TestEventsSubscribeNamespaces tests the websocket endpoint for subscribing to events in multiple namespaces.
func TestEventsSubscribeNamespaces(t *testing.T) {
core := vault.TestCoreWithConfig(t, &vault.CoreConfig{
Experiments: []string{experiments.VaultExperimentEventsAlpha1},
})
ln, addr := TestServer(t, core)
defer ln.Close()
// unseal the core
keys, token := vault.TestCoreInit(t, core)
for _, key := range keys {
_, err := core.Unseal(key)
if err != nil {
t.Fatal(err)
}
}
stop := atomic.Bool{}
const eventType = "abc"
namespaces := []string{
"",
"ns1",
"ns2",
"ns1/ns13",
"ns1/ns13/ns134",
}
// send some events with the specified namespaces
sendEvents := func() {
pluginInfo := &logical.EventPluginInfo{
MountPath: "secret",
}
for i := range namespaces {
var ns *namespace.Namespace
if namespaces[i] == "" {
ns = namespace.RootNamespace
} else {
ns = &namespace.Namespace{
ID: namespaces[i],
Path: namespaces[i],
CustomMetadata: nil,
}
}
id, err := uuid.GenerateUUID()
if err != nil {
core.Logger().Info("Error generating UUID, exiting sender", "error", err)
}
err = core.Events().SendEventInternal(namespace.RootContext(context.Background()), ns, pluginInfo, eventType, &logical.EventData{
Id: id,
Metadata: nil,
EntityIds: nil,
Note: "testing",
})
if err != nil {
core.Logger().Info("Error sending event, exiting sender", "error", err)
}
}
}
t.Cleanup(func() {
stop.Store(true)
})
ctx := context.Background()
wsAddr := strings.Replace(addr, "http", "ws", 1)
testCases := []struct {
name string
namespaces []string
expectedEvents int
}{
{"invalid", []string{"something"}, 1},
{"simple wildcard", []string{"ns*"}, 5},
{"two namespaces", []string{"ns1/ns13", "ns1/other"}, 2},
{"no namespace", []string{""}, 1},
{"all wildcard", []string{"*"}, 5},
{"mixed wildcard", []string{"ns1/ns13*", "ns2"}, 4},
{"overlapping wildcard", []string{"ns*", "ns1"}, 5},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
extra := ""
for _, ns := range testCase.namespaces {
extra += "&namespaces=" + ns
}
url := fmt.Sprintf("%s/v1/sys/events/subscribe/%s?json=true%v", wsAddr, eventType, extra)
conn, _, err := websocket.Dial(ctx, url, &websocket.DialOptions{
HTTPHeader: http.Header{"x-vault-token": []string{token}},
})
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
conn.Close(websocket.StatusNormalClosure, "")
})
sendEvents()
ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
defer cancel()
gotEvents := 0
for {
_, msg, err := conn.Read(ctx)
if err != nil {
break
}
event := map[string]interface{}{}
err = json.Unmarshal(msg, &event)
if err != nil {
t.Fatal(err)
}
t.Log(string(msg))
gotEvents += 1
}
assert.Equal(t, testCase.expectedEvents, gotEvents)
})
}
}
func TestNamespacePrepend(t *testing.T) {
testCases := []struct {
requestNs string
patterns []string
result []string
}{
{"", []string{"ns*"}, []string{"", "ns*"}},
{"ns1", []string{"ns*"}, []string{"ns1", "ns1/ns*"}},
{"ns1", []string{"ns1*"}, []string{"ns1", "ns1/ns1*"}},
{"ns1", []string{"ns1/*"}, []string{"ns1", "ns1/ns1/*"}},
{"", []string{"ns1/ns13", "ns1/other"}, []string{"", "ns1/ns13", "ns1/other"}},
{"ns1", []string{"ns1/ns13", "ns1/other"}, []string{"ns1", "ns1/ns1/ns13", "ns1/ns1/other"}},
{"", []string{""}, []string{""}},
{"", nil, []string{""}},
{"ns1", []string{""}, []string{"ns1"}},
{"ns1", []string{"", ""}, []string{"ns1"}},
{"ns1", []string{"ns1"}, []string{"ns1", "ns1/ns1"}},
{"", []string{"*"}, []string{"", "*"}},
{"ns1", []string{"*"}, []string{"ns1", "ns1/*"}},
{"", []string{"ns1/ns13*", "ns2"}, []string{"", "ns1/ns13*", "ns2"}},
{"ns1", []string{"ns1/ns13*", "ns2"}, []string{"ns1", "ns1/ns1/ns13*", "ns1/ns2"}},
{"", []string{"ns*", "ns1"}, []string{"", "ns*", "ns1"}},
{"ns1", []string{"ns*", "ns1"}, []string{"ns1", "ns1/ns*", "ns1/ns1"}},
{"ns1", []string{"ns1*", "ns1"}, []string{"ns1", "ns1/ns1*", "ns1/ns1"}},
{"ns1", []string{"ns1/*", "ns1"}, []string{"ns1", "ns1/ns1/*", "ns1/ns1"}},
}
for _, testCase := range testCases {
t.Run(testCase.requestNs+" "+strings.Join(testCase.patterns, " "), func(t *testing.T) {
result := prependNamespacePatterns(testCase.patterns, &namespace.Namespace{ID: testCase.requestNs, Path: testCase.requestNs})
assert.Equal(t, testCase.result, result)
})
}
}
func checkRequiredCloudEventsFields(t *testing.T, event map[string]interface{}) {
t.Helper()
for _, attr := range []string{"id", "source", "specversion", "type"} {

View File

@@ -202,6 +202,10 @@ func NewEventBus(logger hclog.Logger) (*EventBus, error) {
}
func (bus *EventBus) Subscribe(ctx context.Context, ns *namespace.Namespace, pattern string) (<-chan *eventlogger.Event, context.CancelFunc, error) {
return bus.SubscribeMultipleNamespaces(ctx, []string{ns.Path}, pattern)
}
func (bus *EventBus) SubscribeMultipleNamespaces(ctx context.Context, namespacePathPatterns []string, pattern string) (<-chan *eventlogger.Event, context.CancelFunc, error) {
// subscriptions are still stored even if the bus has not been started
pipelineID, err := uuid.GenerateUUID()
if err != nil {
@@ -213,7 +217,7 @@ func (bus *EventBus) Subscribe(ctx context.Context, ns *namespace.Namespace, pat
return nil, nil, err
}
filterNode := newFilterNode(ns, pattern)
filterNode := newFilterNode(namespacePathPatterns, pattern)
err = bus.broker.RegisterNode(eventlogger.NodeID(filterNodeID), filterNode)
if err != nil {
return nil, nil, err
@@ -259,15 +263,23 @@ func (bus *EventBus) SetSendTimeout(timeout time.Duration) {
bus.timeout = timeout
}
func newFilterNode(ns *namespace.Namespace, pattern string) *eventlogger.Filter {
func newFilterNode(namespacePatterns []string, pattern string) *eventlogger.Filter {
return &eventlogger.Filter{
Predicate: func(e *eventlogger.Event) (bool, error) {
eventRecv := e.Payload.(*logical.EventReceived)
// Drop if event is not in our namespace.
// TODO: add wildcard/child namespace processing here in some cases?
if eventRecv.Namespace != ns.Path {
return false, nil
eventNs := strings.Trim(eventRecv.Namespace, "/")
// Drop if event is not in namespace patterns namespace.
if len(namespacePatterns) > 0 {
allow := false
for _, nsPattern := range namespacePatterns {
if glob.Glob(nsPattern, eventNs) {
allow = true
break
}
}
if !allow {
return false, nil
}
}
// Filter for correct event type, including wildcards.