events: WS protobuf messages should be binary (#19232) (#19256)

The [WebSockets spec](https://www.rfc-editor.org/rfc/rfc6455) states
that text messages must be valid UTF-8 encoded strings, which protobuf
messages virtually never are. This now correctly sends the protobuf events
as binary messages.

We change the format to correspond to CloudEvents, as originally intended,
and remove a redundant timestamp and newline.

We also bump the eventlogger to fix a race condition that this code triggers.

Co-authored-by: Christopher Swenson <christopher.swenson@hashicorp.com>
This commit is contained in:
hc-github-team-secure-vault-core
2023-02-17 15:14:48 -05:00
committed by GitHub
parent 1ec3397899
commit c9eb3c7251
10 changed files with 119 additions and 120 deletions

2
go.mod
View File

@@ -66,7 +66,7 @@ require (
github.com/hashicorp/consul-template v0.29.5
github.com/hashicorp/consul/api v1.17.0
github.com/hashicorp/errwrap v1.1.0
github.com/hashicorp/eventlogger v0.1.0
github.com/hashicorp/eventlogger v0.1.1
github.com/hashicorp/go-cleanhttp v0.5.2
github.com/hashicorp/go-discover v0.0.0-20210818145131-c573d69da192
github.com/hashicorp/go-gcp-common v0.8.0

4
go.sum
View File

@@ -969,8 +969,8 @@ github.com/hashicorp/errwrap v0.0.0-20141028054710-7554cd9344ce/go.mod h1:YH+1FK
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/eventlogger v0.1.0 h1:S6xc4gZVzewuDUP4R4Ngko419h/CGDuV/b4ADL3XLik=
github.com/hashicorp/eventlogger v0.1.0/go.mod h1:a3IXf1aEJfpCPzseTOrwKj4fVW/Qn3oEmpQeaIznzH0=
github.com/hashicorp/eventlogger v0.1.1 h1:zyCjxsy7KunFsMPZKU5PnwWEakSrp1zjj2vPFmrDaeo=
github.com/hashicorp/eventlogger v0.1.1/go.mod h1://CHt6/j+Q2lc0NlUB5af4aS2M0c0aVBg9/JfcpAyhM=
github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=

View File

@@ -15,7 +15,6 @@ import (
"github.com/hashicorp/vault/sdk/logical"
"github.com/hashicorp/vault/vault"
"github.com/hashicorp/vault/vault/eventbus"
"google.golang.org/protobuf/encoding/protojson"
"nhooyr.io/websocket"
)
@@ -47,19 +46,26 @@ func handleEventsSubscribeWebsocket(args eventSubscribeArgs) (websocket.StatusCo
logger.Info("Websocket context is done, closing the connection")
return websocket.StatusNormalClosure, "", nil
case message := <-ch:
logger.Debug("Sending message to websocket", "message", message)
logger.Debug("Sending message to websocket", "message", message.Payload)
var messageBytes []byte
var messageType websocket.MessageType
if args.json {
messageBytes, err = protojson.Marshal(message)
var ok bool
messageBytes, ok = message.Format("cloudevents-json")
if !ok {
logger.Warn("Could not get cloudevents JSON format")
return 0, "", errors.New("could not get cloudevents JSON format")
}
messageType = websocket.MessageText
} else {
messageBytes, err = proto.Marshal(message)
messageBytes, err = proto.Marshal(message.Payload.(*logical.EventReceived))
messageType = websocket.MessageBinary
}
if err != nil {
logger.Warn("Could not serialize websocket event", "error", err)
return 0, "", err
}
messageString := string(messageBytes) + "\n"
err = args.conn.Write(ctx, websocket.MessageText, []byte(messageString))
err = args.conn.Write(ctx, messageType, messageBytes)
if err != nil {
return 0, "", err
}

View File

@@ -2,6 +2,7 @@ package http
import (
"context"
"fmt"
"net/http"
"strings"
"sync/atomic"
@@ -64,27 +65,39 @@ func TestEventsSubscribe(t *testing.T) {
wsAddr := strings.Replace(addr, "http", "ws", 1)
// check that the connection fails if we don't have a token
_, _, err := websocket.Dial(ctx, wsAddr+"/v1/sys/events/subscribe/"+eventType+"?json=true", nil)
if err == nil {
t.Error("Expected websocket error but got none")
} else if !strings.HasSuffix(err.Error(), "401") {
t.Errorf("Expected 401 websocket but got %v", err)
}
testCases := []struct {
json bool
}{{true}, {false}}
conn, _, err := websocket.Dial(ctx, wsAddr+"/v1/sys/events/subscribe/"+eventType+"?json=true", &websocket.DialOptions{
HTTPHeader: http.Header{"x-vault-token": []string{token}},
})
if err != nil {
t.Fatal(err)
}
for _, testCase := range testCases {
url := fmt.Sprintf("%s/v1/sys/events/subscribe/%s?json=%v", wsAddr, eventType, testCase.json)
// check that the connection fails if we don't have a token
_, _, err := websocket.Dial(ctx, url, nil)
if err == nil {
t.Error("Expected websocket error but got none")
} else if !strings.HasSuffix(err.Error(), "401") {
t.Errorf("Expected 401 websocket but got %v", err)
}
_, msg, err := conn.Read(ctx)
if err != nil {
t.Fatal(err)
}
msgJson := strings.TrimSpace(string(msg))
if !strings.HasPrefix(msgJson, "{") || !strings.HasSuffix(msgJson, "}") {
t.Errorf("Expected to get JSON event but got: %v", msgJson)
conn, _, err := websocket.Dial(ctx, url, &websocket.DialOptions{
HTTPHeader: http.Header{"x-vault-token": []string{token}},
})
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
conn.Close(websocket.StatusNormalClosure, "")
})
_, msg, err := conn.Read(ctx)
if err != nil {
t.Fatal(err)
}
if testCase.json {
msgJson := strings.TrimSpace(string(msg))
if !strings.HasPrefix(msgJson, "{") || !strings.HasSuffix(msgJson, "}") {
t.Errorf("Expected to get JSON event but got: %v", msgJson)
}
}
}
}

View File

@@ -10,7 +10,6 @@ import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
structpb "google.golang.org/protobuf/types/known/structpb"
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
reflect "reflect"
sync "sync"
)
@@ -207,10 +206,9 @@ type EventReceived struct {
Event *EventData `protobuf:"bytes,1,opt,name=event,proto3" json:"event,omitempty"`
// namespace path
Namespace string `protobuf:"bytes,2,opt,name=namespace,proto3" json:"namespace,omitempty"`
EventType string `protobuf:"bytes,3,opt,name=event_type,json=eventType,proto3" json:"event_type,omitempty"`
PluginInfo *EventPluginInfo `protobuf:"bytes,4,opt,name=plugin_info,json=pluginInfo,proto3" json:"plugin_info,omitempty"`
Timestamp *timestamppb.Timestamp `protobuf:"bytes,5,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
Namespace string `protobuf:"bytes,2,opt,name=namespace,proto3" json:"namespace,omitempty"`
EventType string `protobuf:"bytes,3,opt,name=event_type,json=eventType,proto3" json:"event_type,omitempty"`
PluginInfo *EventPluginInfo `protobuf:"bytes,4,opt,name=plugin_info,json=pluginInfo,proto3" json:"plugin_info,omitempty"`
}
func (x *EventReceived) Reset() {
@@ -273,13 +271,6 @@ func (x *EventReceived) GetPluginInfo() *EventPluginInfo {
return nil
}
func (x *EventReceived) GetTimestamp() *timestamppb.Timestamp {
if x != nil {
return x.Timestamp
}
return nil
}
var File_sdk_logical_event_proto protoreflect.FileDescriptor
var file_sdk_logical_event_proto_rawDesc = []byte{
@@ -287,48 +278,42 @@ var file_sdk_logical_event_proto_rawDesc = []byte{
0x65, 0x6e, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x07, 0x6c, 0x6f, 0x67, 0x69, 0x63,
0x61, 0x6c, 0x1a, 0x1c, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x62, 0x75, 0x66, 0x2f, 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75,
0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x22, 0xd1, 0x01, 0x0a, 0x0f, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x50, 0x6c, 0x75, 0x67, 0x69,
0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x1f, 0x0a, 0x0b, 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x5f, 0x63,
0x6c, 0x61, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x6d, 0x6f, 0x75, 0x6e,
0x74, 0x43, 0x6c, 0x61, 0x73, 0x73, 0x12, 0x25, 0x0a, 0x0e, 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x5f,
0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d,
0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x41, 0x63, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x12, 0x1d, 0x0a,
0x0a, 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, 0x03, 0x20, 0x01, 0x28,
0x09, 0x52, 0x09, 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x50, 0x61, 0x74, 0x68, 0x12, 0x16, 0x0a, 0x06,
0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x70, 0x6c,
0x75, 0x67, 0x69, 0x6e, 0x12, 0x25, 0x0a, 0x0e, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x5f, 0x76,
0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x70, 0x6c,
0x75, 0x67, 0x69, 0x6e, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x18, 0x0a, 0x07, 0x76,
0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x76, 0x65,
0x72, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x83, 0x01, 0x0a, 0x09, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x44,
0x61, 0x74, 0x61, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
0x02, 0x69, 0x64, 0x12, 0x33, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18,
0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x53, 0x74, 0x72, 0x75, 0x63, 0x74, 0x52, 0x08,
0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x1d, 0x0a, 0x0a, 0x65, 0x6e, 0x74, 0x69,
0x74, 0x79, 0x5f, 0x69, 0x64, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x09, 0x52, 0x09, 0x65, 0x6e,
0x74, 0x69, 0x74, 0x79, 0x49, 0x64, 0x73, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x6f, 0x74, 0x65, 0x18,
0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x6f, 0x74, 0x65, 0x22, 0xeb, 0x01, 0x0a, 0x0d,
0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x64, 0x12, 0x28, 0x0a,
0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x6c,
0x6f, 0x67, 0x69, 0x63, 0x61, 0x6c, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x44, 0x61, 0x74, 0x61,
0x52, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73,
0x70, 0x61, 0x63, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x6e, 0x61, 0x6d, 0x65,
0x73, 0x70, 0x61, 0x63, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x74,
0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x65, 0x76, 0x65, 0x6e, 0x74,
0x54, 0x79, 0x70, 0x65, 0x12, 0x39, 0x0a, 0x0b, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x5f, 0x69,
0x6e, 0x66, 0x6f, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x6c, 0x6f, 0x67, 0x69,
0x63, 0x61, 0x6c, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x50, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x49,
0x6e, 0x66, 0x6f, 0x52, 0x0a, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x12,
0x38, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x05, 0x20, 0x01,
0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09,
0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x42, 0x28, 0x5a, 0x26, 0x67, 0x69, 0x74,
0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72,
0x70, 0x2f, 0x76, 0x61, 0x75, 0x6c, 0x74, 0x2f, 0x73, 0x64, 0x6b, 0x2f, 0x6c, 0x6f, 0x67, 0x69,
0x63, 0x61, 0x6c, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x22, 0xd1, 0x01, 0x0a, 0x0f, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x50, 0x6c, 0x75, 0x67, 0x69, 0x6e,
0x49, 0x6e, 0x66, 0x6f, 0x12, 0x1f, 0x0a, 0x0b, 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x5f, 0x63, 0x6c,
0x61, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x6d, 0x6f, 0x75, 0x6e, 0x74,
0x43, 0x6c, 0x61, 0x73, 0x73, 0x12, 0x25, 0x0a, 0x0e, 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x5f, 0x61,
0x63, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x6d,
0x6f, 0x75, 0x6e, 0x74, 0x41, 0x63, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x12, 0x1d, 0x0a, 0x0a,
0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09,
0x52, 0x09, 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x50, 0x61, 0x74, 0x68, 0x12, 0x16, 0x0a, 0x06, 0x70,
0x6c, 0x75, 0x67, 0x69, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x70, 0x6c, 0x75,
0x67, 0x69, 0x6e, 0x12, 0x25, 0x0a, 0x0e, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x5f, 0x76, 0x65,
0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x70, 0x6c, 0x75,
0x67, 0x69, 0x6e, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65,
0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x76, 0x65, 0x72,
0x73, 0x69, 0x6f, 0x6e, 0x22, 0x83, 0x01, 0x0a, 0x09, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x44, 0x61,
0x74, 0x61, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02,
0x69, 0x64, 0x12, 0x33, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02,
0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x53, 0x74, 0x72, 0x75, 0x63, 0x74, 0x52, 0x08, 0x6d,
0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x1d, 0x0a, 0x0a, 0x65, 0x6e, 0x74, 0x69, 0x74,
0x79, 0x5f, 0x69, 0x64, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x09, 0x52, 0x09, 0x65, 0x6e, 0x74,
0x69, 0x74, 0x79, 0x49, 0x64, 0x73, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x6f, 0x74, 0x65, 0x18, 0x04,
0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x6f, 0x74, 0x65, 0x22, 0xb1, 0x01, 0x0a, 0x0d, 0x45,
0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x64, 0x12, 0x28, 0x0a, 0x05,
0x65, 0x76, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x6c, 0x6f,
0x67, 0x69, 0x63, 0x61, 0x6c, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x44, 0x61, 0x74, 0x61, 0x52,
0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70,
0x61, 0x63, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73,
0x70, 0x61, 0x63, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x79,
0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x54,
0x79, 0x70, 0x65, 0x12, 0x39, 0x0a, 0x0b, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x5f, 0x69, 0x6e,
0x66, 0x6f, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x6c, 0x6f, 0x67, 0x69, 0x63,
0x61, 0x6c, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x50, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x49, 0x6e,
0x66, 0x6f, 0x52, 0x0a, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x42, 0x28,
0x5a, 0x26, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x61, 0x73,
0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2f, 0x76, 0x61, 0x75, 0x6c, 0x74, 0x2f, 0x73, 0x64, 0x6b,
0x2f, 0x6c, 0x6f, 0x67, 0x69, 0x63, 0x61, 0x6c, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@@ -345,22 +330,20 @@ func file_sdk_logical_event_proto_rawDescGZIP() []byte {
var file_sdk_logical_event_proto_msgTypes = make([]protoimpl.MessageInfo, 3)
var file_sdk_logical_event_proto_goTypes = []interface{}{
(*EventPluginInfo)(nil), // 0: logical.EventPluginInfo
(*EventData)(nil), // 1: logical.EventData
(*EventReceived)(nil), // 2: logical.EventReceived
(*structpb.Struct)(nil), // 3: google.protobuf.Struct
(*timestamppb.Timestamp)(nil), // 4: google.protobuf.Timestamp
(*EventPluginInfo)(nil), // 0: logical.EventPluginInfo
(*EventData)(nil), // 1: logical.EventData
(*EventReceived)(nil), // 2: logical.EventReceived
(*structpb.Struct)(nil), // 3: google.protobuf.Struct
}
var file_sdk_logical_event_proto_depIdxs = []int32{
3, // 0: logical.EventData.metadata:type_name -> google.protobuf.Struct
1, // 1: logical.EventReceived.event:type_name -> logical.EventData
0, // 2: logical.EventReceived.plugin_info:type_name -> logical.EventPluginInfo
4, // 3: logical.EventReceived.timestamp:type_name -> google.protobuf.Timestamp
4, // [4:4] is the sub-list for method output_type
4, // [4:4] is the sub-list for method input_type
4, // [4:4] is the sub-list for extension type_name
4, // [4:4] is the sub-list for extension extendee
0, // [0:4] is the sub-list for field type_name
3, // [3:3] is the sub-list for method output_type
3, // [3:3] is the sub-list for method input_type
3, // [3:3] is the sub-list for extension type_name
3, // [3:3] is the sub-list for extension extendee
0, // [0:3] is the sub-list for field type_name
}
func init() { file_sdk_logical_event_proto_init() }

View File

@@ -5,7 +5,6 @@ option go_package = "github.com/hashicorp/vault/sdk/logical";
package logical;
import "google/protobuf/struct.proto";
import "google/protobuf/timestamp.proto";
// EventPluginInfo contains data related to the plugin that generated an event.
message EventPluginInfo {
@@ -49,5 +48,4 @@ message EventReceived {
string namespace = 2;
string event_type = 3;
EventPluginInfo plugin_info = 4;
google.protobuf.Timestamp timestamp = 5;
}

View File

@@ -7,8 +7,8 @@ import (
)
// ID is an alias to GetId() for CloudEvents compatibility.
func (x *EventData) ID() string {
return x.GetId()
func (x *EventReceived) ID() string {
return x.Event.GetId()
}
// NewEvent returns an event with a new, random EID.

View File

@@ -17,7 +17,6 @@ import (
"github.com/hashicorp/vault/helper/namespace"
"github.com/hashicorp/vault/sdk/logical"
"github.com/ryanuber/go-glob"
"google.golang.org/protobuf/types/known/timestamppb"
)
const (
@@ -53,7 +52,7 @@ type pluginEventBus struct {
type asyncChanNode struct {
// TODO: add bounded deque buffer of *EventReceived
ctx context.Context
ch chan *logical.EventReceived
ch chan *eventlogger.Event
logger hclog.Logger
// used to close the connection
@@ -95,7 +94,6 @@ func (bus *EventBus) SendInternal(ctx context.Context, ns *namespace.Namespace,
Namespace: ns.Path,
EventType: string(eventType),
PluginInfo: pluginInfo,
Timestamp: timestamppb.New(time.Now()),
}
bus.logger.Info("Sending event", "event", eventReceived)
@@ -169,7 +167,7 @@ func NewEventBus(logger hclog.Logger) (*EventBus, error) {
}, nil
}
func (bus *EventBus) Subscribe(ctx context.Context, ns *namespace.Namespace, pattern string) (<-chan *logical.EventReceived, context.CancelFunc, error) {
func (bus *EventBus) Subscribe(ctx context.Context, ns *namespace.Namespace, pattern string) (<-chan *eventlogger.Event, context.CancelFunc, error) {
// subscriptions are still stored even if the bus has not been started
pipelineID, err := uuid.GenerateUUID()
if err != nil {
@@ -193,7 +191,7 @@ func (bus *EventBus) Subscribe(ctx context.Context, ns *namespace.Namespace, pat
}
ctx, cancel := context.WithCancel(ctx)
asyncNode := newAsyncNode(ctx, ns, bus.logger)
asyncNode := newAsyncNode(ctx, bus.logger)
err = bus.broker.RegisterNode(eventlogger.NodeID(sinkNodeID), asyncNode)
if err != nil {
defer cancel()
@@ -247,10 +245,10 @@ func newFilterNode(ns *namespace.Namespace, pattern string) *eventlogger.Filter
}
}
func newAsyncNode(ctx context.Context, namespace *namespace.Namespace, logger hclog.Logger) *asyncChanNode {
func newAsyncNode(ctx context.Context, logger hclog.Logger) *asyncChanNode {
return &asyncChanNode{
ctx: ctx,
ch: make(chan *logical.EventReceived),
ch: make(chan *eventlogger.Event),
logger: logger,
}
}
@@ -272,17 +270,16 @@ func (node *asyncChanNode) Close() {
func (node *asyncChanNode) Process(ctx context.Context, e *eventlogger.Event) (*eventlogger.Event, error) {
// sends to the channel async in another goroutine
go func() {
eventRecv := e.Payload.(*logical.EventReceived)
var timeout bool
select {
case node.ch <- eventRecv:
case node.ch <- e:
case <-ctx.Done():
timeout = errors.Is(ctx.Err(), context.DeadlineExceeded)
case <-node.ctx.Done():
timeout = errors.Is(node.ctx.Err(), context.DeadlineExceeded)
}
if timeout {
node.logger.Info("Subscriber took too long to process event, closing", "ID", eventRecv.Event.ID())
node.logger.Info("Subscriber took too long to process event, closing", "ID", e.Payload.(*logical.EventReceived).Event.Id)
node.Close()
}
}()

View File

@@ -7,6 +7,7 @@ import (
"testing"
"time"
"github.com/hashicorp/eventlogger"
"github.com/hashicorp/go-secure-stdlib/strutil"
"github.com/hashicorp/vault/helper/namespace"
"github.com/hashicorp/vault/sdk/logical"
@@ -58,7 +59,7 @@ func TestBusBasics(t *testing.T) {
timeout := time.After(1 * time.Second)
select {
case message := <-ch:
if message.Event.ID() != event.ID() {
if message.Payload.(*logical.EventReceived).Event.Id != event.Id {
t.Errorf("Got unexpected message: %+v", message)
}
case <-timeout:
@@ -117,7 +118,7 @@ func TestNamespaceFiltering(t *testing.T) {
timeout = time.After(1 * time.Second)
select {
case message := <-ch:
if message.Event.ID() != event.ID() {
if message.Payload.(*logical.EventReceived).Event.Id != event.Id {
t.Errorf("Got unexpected message %+v but was waiting for %+v", message, event)
}
@@ -171,7 +172,7 @@ func TestBus2Subscriptions(t *testing.T) {
timeout := time.After(1 * time.Second)
select {
case message := <-ch1:
if message.Event.ID() != event1.ID() {
if message.Payload.(*logical.EventReceived).Event.Id != event1.Id {
t.Errorf("Got unexpected message: %v", message)
}
case <-timeout:
@@ -179,7 +180,7 @@ func TestBus2Subscriptions(t *testing.T) {
}
select {
case message := <-ch2:
if message.Event.ID() != event2.ID() {
if message.Payload.(*logical.EventReceived).Event.Id != event2.Id {
t.Errorf("Got unexpected message: %v", message)
}
case <-timeout:
@@ -216,7 +217,7 @@ func TestBusSubscriptionsCancel(t *testing.T) {
eventType := logical.EventType("someType")
var channels []<-chan *logical.EventReceived
var channels []<-chan *eventlogger.Event
var cancels []context.CancelFunc
stopped := atomic.Int32{}
@@ -348,7 +349,7 @@ func TestBusWildcardSubscriptions(t *testing.T) {
for i := 0; i < 2; i++ {
select {
case message := <-ch1:
ch1Seen = append(ch1Seen, message.Event.ID())
ch1Seen = append(ch1Seen, message.Payload.(*logical.EventReceived).Event.Id)
case <-timeout:
t.Error("Timeout waiting for event1")
}
@@ -356,17 +357,17 @@ func TestBusWildcardSubscriptions(t *testing.T) {
if len(ch1Seen) != 2 {
t.Errorf("Expected 2 events but got: %v", ch1Seen)
} else {
if !strutil.StrListContains(ch1Seen, event1.ID()) {
t.Errorf("Did not find %s event1 ID in ch1seen", event1.ID())
if !strutil.StrListContains(ch1Seen, event1.Id) {
t.Errorf("Did not find %s event1 ID in ch1seen", event1.Id)
}
if !strutil.StrListContains(ch1Seen, event2.ID()) {
t.Errorf("Did not find %s event2 ID in ch1seen", event2.ID())
if !strutil.StrListContains(ch1Seen, event2.Id) {
t.Errorf("Did not find %s event2 ID in ch1seen", event2.Id)
}
}
// Expect to receive just kv/bar on ch2, which subscribed to */bar
select {
case message := <-ch2:
if message.Event.ID() != event2.ID() {
if message.Payload.(*logical.EventReceived).Event.Id != event2.Id {
t.Errorf("Got unexpected message: %v", message)
}
case <-timeout:

View File

@@ -37,7 +37,8 @@ func TestCanSendEventsFromBuiltinPlugin(t *testing.T) {
// check that the event is routed to the subscription
select {
case received := <-ch:
case receivedEvent := <-ch:
received := receivedEvent.Payload.(*logical.EventReceived)
if event.Id != received.Event.Id {
t.Errorf("Got wrong event: %+v, expected %+v", received, event)
}