gRPC Backend Plugins (#3808)

* Add grpc plugins

* Add grpc plugins

* Translate wrap info to/from proto

* Add nil checks

* Fix nil marshaling errors

* Provide logging through the go-plugin logger

* handle errors in the messages

* Update the TLS config so bidirectional connections work

* Add connectivity checks

* Restart plugin and add timeouts where context is not available

* Add the response wrap data into the grpc system implementation

* Add leaseoptions to pb.Auth

* Add an error translator

* Add tests for translating the proto objects

* Fix rename of function

* Add tracing to plugins for easier debugging

* Handle plugin crashes with the go-plugin context

* Add test for grpcStorage

* Add tests for backend and system

* Bump go-plugin for GRPCBroker

* Remove RegisterLicense

* Add casing translations for new proto messages

* Use doneCtx in grpcClient

* Use doneCtx in grpcClient

* s/shutdown/shut down/
This commit is contained in:
Brian Kassouf
2018-01-18 13:49:20 -08:00
committed by GitHub
parent 94878e558d
commit 03f6108822
35 changed files with 5988 additions and 78 deletions

View File

@@ -95,7 +95,8 @@ proto:
protoc -I physical physical/types.proto --go_out=plugins=grpc:physical
protoc -I helper/identity -I ../../.. helper/identity/types.proto --go_out=plugins=grpc:helper/identity
protoc builtin/logical/database/dbplugin/*.proto --go_out=plugins=grpc:.
sed -i -e 's/Idp/IDP/' -e 's/Url/URL/' -e 's/Id/ID/' -e 's/EntityId/EntityID/' -e 's/Api/API/' -e 's/Qr/QR/' -e 's/protobuf:"/sentinel:"" protobuf:"/' helper/identity/types.pb.go helper/storagepacker/types.pb.go
protoc logical/plugin/pb/*.proto --go_out=plugins=grpc:.
sed -i -e 's/Idp/IDP/' -e 's/Url/URL/' -e 's/Id/ID/' -e 's/EntityId/EntityID/' -e 's/Api/API/' -e 's/Qr/QR/' -e 's/protobuf:"/sentinel:"" protobuf:"/' helper/identity/types.pb.go helper/storagepacker/types.pb.go logical/plugin/pb/backend.pb.go
sed -i -e 's/Iv/IV/' -e 's/Hmac/HMAC/' physical/types.pb.go
fmtcheck:

View File

@@ -7,9 +7,9 @@ import (
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"github.com/golang/protobuf/ptypes"
"github.com/hashicorp/vault/helper/pluginutil"
)
var (
@@ -83,20 +83,12 @@ func (s *gRPCServer) Close(_ context.Context, _ *Empty) (*Empty, error) {
type gRPCClient struct {
client DatabaseClient
clientConn *grpc.ClientConn
doneCtx context.Context
}
func (c gRPCClient) Type() (string, error) {
// If the plugin has already shutdown, this will hang forever so we give it
// a one second timeout.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
switch c.clientConn.GetState() {
case connectivity.Ready, connectivity.Idle:
default:
return "", ErrPluginShutdown
}
resp, err := c.client.Type(ctx, &Empty{})
resp, err := c.client.Type(c.doneCtx, &Empty{})
if err != nil {
return "", err
}
@@ -110,11 +102,10 @@ func (c gRPCClient) CreateUser(ctx context.Context, statements Statements, usern
return "", "", err
}
switch c.clientConn.GetState() {
case connectivity.Ready, connectivity.Idle:
default:
return "", "", ErrPluginShutdown
}
ctx, cancel := context.WithCancel(ctx)
quitCh := pluginutil.CtxCancelIfCanceled(cancel, c.doneCtx)
defer close(quitCh)
defer cancel()
resp, err := c.client.CreateUser(ctx, &CreateUserRequest{
Statements: &statements,
@@ -122,6 +113,10 @@ func (c gRPCClient) CreateUser(ctx context.Context, statements Statements, usern
Expiration: t,
})
if err != nil {
if c.doneCtx.Err() != nil {
return "", "", ErrPluginShutdown
}
return "", "", err
}
@@ -134,33 +129,47 @@ func (c *gRPCClient) RenewUser(ctx context.Context, statements Statements, usern
return err
}
switch c.clientConn.GetState() {
case connectivity.Ready, connectivity.Idle:
default:
return ErrPluginShutdown
}
ctx, cancel := context.WithCancel(ctx)
quitCh := pluginutil.CtxCancelIfCanceled(cancel, c.doneCtx)
defer close(quitCh)
defer cancel()
_, err = c.client.RenewUser(ctx, &RenewUserRequest{
Statements: &statements,
Username: username,
Expiration: t,
})
if err != nil {
if c.doneCtx.Err() != nil {
return ErrPluginShutdown
}
return err
return err
}
return nil
}
func (c *gRPCClient) RevokeUser(ctx context.Context, statements Statements, username string) error {
switch c.clientConn.GetState() {
case connectivity.Ready, connectivity.Idle:
default:
return ErrPluginShutdown
}
ctx, cancel := context.WithCancel(ctx)
quitCh := pluginutil.CtxCancelIfCanceled(cancel, c.doneCtx)
defer close(quitCh)
defer cancel()
_, err := c.client.RevokeUser(ctx, &RevokeUserRequest{
Statements: &statements,
Username: username,
})
return err
if err != nil {
if c.doneCtx.Err() != nil {
return ErrPluginShutdown
}
return err
}
return nil
}
func (c *gRPCClient) Initialize(ctx context.Context, config map[string]interface{}, verifyConnection bool) error {
@@ -169,30 +178,27 @@ func (c *gRPCClient) Initialize(ctx context.Context, config map[string]interface
return err
}
switch c.clientConn.GetState() {
case connectivity.Ready, connectivity.Idle:
default:
return ErrPluginShutdown
}
ctx, cancel := context.WithCancel(ctx)
quitCh := pluginutil.CtxCancelIfCanceled(cancel, c.doneCtx)
defer close(quitCh)
defer cancel()
_, err = c.client.Initialize(ctx, &InitializeRequest{
Config: configRaw,
VerifyConnection: verifyConnection,
})
if err != nil {
if c.doneCtx.Err() != nil {
return ErrPluginShutdown
}
return err
}
func (c *gRPCClient) Close() error {
// If the plugin has already shutdown, this will hang forever so we give it
// a one second timeout.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
switch c.clientConn.GetState() {
case connectivity.Ready, connectivity.Idle:
_, err := c.client.Close(ctx, &Empty{})
return err
}
return nil
}
func (c *gRPCClient) Close() error {
_, err := c.client.Close(c.doneCtx, &Empty{})
return err
}

View File

@@ -103,6 +103,9 @@ var handshakeConfig = plugin.HandshakeConfig{
MagicCookieValue: "926a0820-aea2-be28-51d6-83cdf00e8edb",
}
var _ plugin.Plugin = &DatabasePlugin{}
var _ plugin.GRPCPlugin = &DatabasePlugin{}
// DatabasePlugin implements go-plugin's Plugin interface. It has methods for
// retrieving a server and a client instance of the plugin.
type DatabasePlugin struct {
@@ -117,14 +120,15 @@ func (DatabasePlugin) Client(b *plugin.MuxBroker, c *rpc.Client) (interface{}, e
return &databasePluginRPCClient{client: c}, nil
}
func (d DatabasePlugin) GRPCServer(s *grpc.Server) error {
func (d DatabasePlugin) GRPCServer(_ *plugin.GRPCBroker, s *grpc.Server) error {
RegisterDatabaseServer(s, &gRPCServer{impl: d.impl})
return nil
}
func (DatabasePlugin) GRPCClient(c *grpc.ClientConn) (interface{}, error) {
func (DatabasePlugin) GRPCClient(doneCtx context.Context, _ *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) {
return &gRPCClient{
client: NewDatabaseClient(c),
clientConn: c,
doneCtx: doneCtx,
}, nil
}

View File

@@ -157,7 +157,8 @@ func (b *backend) HandleRequest(ctx context.Context, req *logical.Request) (*log
b.RUnlock()
// Need to compare string value for the case where err comes from plugin RPC
// and is returned as a plugin.BasicError type.
if err != nil && err.Error() == rpc.ErrShutdown.Error() {
if err != nil &&
(err.Error() == rpc.ErrShutdown.Error() || err == bplugin.ErrPluginShutdown) {
// Reload plugin if it's an rpc.ErrShutdown
b.Lock()
if b.canary == canary {
@@ -206,7 +207,8 @@ func (b *backend) HandleExistenceCheck(ctx context.Context, req *logical.Request
checkFound, exists, err := b.Backend.HandleExistenceCheck(ctx, req)
b.RUnlock()
if err != nil && err.Error() == rpc.ErrShutdown.Error() {
if err != nil &&
(err.Error() == rpc.ErrShutdown.Error() || err == bplugin.ErrPluginShutdown) {
// Reload plugin if it's an rpc.ErrShutdown
b.Lock()
if b.canary == canary {

View File

@@ -1,6 +1,7 @@
package pluginutil
import (
"context"
"crypto/sha256"
"crypto/tls"
"flag"
@@ -168,3 +169,18 @@ func (f *APIClientMeta) GetTLSConfig() *api.TLSConfig {
return nil
}
// CtxCancelIfCanceled takes a context cancel func and a context. If the context
// is canceled, the cancel func is called. This is useful for merging two cancel
// functions.
func CtxCancelIfCanceled(f context.CancelFunc, ctxCanceler context.Context) chan struct{} {
quitCh := make(chan struct{})
go func() {
select {
case <-quitCh:
case <-ctxCanceler.Done():
f()
}
}()
return quitCh
}
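A minimal usage sketch of how the gRPC clients in this change consume this helper (rpcCall is a hypothetical stand-in, ErrPluginShutdown comes from the plugin packages below, and imports mirror the surrounding files):

func callGuarded(ctx, doneCtx context.Context, rpcCall func(context.Context) error) error {
	// Derive a cancelable context from the caller's, then tie its cancellation
	// to doneCtx, which go-plugin cancels when the plugin process exits.
	ctx, cancel := context.WithCancel(ctx)
	quitCh := pluginutil.CtxCancelIfCanceled(cancel, doneCtx)
	defer close(quitCh) // stops the watcher goroutine once the call returns
	defer cancel()

	if err := rpcCall(ctx); err != nil {
		if doneCtx.Err() != nil {
			// The plugin went away mid-call; surface a typed shutdown error.
			return ErrPluginShutdown
		}
		return err
	}
	return nil
}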

View File

@@ -97,6 +97,8 @@ func createClientTLSConfig(certBytes []byte, key *ecdsa.PrivateKey) (*tls.Config
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: clientCertPool,
ClientCAs: clientCertPool,
ClientAuth: tls.RequireAndVerifyClientCert,
ServerName: clientCert.Subject.CommonName,
MinVersion: tls.VersionTLS12,
}
@@ -234,6 +236,7 @@ func VaultPluginTLSProvider(apiTLSConfig *api.TLSConfig) func() (*tls.Config, er
// TLS 1.2 minimum
MinVersion: tls.VersionTLS12,
Certificates: []tls.Certificate{cert},
ServerName: serverCert.Subject.CommonName,
}
tlsConfig.BuildNameToCertificate()

View File

@@ -1,16 +1,23 @@
package plugin
import (
"context"
"net/rpc"
"google.golang.org/grpc"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin"
"github.com/hashicorp/vault/helper/logbridge"
"github.com/hashicorp/vault/logical"
"github.com/hashicorp/vault/logical/plugin/pb"
)
// BackendPlugin is the plugin.Plugin implementation
type BackendPlugin struct {
Factory func(*logical.BackendConfig) (logical.Backend, error)
metadataMode bool
Logger hclog.Logger
}
// Server gets called when on plugin.Serve()
@@ -22,3 +29,23 @@ func (b *BackendPlugin) Server(broker *plugin.MuxBroker) (interface{}, error) {
func (b BackendPlugin) Client(broker *plugin.MuxBroker, c *rpc.Client) (interface{}, error) {
return &backendPluginClient{client: c, broker: broker, metadataMode: b.metadataMode}, nil
}
func (b BackendPlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error {
pb.RegisterBackendServer(s, &backendGRPCPluginServer{
broker: broker,
factory: b.Factory,
// We pass the logger down into the backend so go-plugin will forward
// logs for us.
logger: logbridge.NewLogger(b.Logger).LogxiLogger(),
})
return nil
}
func (p *BackendPlugin) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) {
return &backendGRPCPluginClient{
client: pb.NewBackendClient(c),
clientConn: c,
broker: broker,
doneCtx: ctx,
}, nil
}

View File

@@ -0,0 +1,212 @@
package plugin
import (
"context"
"errors"
"google.golang.org/grpc"
"github.com/hashicorp/go-plugin"
"github.com/hashicorp/vault/helper/pluginutil"
"github.com/hashicorp/vault/logical"
"github.com/hashicorp/vault/logical/plugin/pb"
log "github.com/mgutz/logxi/v1"
)
var ErrPluginShutdown = errors.New("plugin is shut down")
// backendGRPCPluginClient implements logical.Backend and is the
// go-plugin client.
type backendGRPCPluginClient struct {
broker *plugin.GRPCBroker
client pb.BackendClient
metadataMode bool
system logical.SystemView
logger log.Logger
// server is the grpc server used for serving storage and sysview requests.
server *grpc.Server
// clientConn is the underlying grpc connection to the server, we store it
// so it can be cleaned up.
clientConn *grpc.ClientConn
doneCtx context.Context
}
func (b *backendGRPCPluginClient) HandleRequest(ctx context.Context, req *logical.Request) (*logical.Response, error) {
if b.metadataMode {
return nil, ErrClientInMetadataMode
}
ctx, cancel := context.WithCancel(ctx)
quitCh := pluginutil.CtxCancelIfCanceled(cancel, b.doneCtx)
defer close(quitCh)
defer cancel()
protoReq, err := pb.LogicalRequestToProtoRequest(req)
if err != nil {
return nil, err
}
reply, err := b.client.HandleRequest(ctx, &pb.HandleRequestArgs{
Request: protoReq,
})
if err != nil {
if b.doneCtx.Err() != nil {
return nil, ErrPluginShutdown
}
return nil, err
}
resp, err := pb.ProtoResponseToLogicalResponse(reply.Response)
if err != nil {
return nil, err
}
if reply.Err != nil {
return resp, pb.ProtoErrToErr(reply.Err)
}
return resp, nil
}
func (b *backendGRPCPluginClient) SpecialPaths() *logical.Paths {
// Use the doneCtx so this call returns promptly if the plugin has shut down.
reply, err := b.client.SpecialPaths(b.doneCtx, &pb.Empty{})
if err != nil {
return nil
}
return &logical.Paths{
Root: reply.Paths.Root,
Unauthenticated: reply.Paths.Unauthenticated,
LocalStorage: reply.Paths.LocalStorage,
SealWrapStorage: reply.Paths.SealWrapStorage,
}
}
// System returns vault's system view. The backend client stores the view during
// Setup, so there is no need to shim the system just to get it back.
func (b *backendGRPCPluginClient) System() logical.SystemView {
return b.system
}
// Logger returns vault's logger. The backend client stores the logger during
// Setup, so there is no need to shim the logger just to get it back.
func (b *backendGRPCPluginClient) Logger() log.Logger {
return b.logger
}
func (b *backendGRPCPluginClient) HandleExistenceCheck(ctx context.Context, req *logical.Request) (bool, bool, error) {
if b.metadataMode {
return false, false, ErrClientInMetadataMode
}
protoReq, err := pb.LogicalRequestToProtoRequest(req)
if err != nil {
return false, false, err
}
ctx, cancel := context.WithCancel(ctx)
quitCh := pluginutil.CtxCancelIfCanceled(cancel, b.doneCtx)
defer close(quitCh)
defer cancel()
reply, err := b.client.HandleExistenceCheck(ctx, &pb.HandleExistenceCheckArgs{
Request: protoReq,
})
if err != nil {
if b.doneCtx.Err() != nil {
return false, false, ErrPluginShutdown
}
return false, false, err
}
if reply.Err != nil {
return false, false, pb.ProtoErrToErr(reply.Err)
}
return reply.CheckFound, reply.Exists, nil
}
func (b *backendGRPCPluginClient) Cleanup() {
b.client.Cleanup(b.doneCtx, &pb.Empty{})
if b.server != nil {
b.server.GracefulStop()
}
b.clientConn.Close()
}
func (b *backendGRPCPluginClient) Initialize() error {
if b.metadataMode {
return ErrClientInMetadataMode
}
_, err := b.client.Initialize(b.doneCtx, &pb.Empty{})
return err
}
func (b *backendGRPCPluginClient) InvalidateKey(key string) {
if b.metadataMode {
return
}
b.client.InvalidateKey(b.doneCtx, &pb.InvalidateKeyArgs{
Key: key,
})
}
func (b *backendGRPCPluginClient) Setup(config *logical.BackendConfig) error {
// Shim logical.Storage
storageImpl := config.StorageView
if b.metadataMode {
storageImpl = &NOOPStorage{}
}
storage := &GRPCStorageServer{
impl: storageImpl,
}
// Shim logical.SystemView
sysViewImpl := config.System
if b.metadataMode {
sysViewImpl = &logical.StaticSystemView{}
}
sysView := &gRPCSystemViewServer{
impl: sysViewImpl,
}
// Register the server in this closure.
serverFunc := func(opts []grpc.ServerOption) *grpc.Server {
s := grpc.NewServer(opts...)
pb.RegisterSystemViewServer(s, sysView)
pb.RegisterStorageServer(s, storage)
b.server = s
return s
}
brokerID := b.broker.NextId()
go b.broker.AcceptAndServe(brokerID, serverFunc)
args := &pb.SetupArgs{
BrokerID: brokerID,
Config: config.Config,
}
reply, err := b.client.Setup(b.doneCtx, args)
if err != nil {
return err
}
if reply.Err != "" {
return errors.New(reply.Err)
}
// Set system and logger for getter methods
b.system = config.System
b.logger = config.Logger
return nil
}
func (b *backendGRPCPluginClient) Type() logical.BackendType {
reply, err := b.client.Type(b.doneCtx, &pb.Empty{})
if err != nil {
return logical.TypeUnknown
}
return logical.BackendType(reply.Type)
}

View File

@@ -0,0 +1,144 @@
package plugin
import (
"context"
plugin "github.com/hashicorp/go-plugin"
"github.com/hashicorp/vault/logical"
"github.com/hashicorp/vault/logical/plugin/pb"
log "github.com/mgutz/logxi/v1"
"google.golang.org/grpc"
)
type backendGRPCPluginServer struct {
broker *plugin.GRPCBroker
backend logical.Backend
factory func(*logical.BackendConfig) (logical.Backend, error)
brokeredClient *grpc.ClientConn
logger log.Logger
}
// Setup dials into the plugin's broker to get a shimmed storage, logger, and
// system view of the backend. This method also instantiates the underlying
// backend through its factory func for the server side of the plugin.
func (b *backendGRPCPluginServer) Setup(ctx context.Context, args *pb.SetupArgs) (*pb.SetupReply, error) {
// Dial for storage
brokeredClient, err := b.broker.Dial(args.BrokerID)
if err != nil {
return &pb.SetupReply{}, err
}
b.brokeredClient = brokeredClient
storage := newGRPCStorageClient(brokeredClient)
sysView := newGRPCSystemView(brokeredClient)
config := &logical.BackendConfig{
StorageView: storage,
Logger: b.logger,
System: sysView,
Config: args.Config,
}
// Call the underlying backend factory after shims have been created
// to set b.backend
backend, err := b.factory(config)
if err != nil {
return &pb.SetupReply{
Err: pb.ErrToString(err),
}, nil
}
b.backend = backend
return &pb.SetupReply{}, nil
}
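Taken together with the client-side Setup above, this performs a small broker handshake. A condensed sketch of the two halves (error handling elided; both functions assume the imports and types of the files in this change):

// Core side: serve the storage and sysview shims over a broker stream and
// return the stream ID, which is sent to the plugin in pb.SetupArgs.BrokerID.
func serveShims(broker *plugin.GRPCBroker, config *logical.BackendConfig) uint32 {
	serverFunc := func(opts []grpc.ServerOption) *grpc.Server {
		s := grpc.NewServer(opts...)
		pb.RegisterStorageServer(s, &GRPCStorageServer{impl: config.StorageView})
		pb.RegisterSystemViewServer(s, &gRPCSystemViewServer{impl: config.System})
		return s
	}
	id := broker.NextId()
	go broker.AcceptAndServe(id, serverFunc)
	return id
}

// Plugin side: dial back over that ID and build the shim clients used above.
func dialShims(broker *plugin.GRPCBroker, brokerID uint32) (logical.Storage, logical.SystemView, error) {
	conn, err := broker.Dial(brokerID)
	if err != nil {
		return nil, nil, err
	}
	return newGRPCStorageClient(conn), newGRPCSystemView(conn), nil
}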
func (b *backendGRPCPluginServer) HandleRequest(ctx context.Context, args *pb.HandleRequestArgs) (*pb.HandleRequestReply, error) {
if inMetadataMode() {
return &pb.HandleRequestReply{}, ErrServerInMetadataMode
}
logicalReq, err := pb.ProtoRequestToLogicalRequest(args.Request)
if err != nil {
return &pb.HandleRequestReply{}, err
}
logicalReq.Storage = newGRPCStorageClient(b.brokeredClient)
resp, respErr := b.backend.HandleRequest(ctx, logicalReq)
pbResp, err := pb.LogicalResponseToProtoResponse(resp)
if err != nil {
return &pb.HandleRequestReply{}, err
}
return &pb.HandleRequestReply{
Response: pbResp,
Err: pb.ErrToProtoErr(respErr),
}, nil
}
func (b *backendGRPCPluginServer) SpecialPaths(ctx context.Context, args *pb.Empty) (*pb.SpecialPathsReply, error) {
paths := b.backend.SpecialPaths()
return &pb.SpecialPathsReply{
Paths: &pb.Paths{
Root: paths.Root,
Unauthenticated: paths.Unauthenticated,
LocalStorage: paths.LocalStorage,
SealWrapStorage: paths.SealWrapStorage,
},
}, nil
}
func (b *backendGRPCPluginServer) HandleExistenceCheck(ctx context.Context, args *pb.HandleExistenceCheckArgs) (*pb.HandleExistenceCheckReply, error) {
if inMetadataMode() {
return &pb.HandleExistenceCheckReply{}, ErrServerInMetadataMode
}
logicalReq, err := pb.ProtoRequestToLogicalRequest(args.Request)
if err != nil {
return &pb.HandleExistenceCheckReply{}, err
}
logicalReq.Storage = newGRPCStorageClient(b.brokeredClient)
checkFound, exists, err := b.backend.HandleExistenceCheck(ctx, logicalReq)
return &pb.HandleExistenceCheckReply{
CheckFound: checkFound,
Exists: exists,
Err: pb.ErrToProtoErr(err),
}, nil
}
func (b *backendGRPCPluginServer) Cleanup(ctx context.Context, _ *pb.Empty) (*pb.Empty, error) {
b.backend.Cleanup()
// Close rpc clients
b.brokeredClient.Close()
return &pb.Empty{}, nil
}
func (b *backendGRPCPluginServer) Initialize(ctx context.Context, _ *pb.Empty) (*pb.Empty, error) {
if inMetadataMode() {
return &pb.Empty{}, ErrServerInMetadataMode
}
err := b.backend.Initialize()
return &pb.Empty{}, err
}
func (b *backendGRPCPluginServer) InvalidateKey(ctx context.Context, args *pb.InvalidateKeyArgs) (*pb.Empty, error) {
if inMetadataMode() {
return &pb.Empty{}, ErrServerInMetadataMode
}
b.backend.InvalidateKey(args.Key)
return &pb.Empty{}, nil
}
func (b *backendGRPCPluginServer) Type(ctx context.Context, _ *pb.Empty) (*pb.TypeReply, error) {
return &pb.TypeReply{
Type: uint32(b.backend.Type()),
}, nil
}

View File

@@ -0,0 +1,186 @@
package plugin
import (
"context"
"os"
"testing"
"time"
hclog "github.com/hashicorp/go-hclog"
gplugin "github.com/hashicorp/go-plugin"
"github.com/hashicorp/vault/helper/logformat"
"github.com/hashicorp/vault/logical"
"github.com/hashicorp/vault/logical/plugin/mock"
log "github.com/mgutz/logxi/v1"
)
func TestGRPCBackendPlugin_impl(t *testing.T) {
var _ gplugin.Plugin = new(BackendPlugin)
var _ logical.Backend = new(backendPluginClient)
}
func TestGRPCBackendPlugin_HandleRequest(t *testing.T) {
b, cleanup := testGRPCBackend(t)
defer cleanup()
resp, err := b.HandleRequest(context.Background(), &logical.Request{
Operation: logical.CreateOperation,
Path: "kv/foo",
Data: map[string]interface{}{
"value": "bar",
},
})
if err != nil {
t.Fatal(err)
}
if resp.Data["value"] != "bar" {
t.Fatalf("bad: %#v", resp)
}
}
func TestGRPCBackendPlugin_SpecialPaths(t *testing.T) {
b, cleanup := testGRPCBackend(t)
defer cleanup()
paths := b.SpecialPaths()
if paths == nil {
t.Fatal("SpecialPaths() returned nil")
}
}
func TestGRPCBackendPlugin_System(t *testing.T) {
b, cleanup := testGRPCBackend(t)
defer cleanup()
sys := b.System()
if sys == nil {
t.Fatal("System() returned nil")
}
actual := sys.DefaultLeaseTTL()
expected := 300 * time.Second
if actual != expected {
t.Fatalf("bad: %v, expected %v", actual, expected)
}
}
func TestGRPCBackendPlugin_Logger(t *testing.T) {
b, cleanup := testGRPCBackend(t)
defer cleanup()
logger := b.Logger()
if logger == nil {
t.Fatal("Logger() returned nil")
}
}
func TestGRPCBackendPlugin_HandleExistenceCheck(t *testing.T) {
b, cleanup := testGRPCBackend(t)
defer cleanup()
checkFound, exists, err := b.HandleExistenceCheck(context.Background(), &logical.Request{
Operation: logical.CreateOperation,
Path: "kv/foo",
Data: map[string]interface{}{"value": "bar"},
})
if err != nil {
t.Fatal(err)
}
if !checkFound {
t.Fatal("existence check not found for path 'kv/foo")
}
if exists {
t.Fatal("existence check should have returned 'false' for 'kv/foo'")
}
}
func TestGRPCBackendPlugin_Cleanup(t *testing.T) {
b, cleanup := testGRPCBackend(t)
defer cleanup()
b.Cleanup()
}
func TestGRPCBackendPlugin_Initialize(t *testing.T) {
b, cleanup := testGRPCBackend(t)
defer cleanup()
err := b.Initialize()
if err != nil {
t.Fatal(err)
}
}
func TestGRPCBackendPlugin_InvalidateKey(t *testing.T) {
b, cleanup := testGRPCBackend(t)
defer cleanup()
resp, err := b.HandleRequest(context.Background(), &logical.Request{
Operation: logical.ReadOperation,
Path: "internal",
})
if err != nil {
t.Fatal(err)
}
if resp.Data["value"] == "" {
t.Fatalf("bad: %#v, expected non-empty value", resp)
}
b.InvalidateKey("internal")
resp, err = b.HandleRequest(context.Background(), &logical.Request{
Operation: logical.ReadOperation,
Path: "internal",
})
if err != nil {
t.Fatal(err)
}
if resp.Data["value"] != "" {
t.Fatalf("bad: expected empty response data, got %#v", resp)
}
}
func TestGRPCBackendPlugin_Setup(t *testing.T) {
_, cleanup := testGRPCBackend(t)
defer cleanup()
}
func testGRPCBackend(t *testing.T) (logical.Backend, func()) {
// Create a mock provider
pluginMap := map[string]gplugin.Plugin{
"backend": &BackendPlugin{
Factory: mock.Factory,
Logger: hclog.New(&hclog.LoggerOptions{
Level: hclog.Trace,
Output: os.Stderr,
JSONFormat: true,
}),
},
}
client, _ := gplugin.TestPluginGRPCConn(t, pluginMap)
cleanup := func() {
client.Close()
}
// Request the backend
raw, err := client.Dispense(BackendPluginName)
if err != nil {
t.Fatal(err)
}
b := raw.(logical.Backend)
err = b.Setup(&logical.BackendConfig{
Logger: logformat.NewVaultLogger(log.LevelTrace),
System: &logical.StaticSystemView{
DefaultLeaseTTLVal: 300 * time.Second,
MaxLeaseTTLVal: 1800 * time.Second,
},
StorageView: &logical.InmemStorage{},
})
if err != nil {
t.Fatal(err)
}
return b, cleanup
}

View File

@@ -0,0 +1,110 @@
package plugin
import (
"context"
"errors"
"google.golang.org/grpc"
"github.com/hashicorp/vault/logical"
"github.com/hashicorp/vault/logical/plugin/pb"
)
func newGRPCStorageClient(conn *grpc.ClientConn) *GRPCStorageClient {
return &GRPCStorageClient{
client: pb.NewStorageClient(conn),
}
}
// GRPCStorageClient is an implementation of logical.Storage that communicates
// over gRPC.
type GRPCStorageClient struct {
client pb.StorageClient
}
func (s *GRPCStorageClient) List(prefix string) ([]string, error) {
reply, err := s.client.List(context.Background(), &pb.StorageListArgs{
Prefix: prefix,
})
if err != nil {
// reply is nil when the RPC itself fails, so don't dereference it.
return nil, err
}
if reply.Err != "" {
return reply.Keys, errors.New(reply.Err)
}
return reply.Keys, nil
}
func (s *GRPCStorageClient) Get(key string) (*logical.StorageEntry, error) {
reply, err := s.client.Get(context.Background(), &pb.StorageGetArgs{
Key: key,
})
if err != nil {
return nil, err
}
if reply.Err != "" {
return nil, errors.New(reply.Err)
}
return pb.ProtoStorageEntryToLogicalStorageEntry(reply.Entry), nil
}
func (s *GRPCStorageClient) Put(entry *logical.StorageEntry) error {
reply, err := s.client.Put(context.Background(), &pb.StoragePutArgs{
Entry: pb.LogicalStorageEntryToProtoStorageEntry(entry),
})
if err != nil {
return err
}
if reply.Err != "" {
return errors.New(reply.Err)
}
return nil
}
func (s *GRPCStorageClient) Delete(key string) error {
reply, err := s.client.Delete(context.Background(), &pb.StorageDeleteArgs{
Key: key,
})
if err != nil {
return err
}
if reply.Err != "" {
return errors.New(reply.Err)
}
return nil
}
// GRPCStorageServer is the gRPC server implementation that serves logical.Storage requests.
type GRPCStorageServer struct {
impl logical.Storage
}
func (s *GRPCStorageServer) List(ctx context.Context, args *pb.StorageListArgs) (*pb.StorageListReply, error) {
keys, err := s.impl.List(args.Prefix)
return &pb.StorageListReply{
Keys: keys,
Err: pb.ErrToString(err),
}, nil
}
func (s *GRPCStorageServer) Get(ctx context.Context, args *pb.StorageGetArgs) (*pb.StorageGetReply, error) {
storageEntry, err := s.impl.Get(args.Key)
return &pb.StorageGetReply{
Entry: pb.LogicalStorageEntryToProtoStorageEntry(storageEntry),
Err: pb.ErrToString(err),
}, nil
}
func (s *GRPCStorageServer) Put(ctx context.Context, args *pb.StoragePutArgs) (*pb.StoragePutReply, error) {
err := s.impl.Put(pb.ProtoStorageEntryToLogicalStorageEntry(args.Entry))
return &pb.StoragePutReply{
Err: pb.ErrToString(err),
}, nil
}
func (s *GRPCStorageServer) Delete(ctx context.Context, args *pb.StorageDeleteArgs) (*pb.StorageDeleteReply, error) {
err := s.impl.Delete(args.Key)
return &pb.StorageDeleteReply{
Err: pb.ErrToString(err),
}, nil
}
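A minimal test-style sketch of pairing the two storage halves over go-plugin's test connection (hypothetical helper; the grpc_storage tests added by this commit are not shown in this excerpt):

func testStorageRoundTrip(t *testing.T) {
	client, _ := plugin.TestGRPCConn(t, func(s *grpc.Server) {
		pb.RegisterStorageServer(s, &GRPCStorageServer{impl: &logical.InmemStorage{}})
	})
	defer client.Close()

	storage := newGRPCStorageClient(client)
	if err := storage.Put(&logical.StorageEntry{Key: "foo", Value: []byte("bar")}); err != nil {
		t.Fatal(err)
	}
	entry, err := storage.Get("foo")
	if err != nil || entry == nil || string(entry.Value) != "bar" {
		t.Fatalf("unexpected entry %#v, err: %v", entry, err)
	}
}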

View File

@@ -0,0 +1,202 @@
package plugin
import (
"context"
"encoding/json"
"errors"
"time"
"google.golang.org/grpc"
"fmt"
"github.com/hashicorp/vault/helper/consts"
"github.com/hashicorp/vault/helper/pluginutil"
"github.com/hashicorp/vault/helper/wrapping"
"github.com/hashicorp/vault/logical"
"github.com/hashicorp/vault/logical/plugin/pb"
)
func newGRPCSystemView(conn *grpc.ClientConn) *gRPCSystemViewClient {
return &gRPCSystemViewClient{
client: pb.NewSystemViewClient(conn),
}
}
type gRPCSystemViewClient struct {
client pb.SystemViewClient
}
func (s *gRPCSystemViewClient) DefaultLeaseTTL() time.Duration {
reply, err := s.client.DefaultLeaseTTL(context.Background(), &pb.Empty{})
if err != nil {
return 0
}
return time.Duration(reply.TTL)
}
func (s *gRPCSystemViewClient) MaxLeaseTTL() time.Duration {
reply, err := s.client.MaxLeaseTTL(context.Background(), &pb.Empty{})
if err != nil {
return 0
}
return time.Duration(reply.TTL)
}
func (s *gRPCSystemViewClient) SudoPrivilege(path string, token string) bool {
reply, err := s.client.SudoPrivilege(context.Background(), &pb.SudoPrivilegeArgs{
Path: path,
Token: token,
})
if err != nil {
return false
}
return reply.Sudo
}
func (s *gRPCSystemViewClient) Tainted() bool {
reply, err := s.client.Tainted(context.Background(), &pb.Empty{})
if err != nil {
return false
}
return reply.Tainted
}
func (s *gRPCSystemViewClient) CachingDisabled() bool {
reply, err := s.client.CachingDisabled(context.Background(), &pb.Empty{})
if err != nil {
return false
}
return reply.Disabled
}
func (s *gRPCSystemViewClient) ReplicationState() consts.ReplicationState {
reply, err := s.client.ReplicationState(context.Background(), &pb.Empty{})
if err != nil {
return consts.ReplicationDisabled
}
return consts.ReplicationState(reply.State)
}
func (s *gRPCSystemViewClient) ResponseWrapData(data map[string]interface{}, ttl time.Duration, jwt bool) (*wrapping.ResponseWrapInfo, error) {
buf, err := json.Marshal(data)
if err != nil {
return nil, err
}
reply, err := s.client.ResponseWrapData(context.Background(), &pb.ResponseWrapDataArgs{
Data: buf,
TTL: int64(ttl),
JWT: false,
})
if err != nil {
return nil, err
}
if reply.Err != "" {
return nil, errors.New(reply.Err)
}
info, err := pb.ProtoResponseWrapInfoToLogicalResponseWrapInfo(reply.WrapInfo)
if err != nil {
return nil, err
}
return info, nil
}
func (s *gRPCSystemViewClient) LookupPlugin(name string) (*pluginutil.PluginRunner, error) {
return nil, fmt.Errorf("cannot call LookupPlugin from a plugin backend")
}
func (s *gRPCSystemViewClient) MlockEnabled() bool {
reply, err := s.client.MlockEnabled(context.Background(), &pb.Empty{})
if err != nil {
return false
}
return reply.Enabled
}
type gRPCSystemViewServer struct {
impl logical.SystemView
}
func (s *gRPCSystemViewServer) DefaultLeaseTTL(ctx context.Context, _ *pb.Empty) (*pb.TTLReply, error) {
ttl := s.impl.DefaultLeaseTTL()
return &pb.TTLReply{
TTL: int64(ttl),
}, nil
}
func (s *gRPCSystemViewServer) MaxLeaseTTL(ctx context.Context, _ *pb.Empty) (*pb.TTLReply, error) {
ttl := s.impl.MaxLeaseTTL()
return &pb.TTLReply{
TTL: int64(ttl),
}, nil
}
func (s *gRPCSystemViewServer) SudoPrivilege(ctx context.Context, args *pb.SudoPrivilegeArgs) (*pb.SudoPrivilegeReply, error) {
sudo := s.impl.SudoPrivilege(args.Path, args.Token)
return &pb.SudoPrivilegeReply{
Sudo: sudo,
}, nil
}
func (s *gRPCSystemViewServer) Tainted(ctx context.Context, _ *pb.Empty) (*pb.TaintedReply, error) {
tainted := s.impl.Tainted()
return &pb.TaintedReply{
Tainted: tainted,
}, nil
}
func (s *gRPCSystemViewServer) CachingDisabled(ctx context.Context, _ *pb.Empty) (*pb.CachingDisabledReply, error) {
cachingDisabled := s.impl.CachingDisabled()
return &pb.CachingDisabledReply{
Disabled: cachingDisabled,
}, nil
}
func (s *gRPCSystemViewServer) ReplicationState(ctx context.Context, _ *pb.Empty) (*pb.ReplicationStateReply, error) {
replicationState := s.impl.ReplicationState()
return &pb.ReplicationStateReply{
State: int32(replicationState),
}, nil
}
func (s *gRPCSystemViewServer) ResponseWrapData(ctx context.Context, args *pb.ResponseWrapDataArgs) (*pb.ResponseWrapDataReply, error) {
data := map[string]interface{}{}
err := json.Unmarshal(args.Data, &data)
if err != nil {
return &pb.ResponseWrapDataReply{}, err
}
// Do not allow JWTs to be returned
info, err := s.impl.ResponseWrapData(data, time.Duration(args.TTL), false)
if err != nil {
return &pb.ResponseWrapDataReply{
Err: pb.ErrToString(err),
}, nil
}
pbInfo, err := pb.LogicalResponseWrapInfoToProtoResponseWrapInfo(info)
if err != nil {
return &pb.ResponseWrapDataReply{}, err
}
return &pb.ResponseWrapDataReply{
WrapInfo: pbInfo,
}, nil
}
func (s *gRPCSystemViewServer) MlockEnabled(ctx context.Context, _ *pb.Empty) (*pb.MlockEnabledReply, error) {
enabled := s.impl.MlockEnabled()
return &pb.MlockEnabledReply{
Enabled: enabled,
}, nil
}

View File

@@ -0,0 +1,163 @@
package plugin
import (
"testing"
"google.golang.org/grpc"
"reflect"
plugin "github.com/hashicorp/go-plugin"
"github.com/hashicorp/vault/helper/consts"
"github.com/hashicorp/vault/logical"
"github.com/hashicorp/vault/logical/plugin/pb"
)
func TestSystem_GRPC_GRPC_impl(t *testing.T) {
var _ logical.SystemView = new(gRPCSystemViewClient)
}
func TestSystem_GRPC_defaultLeaseTTL(t *testing.T) {
sys := logical.TestSystemView()
client, _ := plugin.TestGRPCConn(t, func(s *grpc.Server) {
pb.RegisterSystemViewServer(s, &gRPCSystemViewServer{
impl: sys,
})
})
defer client.Close()
testSystemView := newGRPCSystemView(client)
expected := sys.DefaultLeaseTTL()
actual := testSystemView.DefaultLeaseTTL()
if !reflect.DeepEqual(expected, actual) {
t.Fatalf("expected: %v, got: %v", expected, actual)
}
}
func TestSystem_GRPC_maxLeaseTTL(t *testing.T) {
sys := logical.TestSystemView()
client, _ := plugin.TestGRPCConn(t, func(s *grpc.Server) {
pb.RegisterSystemViewServer(s, &gRPCSystemViewServer{
impl: sys,
})
})
defer client.Close()
testSystemView := newGRPCSystemView(client)
expected := sys.MaxLeaseTTL()
actual := testSystemView.MaxLeaseTTL()
if !reflect.DeepEqual(expected, actual) {
t.Fatalf("expected: %v, got: %v", expected, actual)
}
}
func TestSystem_GRPC_sudoPrivilege(t *testing.T) {
sys := logical.TestSystemView()
sys.SudoPrivilegeVal = true
client, _ := plugin.TestGRPCConn(t, func(s *grpc.Server) {
pb.RegisterSystemViewServer(s, &gRPCSystemViewServer{
impl: sys,
})
})
defer client.Close()
testSystemView := newGRPCSystemView(client)
expected := sys.SudoPrivilege("foo", "bar")
actual := testSystemView.SudoPrivilege("foo", "bar")
if !reflect.DeepEqual(expected, actual) {
t.Fatalf("expected: %v, got: %v", expected, actual)
}
}
func TestSystem_GRPC_tainted(t *testing.T) {
sys := logical.TestSystemView()
sys.TaintedVal = true
client, _ := plugin.TestGRPCConn(t, func(s *grpc.Server) {
pb.RegisterSystemViewServer(s, &gRPCSystemViewServer{
impl: sys,
})
})
defer client.Close()
testSystemView := newGRPCSystemView(client)
expected := sys.Tainted()
actual := testSystemView.Tainted()
if !reflect.DeepEqual(expected, actual) {
t.Fatalf("expected: %v, got: %v", expected, actual)
}
}
func TestSystem_GRPC_cachingDisabled(t *testing.T) {
sys := logical.TestSystemView()
sys.CachingDisabledVal = true
client, _ := plugin.TestGRPCConn(t, func(s *grpc.Server) {
pb.RegisterSystemViewServer(s, &gRPCSystemViewServer{
impl: sys,
})
})
defer client.Close()
testSystemView := newGRPCSystemView(client)
expected := sys.CachingDisabled()
actual := testSystemView.CachingDisabled()
if !reflect.DeepEqual(expected, actual) {
t.Fatalf("expected: %v, got: %v", expected, actual)
}
}
func TestSystem_GRPC_replicationState(t *testing.T) {
sys := logical.TestSystemView()
sys.ReplicationStateVal = consts.ReplicationPerformancePrimary
client, _ := plugin.TestGRPCConn(t, func(s *grpc.Server) {
pb.RegisterSystemViewServer(s, &gRPCSystemViewServer{
impl: sys,
})
})
defer client.Close()
testSystemView := newGRPCSystemView(client)
expected := sys.ReplicationState()
actual := testSystemView.ReplicationState()
if !reflect.DeepEqual(expected, actual) {
t.Fatalf("expected: %v, got: %v", expected, actual)
}
}
func TestSystem_GRPC_responseWrapData(t *testing.T) {
t.SkipNow()
}
func TestSystem_GRPC_lookupPlugin(t *testing.T) {
sys := logical.TestSystemView()
client, _ := plugin.TestGRPCConn(t, func(s *grpc.Server) {
pb.RegisterSystemViewServer(s, &gRPCSystemViewServer{
impl: sys,
})
})
defer client.Close()
testSystemView := newGRPCSystemView(client)
if _, err := testSystemView.LookupPlugin("foo"); err == nil {
t.Fatal("LookPlugin(): expected error on due to unsupported call from plugin")
}
}
func TestSystem_GRPC_mlockEnabled(t *testing.T) {
sys := logical.TestSystemView()
sys.EnableMlock = true
client, _ := plugin.TestGRPCConn(t, func(s *grpc.Server) {
pb.RegisterSystemViewServer(s, &gRPCSystemViewServer{
impl: sys,
})
})
defer client.Close()
testSystemView := newGRPCSystemView(client)
expected := sys.MlockEnabled()
actual := testSystemView.MlockEnabled()
if !reflect.DeepEqual(expected, actual) {
t.Fatalf("expected: %v, got: %v", expected, actual)
}
}

View File

@@ -0,0 +1,99 @@
package plugin
import (
"context"
"time"
"github.com/hashicorp/vault/logical"
log "github.com/mgutz/logxi/v1"
)
// backendTracingMiddleware wraps a logical.Backend and emits trace logs around
// each call for easier plugin debugging.
type backendTracingMiddleware struct {
logger log.Logger
transport string
typeStr string
next logical.Backend
}
func (b *backendTracingMiddleware) HandleRequest(ctx context.Context, req *logical.Request) (resp *logical.Response, err error) {
defer func(then time.Time) {
b.logger.Trace("plugin.HandleRequest", "path", req.Path, "status", "finished", "type", b.typeStr, "transport", b.transport, "err", err, "took", time.Since(then))
}(time.Now())
b.logger.Trace("plugin.HandleRequest", "path", req.Path, "status", "started", "type", b.typeStr, "transport", b.transport)
return b.next.HandleRequest(ctx, req)
}
func (b *backendTracingMiddleware) SpecialPaths() *logical.Paths {
defer func(then time.Time) {
b.logger.Trace("plugin.SpecialPaths", "status", "finished", "type", b.typeStr, "transport", b.transport, "took", time.Since(then))
}(time.Now())
b.logger.Trace("plugin.SpecialPaths", "status", "started", "type", b.typeStr, "transport", b.transport)
return b.next.SpecialPaths()
}
func (b *backendTracingMiddleware) System() logical.SystemView {
return b.next.System()
}
func (b *backendTracingMiddleware) Logger() log.Logger {
return b.next.Logger()
}
func (b *backendTracingMiddleware) HandleExistenceCheck(ctx context.Context, req *logical.Request) (found bool, exists bool, err error) {
defer func(then time.Time) {
b.logger.Trace("plugin.HandleExistenceCheck", "path", req.Path, "status", "finished", "type", b.typeStr, "transport", b.transport, "err", err, "took", time.Since(then))
}(time.Now())
b.logger.Trace("plugin.HandleExistenceCheck", "path", req.Path, "status", "started", "type", b.typeStr, "transport", b.transport)
return b.next.HandleExistenceCheck(ctx, req)
}
func (b *backendTracingMiddleware) Cleanup() {
defer func(then time.Time) {
b.logger.Trace("plugin.Cleanup", "status", "finished", "type", b.typeStr, "transport", b.transport, "took", time.Since(then))
}(time.Now())
b.logger.Trace("plugin.Cleanup", "status", "started", "type", b.typeStr, "transport", b.transport)
b.next.Cleanup()
}
func (b *backendTracingMiddleware) Initialize() (err error) {
defer func(then time.Time) {
b.logger.Trace("plugin.Initialize", "status", "finished", "type", b.typeStr, "transport", b.transport, "err", err, "took", time.Since(then))
}(time.Now())
b.logger.Trace("plugin.Initialize", "status", "started", "type", b.typeStr, "transport", b.transport)
return b.next.Initialize()
}
func (b *backendTracingMiddleware) InvalidateKey(key string) {
defer func(then time.Time) {
b.logger.Trace("plugin.InvalidateKey", "key", key, "status", "finished", "type", b.typeStr, "transport", b.transport, "took", time.Since(then))
}(time.Now())
b.logger.Trace("plugin.InvalidateKey", "key", key, "status", "started", "type", b.typeStr, "transport", b.transport)
b.next.InvalidateKey(key)
}
func (b *backendTracingMiddleware) Setup(config *logical.BackendConfig) (err error) {
defer func(then time.Time) {
b.logger.Trace("plugin.Setup", "status", "finished", "type", b.typeStr, "transport", b.transport, "err", err, "took", time.Since(then))
}(time.Now())
b.logger.Trace("plugin.Setup", "status", "started", "type", b.typeStr, "transport", b.transport)
return b.next.Setup(config)
}
func (b *backendTracingMiddleware) Type() logical.BackendType {
defer func(then time.Time) {
b.logger.Trace("plugin.Type", "status", "finished", "type", b.typeStr, "transport", b.transport, "took", time.Since(then))
}(time.Now())
b.logger.Trace("plugin.Type", "status", "started", "type", b.typeStr, "transport", b.transport)
return b.next.Type()
}
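How this middleware might be wired around a dispensed backend client (a sketch; the actual wiring lives in backend.go, which is not part of this excerpt, and the transport/type strings here are illustrative):

func traceBackend(raw logical.Backend, logger log.Logger) logical.Backend {
	return &backendTracingMiddleware{
		logger:    logger,
		transport: "gRPC", // or "netRPC" for the non-gRPC transport
		typeStr:   "mock", // the plugin type name, included in every trace line
		next:      raw,
	}
}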

View File

@@ -2,10 +2,13 @@ package mock
import (
"context"
"errors"
"net/rpc"
"github.com/hashicorp/vault/helper/errutil"
"github.com/hashicorp/vault/logical"
"github.com/hashicorp/vault/logical/framework"
"github.com/hashicorp/vault/logical/plugin/pb"
)
// pathInternal is used to test viewing internal backend values. In this case,
@@ -24,9 +27,49 @@ func errorPaths(b *backend) []*framework.Path {
logical.ReadOperation: b.pathErrorRPCRead,
},
},
&framework.Path{
Pattern: "errors/type",
Fields: map[string]*framework.FieldSchema{
"err_type": &framework.FieldSchema{Type: framework.TypeInt},
},
Callbacks: map[logical.Operation]framework.OperationFunc{
logical.CreateOperation: b.pathErrorRPCRead,
logical.UpdateOperation: b.pathErrorRPCRead,
},
},
}
}
func (b *backend) pathErrorRPCRead(ctx context.Context, req *logical.Request, data *framework.FieldData) (*logical.Response, error) {
return nil, rpc.ErrShutdown
errTypeRaw, ok := data.GetOk("err_type")
if !ok {
return nil, rpc.ErrShutdown
}
var err error
switch uint32(errTypeRaw.(int)) {
case pb.ErrTypeUnknown:
err = errors.New("test")
case pb.ErrTypeUserError:
err = errutil.UserError{Err: "test"}
case pb.ErrTypeInternalError:
err = errutil.InternalError{Err: "test"}
case pb.ErrTypeCodedError:
err = logical.CodedError(403, "test")
case pb.ErrTypeStatusBadRequest:
err = &logical.StatusBadRequest{Err: "test"}
case pb.ErrTypeUnsupportedOperation:
err = logical.ErrUnsupportedOperation
case pb.ErrTypeUnsupportedPath:
err = logical.ErrUnsupportedPath
case pb.ErrTypeInvalidRequest:
err = logical.ErrInvalidRequest
case pb.ErrTypePermissionDenied:
err = logical.ErrPermissionDenied
case pb.ErrTypeMultiAuthzPending:
err = logical.ErrMultiAuthzPending
}
return nil, err
}

View File

@@ -56,6 +56,7 @@ func (b *backend) pathKVRead(ctx context.Context, req *logical.Request, data *fr
value := string(entry.Value)
b.Logger().Info("reading value", "key", req.Path, "value", value)
// Return the secret
return &logical.Response{
Data: map[string]interface{}{
@@ -67,6 +68,7 @@ func (b *backend) pathKVRead(ctx context.Context, req *logical.Request, data *fr
func (b *backend) pathKVCreateUpdate(ctx context.Context, req *logical.Request, data *framework.FieldData) (*logical.Response, error) {
value := data.Get("value").(string)
b.Logger().Info("storing value", "key", req.Path, "value", value)
entry := &logical.StorageEntry{
Key: req.Path,
Value: []byte(value),

File diff suppressed because it is too large

View File

@@ -0,0 +1,450 @@
syntax = "proto3";
package pb;
import "google/protobuf/timestamp.proto";
message Empty {}
message Header {
repeated string header = 1;
}
message ProtoError {
uint32 err_type = 1;
string err_msg = 2;
int64 err_code = 3;
}
// Paths is the structure of special paths that is used for SpecialPaths.
message Paths {
// Root are the paths that require a root token to access
repeated string root = 1;
// Unauthenticated are the paths that can be accessed without any auth.
repeated string unauthenticated = 2;
// LocalStorage are paths (prefixes) that are local to this instance; this
// indicates that these paths should not be replicated
repeated string local_storage = 3;
// SealWrapStorage are storage paths that, when using a capable seal,
// should be seal wrapped with extra encryption. It is exact matching
// unless it ends with '/' in which case it will be treated as a prefix.
repeated string seal_wrap_storage = 4;
}
message Request {
// Id is the uuid associated with each request
string id = 1;
// If set, the name given to the replication secondary where this request
// originated
string ReplicationCluster = 2;
// Operation is the requested operation type
string operation = 3;
// Path is the part of the request path not consumed by the
// routing. As an example, if the original request path is "prod/aws/foo"
// and the AWS logical backend is mounted at "prod/aws/", then the
// final path is "foo" since the mount prefix is trimmed.
string path = 4;
// Request data is an opaque map that must have string keys.
bytes data = 5;
// Secret will be non-nil only for Revoke and Renew operations
// to represent the secret that was returned prior.
Secret secret = 6;
// Auth will be non-nil only for Renew operations
// to represent the auth that was returned prior.
Auth auth = 7;
// Headers will contain the http headers from the request. This value will
// be used in the audit broker to ensure we are auditing only the allowed
// headers.
map<string, Header> headers = 8;
// ClientToken is provided to the core so that the identity
// can be verified and ACLs applied. This value is passed
// through to the logical backends but after being salted and
// hashed.
string client_token = 9;
// ClientTokenAccessor is provided to the core so that it can get
// logged as part of request audit logging.
string client_token_accessor = 10;
// DisplayName is provided to the logical backend to help associate
// dynamic secrets with the source entity. This is not a sensitive
// name, but is useful for operators.
string display_name = 11;
// MountPoint is provided so that a logical backend can generate
// paths relative to itself. The `Path` is effectively the client
// request path with the MountPoint trimmed off.
string mount_point = 12;
// MountType is provided so that a logical backend can make decisions
// based on the specific mount type (e.g., if a mount type has different
// aliases, generating different defaults depending on the alias)
string mount_type = 13;
// MountAccessor is provided so that identities returned by the authentication
// backends can be tied to the mount it belongs to.
string mount_accessor = 14;
// WrapInfo contains requested response wrapping parameters
RequestWrapInfo wrap_info = 15;
// ClientTokenRemainingUses represents the allowed number of uses left on the
// token supplied
int64 client_token_remaining_uses = 16;
// EntityID is the identity of the caller extracted out of the token used
// to make this request
string entity_id = 17;
// PolicyOverride indicates that the requestor wishes to override
// soft-mandatory Sentinel policies
bool policy_override = 18;
// Whether the request is unauthenticated, as in, had no client token
// attached. Useful in some situations where the client token is not made
// accessible.
bool unauthenticated = 19;
}
message Alias {
// MountType is the backend mount's type to which this identity belongs
string mount_type = 1;
// MountAccessor is the identifier of the mount entry to which this
// identity belongs
string mount_accessor = 2;
// Name is the identifier of this identity in its authentication source
string name = 3;
}
message Auth {
LeaseOptions lease_options = 1;
// InternalData is JSON-encodable data that is stored with the auth struct.
// This will be sent back during a Renew/Revoke for storing internal data
// used for those operations.
bytes internal_data = 2;
// DisplayName is a non-security sensitive identifier that is
// applicable to this Auth. It is used for logging and prefixing
// of dynamic secrets. For example, DisplayName may be "armon" for
// the github credential backend. If the client token is used to
// generate a SQL credential, the user may be "github-armon-uuid".
// This is to help identify the source without using audit tables.
string display_name = 3;
// Policies is the list of policies that the authenticated user
// is associated with.
repeated string policies = 4;
// Metadata is used to attach arbitrary string-type metadata to
// an authenticated user. This metadata will be outputted into the
// audit log.
map<string, string> metadata = 5;
// ClientToken is the token that is generated for the authentication.
// This will be filled in by Vault core when an auth structure is
// returned. Setting this manually will have no effect.
string client_token = 6;
// Accessor is the identifier for the ClientToken. This can be used
// to perform management functionalities (especially revocation) when
// ClientToken in the audit logs are obfuscated. Accessor can be used
// to revoke a ClientToken and to lookup the capabilities of the ClientToken,
// both without actually knowing the ClientToken.
string accessor = 7;
// Period indicates that the token generated using this Auth object
// should never expire. The token should be renewed within the duration
// specified by this period.
int64 period = 8;
// Number of allowed uses of the issued token
int64 num_uses = 9;
// EntityID is the identifier of the entity in the identity store to which the
// identity of the authenticating client belongs.
string entity_id = 10;
// Alias is the information about the authenticated client returned by
// the auth backend
Alias alias = 11;
// GroupAliases are the informational mappings of external groups which an
// authenticated user belongs to. This is used to check if there are
// mapping groups for the group aliases in the identity store. For all the
// matching groups, the entity ID of the user will be added.
repeated Alias group_aliases = 12;
}
message LeaseOptions {
int64 TTL = 1;
bool renewable = 2;
int64 increment = 3;
google.protobuf.Timestamp issue_time = 4;
}
message Secret {
LeaseOptions lease_options = 1;
// InternalData is JSON-encodable data that is stored with the secret.
// This will be sent back during a Renew/Revoke for storing internal data
// used for those operations.
bytes internal_data = 2;
// LeaseID is the ID returned to the user to manage this secret.
// This is generated by Vault core. Any set value will be ignored.
// For requests, this will always be blank.
string lease_id = 3;
}
message Response {
// Secret, if not nil, denotes that this response represents a secret.
Secret secret = 1;
// Auth, if not nil, contains the authentication information for
// this response. This is only checked and means something for
// credential backends.
Auth auth = 2;
// Response data is an opaque map that must have string keys. For
// secrets, this data is sent down to the user as-is. To store internal
// data that you don't want the user to see, store it in
// Secret.InternalData.
bytes data = 3;
// Redirect is an HTTP URL to redirect to for further authentication.
// This is only valid for credential backends. This will be blanked
// for any logical backend and ignored.
string redirect = 4;
// Warnings allow operations or backends to return warnings in response
// to user actions without failing the action outright.
repeated string warnings = 5;
// Information for wrapping the response in a cubbyhole
ResponseWrapInfo wrap_info = 6;
}
message ResponseWrapInfo {
// Setting to non-zero specifies that the response should be wrapped.
// Specifies the desired TTL of the wrapping token.
int64 TTL = 1;
// The token containing the wrapped response
string token = 2;
// The token accessor for the wrapped response token
string accessor = 3;
// The creation time. This can be used with the TTL to figure out an
// expected expiration.
google.protobuf.Timestamp creation_time = 4;
// If the contained response is the output of a token creation call, the
// created token's accessor will be accessible here
string wrapped_accessor = 5;
// WrappedEntityID is the entity identifier of the caller who initiated the
// wrapping request
string wrapped_entity_id = 6;
// The format to use. This doesn't get returned, it's only internal.
string format = 7;
// CreationPath is the original request path that was used to create
// the wrapped response.
string creation_path = 8;
// Controls seal wrapping behavior downstream for specific use cases
bool seal_wrap = 9;
}
message RequestWrapInfo {
// Setting to non-zero specifies that the response should be wrapped.
// Specifies the desired TTL of the wrapping token.
int64 TTL = 1;
// The format to use for the wrapped response; if not specified it's a bare
// token
string format = 2;
// A flag to conforming backends that data for a given request should be
// seal wrapped
bool seal_wrap = 3;
}
// HandleRequestArgs is the args for HandleRequest method.
message HandleRequestArgs {
uint32 storage_id = 1;
Request request = 2;
}
// HandleRequestReply is the reply for HandleRequest method.
message HandleRequestReply {
Response response = 1;
ProtoError err = 2;
}
// SpecialPathsReply is the reply for SpecialPaths method.
message SpecialPathsReply {
Paths paths = 1;
}
// HandleExistenceCheckArgs is the args for HandleExistenceCheck method.
message HandleExistenceCheckArgs {
uint32 storage_id = 1;
Request request = 2;
}
// HandleExistenceCheckReply is the reply for HandleExistenceCheck method.
message HandleExistenceCheckReply {
bool check_found = 1;
bool exists = 2;
ProtoError err = 3;
}
// SetupArgs is the args for Setup method.
message SetupArgs {
uint32 broker_id = 1;
map<string, string> Config = 2;
}
// SetupReply is the reply for Setup method.
message SetupReply {
string err = 1;
}
// TypeReply is the reply for the Type method.
message TypeReply {
uint32 type = 1;
}
message InvalidateKeyArgs {
string key = 1;
}
service Backend {
rpc HandleRequest(HandleRequestArgs) returns (HandleRequestReply);
rpc SpecialPaths(Empty) returns (SpecialPathsReply);
rpc HandleExistenceCheck(HandleExistenceCheckArgs) returns (HandleExistenceCheckReply);
rpc Initialize(Empty) returns (Empty);
rpc Cleanup(Empty) returns (Empty);
rpc InvalidateKey(InvalidateKeyArgs) returns (Empty);
rpc Setup(SetupArgs) returns (SetupReply);
rpc Type(Empty) returns (TypeReply);
}
message StorageEntry {
string key = 1;
bytes value = 2;
bool seal_wrap = 3;
}
message StorageListArgs {
string prefix = 1;
}
message StorageListReply {
repeated string keys = 1;
string err = 2;
}
message StorageGetArgs {
string key = 1;
}
message StorageGetReply {
StorageEntry entry = 1;
string err = 2;
}
message StoragePutArgs {
StorageEntry entry = 1;
}
message StoragePutReply {
string err = 1;
}
message StorageDeleteArgs {
string key = 1;
}
message StorageDeleteReply {
string err = 1;
}
service Storage {
rpc List(StorageListArgs) returns (StorageListReply);
rpc Get(StorageGetArgs) returns (StorageGetReply);
rpc Put(StoragePutArgs) returns (StoragePutReply);
rpc Delete(StorageDeleteArgs) returns (StorageDeleteReply);
}
message TTLReply {
int64 TTL = 1;
}
message SudoPrivilegeArgs {
string path = 1;
string token = 2;
}
message SudoPrivilegeReply {
bool sudo = 1;
}
message TaintedReply {
bool tainted = 1;
}
message CachingDisabledReply {
bool disabled = 1;
}
message ReplicationStateReply {
int32 state = 1;
}
message ResponseWrapDataArgs {
bytes data = 1;
int64 TTL = 2;
bool JWT = 3;
}
message ResponseWrapDataReply {
ResponseWrapInfo wrap_info = 1;
string err = 2;
}
message MlockEnabledReply {
bool enabled = 1;
}
service SystemView {
rpc DefaultLeaseTTL(Empty) returns (TTLReply);
rpc MaxLeaseTTL(Empty) returns (TTLReply);
rpc SudoPrivilege(SudoPrivilegeArgs) returns (SudoPrivilegeReply);
rpc Tainted(Empty) returns (TaintedReply);
rpc CachingDisabled(Empty) returns (CachingDisabledReply);
rpc ReplicationState(Empty) returns (ReplicationStateReply);
rpc ResponseWrapData(ResponseWrapDataArgs) returns (ResponseWrapDataReply);
rpc MlockEnabled(Empty) returns (MlockEnabledReply);
}

View File

@@ -0,0 +1,539 @@
package pb
import (
"encoding/json"
"errors"
"time"
"github.com/golang/protobuf/ptypes"
"github.com/hashicorp/vault/helper/errutil"
"github.com/hashicorp/vault/helper/wrapping"
"github.com/hashicorp/vault/logical"
)
const (
ErrTypeUnknown uint32 = iota
ErrTypeUserError
ErrTypeInternalError
ErrTypeCodedError
ErrTypeStatusBadRequest
ErrTypeUnsupportedOperation
ErrTypeUnsupportedPath
ErrTypeInvalidRequest
ErrTypePermissionDenied
ErrTypeMultiAuthzPending
)
func ProtoErrToErr(e *ProtoError) error {
if e == nil {
return nil
}
var err error
switch e.ErrType {
case ErrTypeUnknown:
err = errors.New(e.ErrMsg)
case ErrTypeUserError:
err = errutil.UserError{Err: e.ErrMsg}
case ErrTypeInternalError:
err = errutil.InternalError{Err: e.ErrMsg}
case ErrTypeCodedError:
err = logical.CodedError(int(e.ErrCode), e.ErrMsg)
case ErrTypeStatusBadRequest:
err = &logical.StatusBadRequest{Err: e.ErrMsg}
case ErrTypeUnsupportedOperation:
err = logical.ErrUnsupportedOperation
case ErrTypeUnsupportedPath:
err = logical.ErrUnsupportedPath
case ErrTypeInvalidRequest:
err = logical.ErrInvalidRequest
case ErrTypePermissionDenied:
err = logical.ErrPermissionDenied
case ErrTypeMultiAuthzPending:
err = logical.ErrMultiAuthzPending
}
return err
}
func ErrToProtoErr(e error) *ProtoError {
if e == nil {
return nil
}
pbErr := &ProtoError{
ErrMsg: e.Error(),
ErrType: ErrTypeUnknown,
}
switch e.(type) {
case errutil.UserError:
pbErr.ErrType = ErrTypeUserError
case errutil.InternalError:
pbErr.ErrType = ErrTypeInternalError
case logical.HTTPCodedError:
pbErr.ErrType = ErrTypeCodedError
pbErr.ErrCode = int64(e.(logical.HTTPCodedError).Code())
case *logical.StatusBadRequest:
pbErr.ErrType = ErrTypeStatusBadRequest
}
switch {
case e == logical.ErrUnsupportedOperation:
pbErr.ErrType = ErrTypeUnsupportedOperation
case e == logical.ErrUnsupportedPath:
pbErr.ErrType = ErrTypeUnsupportedPath
case e == logical.ErrInvalidRequest:
pbErr.ErrType = ErrTypeInvalidRequest
case e == logical.ErrPermissionDenied:
pbErr.ErrType = ErrTypePermissionDenied
case e == logical.ErrMultiAuthzPending:
pbErr.ErrType = ErrTypeMultiAuthzPending
}
return pbErr
}
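A small round-trip sketch of how these two translators are meant to compose across the plugin boundary (assumed usage, not code from this change):

func exampleErrRoundTrip() error {
	// Plugin side: encode a typed backend error into a ProtoError for the reply.
	pbErr := ErrToProtoErr(logical.CodedError(403, "permission denied on path"))
	// Core side: rehydrate it; the concrete error type and code survive the trip.
	return ProtoErrToErr(pbErr) // a logical.HTTPCodedError with Code() == 403
}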
func ErrToString(e error) string {
if e == nil {
return ""
}
return e.Error()
}
func LogicalStorageEntryToProtoStorageEntry(e *logical.StorageEntry) *StorageEntry {
if e == nil {
return nil
}
return &StorageEntry{
Key: e.Key,
Value: e.Value,
SealWrap: e.SealWrap,
}
}
func ProtoStorageEntryToLogicalStorageEntry(e *StorageEntry) *logical.StorageEntry {
if e == nil {
return nil
}
return &logical.StorageEntry{
Key: e.Key,
Value: e.Value,
SealWrap: e.SealWrap,
}
}
func ProtoLeaseOptionsToLogicalLeaseOptions(l *LeaseOptions) (logical.LeaseOptions, error) {
if l == nil {
return logical.LeaseOptions{}, nil
}
t, err := ptypes.Timestamp(l.IssueTime)
return logical.LeaseOptions{
TTL: time.Duration(l.TTL),
Renewable: l.Renewable,
Increment: time.Duration(l.Increment),
IssueTime: t,
}, err
}
func LogicalLeaseOptionsToProtoLeaseOptions(l logical.LeaseOptions) (*LeaseOptions, error) {
t, err := ptypes.TimestampProto(l.IssueTime)
if err != nil {
return nil, err
}
return &LeaseOptions{
TTL: int64(l.TTL),
Renewable: l.Renewable,
Increment: int64(l.Increment),
IssueTime: t,
}, err
}
func ProtoSecretToLogicalSecret(s *Secret) (*logical.Secret, error) {
if s == nil {
return nil, nil
}
data := map[string]interface{}{}
err := json.Unmarshal(s.InternalData, &data)
if err != nil {
return nil, err
}
lease, err := ProtoLeaseOptionsToLogicalLeaseOptions(s.LeaseOptions)
if err != nil {
return nil, err
}
return &logical.Secret{
LeaseOptions: lease,
InternalData: data,
LeaseID: s.LeaseID,
}, nil
}
func LogicalSecretToProtoSecret(s *logical.Secret) (*Secret, error) {
if s == nil {
return nil, nil
}
buf, err := json.Marshal(s.InternalData)
if err != nil {
return nil, err
}
lease, err := LogicalLeaseOptionsToProtoLeaseOptions(s.LeaseOptions)
if err != nil {
return nil, err
}
return &Secret{
LeaseOptions: lease,
InternalData: buf,
LeaseID: s.LeaseID,
}, err
}
func LogicalRequestToProtoRequest(r *logical.Request) (*Request, error) {
if r == nil {
return nil, nil
}
buf, err := json.Marshal(r.Data)
if err != nil {
return nil, err
}
secret, err := LogicalSecretToProtoSecret(r.Secret)
if err != nil {
return nil, err
}
auth, err := LogicalAuthToProtoAuth(r.Auth)
if err != nil {
return nil, err
}
headers := map[string]*Header{}
for k, v := range r.Headers {
headers[k] = &Header{v}
}
return &Request{
ID: r.ID,
ReplicationCluster: r.ReplicationCluster,
Operation: string(r.Operation),
Path: r.Path,
Data: buf,
Secret: secret,
Auth: auth,
Headers: headers,
ClientToken: r.ClientToken,
ClientTokenAccessor: r.ClientTokenAccessor,
DisplayName: r.DisplayName,
MountPoint: r.MountPoint,
MountType: r.MountType,
MountAccessor: r.MountAccessor,
WrapInfo: LogicalRequestWrapInfoToProtoRequestWrapInfo(r.WrapInfo),
ClientTokenRemainingUses: int64(r.ClientTokenRemainingUses),
//MFACreds: MFACreds,
EntityID: r.EntityID,
PolicyOverride: r.PolicyOverride,
Unauthenticated: r.Unauthenticated,
}, nil
}
func ProtoRequestToLogicalRequest(r *Request) (*logical.Request, error) {
if r == nil {
return nil, nil
}
data := map[string]interface{}{}
err := json.Unmarshal(r.Data, &data)
if err != nil {
return nil, err
}
secret, err := ProtoSecretToLogicalSecret(r.Secret)
if err != nil {
return nil, err
}
auth, err := ProtoAuthToLogicalAuth(r.Auth)
if err != nil {
return nil, err
}
var headers map[string][]string
if len(r.Headers) > 0 {
headers = make(map[string][]string, len(r.Headers))
for k, v := range r.Headers {
headers[k] = v.Header
}
}
return &logical.Request{
ID: r.ID,
ReplicationCluster: r.ReplicationCluster,
Operation: logical.Operation(r.Operation),
Path: r.Path,
Data: data,
Secret: secret,
Auth: auth,
Headers: headers,
ClientToken: r.ClientToken,
ClientTokenAccessor: r.ClientTokenAccessor,
DisplayName: r.DisplayName,
MountPoint: r.MountPoint,
MountType: r.MountType,
MountAccessor: r.MountAccessor,
WrapInfo: ProtoRequestWrapInfoToLogicalRequestWrapInfo(r.WrapInfo),
ClientTokenRemainingUses: int(r.ClientTokenRemainingUses),
//MFACreds: MFACreds,
EntityID: r.EntityID,
PolicyOverride: r.PolicyOverride,
Unauthenticated: r.Unauthenticated,
}, nil
}
func LogicalRequestWrapInfoToProtoRequestWrapInfo(i *logical.RequestWrapInfo) *RequestWrapInfo {
if i == nil {
return nil
}
return &RequestWrapInfo{
TTL: int64(i.TTL),
Format: i.Format,
SealWrap: i.SealWrap,
}
}
func ProtoRequestWrapInfoToLogicalRequestWrapInfo(i *RequestWrapInfo) *logical.RequestWrapInfo {
if i == nil {
return nil
}
return &logical.RequestWrapInfo{
TTL: time.Duration(i.TTL),
Format: i.Format,
SealWrap: i.SealWrap,
}
}
func ProtoResponseToLogicalResponse(r *Response) (*logical.Response, error) {
if r == nil {
return nil, nil
}
secret, err := ProtoSecretToLogicalSecret(r.Secret)
if err != nil {
return nil, err
}
auth, err := ProtoAuthToLogicalAuth(r.Auth)
if err != nil {
return nil, err
}
data := map[string]interface{}{}
err = json.Unmarshal(r.Data, &data)
if err != nil {
return nil, err
}
wrapInfo, err := ProtoResponseWrapInfoToLogicalResponseWrapInfo(r.WrapInfo)
if err != nil {
return nil, err
}
return &logical.Response{
Secret: secret,
Auth: auth,
Data: data,
Redirect: r.Redirect,
Warnings: r.Warnings,
WrapInfo: wrapInfo,
}, nil
}
func ProtoResponseWrapInfoToLogicalResponseWrapInfo(i *ResponseWrapInfo) (*wrapping.ResponseWrapInfo, error) {
if i == nil {
return nil, nil
}
t, err := ptypes.Timestamp(i.CreationTime)
if err != nil {
return nil, err
}
return &wrapping.ResponseWrapInfo{
TTL: time.Duration(i.TTL),
Token: i.Token,
Accessor: i.Accessor,
CreationTime: t,
WrappedAccessor: i.WrappedAccessor,
WrappedEntityID: i.WrappedEntityID,
Format: i.Format,
CreationPath: i.CreationPath,
SealWrap: i.SealWrap,
}, nil
}
func LogicalResponseWrapInfoToProtoResponseWrapInfo(i *wrapping.ResponseWrapInfo) (*ResponseWrapInfo, error) {
if i == nil {
return nil, nil
}
t, err := ptypes.TimestampProto(i.CreationTime)
if err != nil {
return nil, err
}
return &ResponseWrapInfo{
TTL: int64(i.TTL),
Token: i.Token,
Accessor: i.Accessor,
CreationTime: t,
WrappedAccessor: i.WrappedAccessor,
WrappedEntityID: i.WrappedEntityID,
Format: i.Format,
CreationPath: i.CreationPath,
SealWrap: i.SealWrap,
}, nil
}
func LogicalResponseToProtoResponse(r *logical.Response) (*Response, error) {
if r == nil {
return nil, nil
}
secret, err := LogicalSecretToProtoSecret(r.Secret)
if err != nil {
return nil, err
}
auth, err := LogicalAuthToProtoAuth(r.Auth)
if err != nil {
return nil, err
}
buf, err := json.Marshal(r.Data)
if err != nil {
return nil, err
}
wrapInfo, err := LogicalResponseWrapInfoToProtoResponseWrapInfo(r.WrapInfo)
if err != nil {
return nil, err
}
return &Response{
Secret: secret,
Auth: auth,
Data: buf,
Redirect: r.Redirect,
Warnings: r.Warnings,
WrapInfo: wrapInfo,
}, nil
}
func LogicalAliasToProtoAlias(a *logical.Alias) *Alias {
if a == nil {
return nil
}
return &Alias{
MountType: a.MountType,
MountAccessor: a.MountAccessor,
Name: a.Name,
}
}
func ProtoAliasToLogicalAlias(a *Alias) *logical.Alias {
if a == nil {
return nil
}
return &logical.Alias{
MountType: a.MountType,
MountAccessor: a.MountAccessor,
Name: a.Name,
}
}
func LogicalAuthToProtoAuth(a *logical.Auth) (*Auth, error) {
if a == nil {
return nil, nil
}
buf, err := json.Marshal(a.InternalData)
if err != nil {
return nil, err
}
groupAliases := make([]*Alias, len(a.GroupAliases))
for i, al := range a.GroupAliases {
groupAliases[i] = LogicalAliasToProtoAlias(al)
}
lo, err := LogicalLeaseOptionsToProtoLeaseOptions(a.LeaseOptions)
if err != nil {
return nil, err
}
return &Auth{
LeaseOptions: lo,
InternalData: buf,
DisplayName: a.DisplayName,
Policies: a.Policies,
Metadata: a.Metadata,
ClientToken: a.ClientToken,
Accessor: a.Accessor,
Period: int64(a.Period),
NumUses: int64(a.NumUses),
EntityID: a.EntityID,
Alias: LogicalAliasToProtoAlias(a.Alias),
GroupAliases: groupAliases,
}, nil
}
func ProtoAuthToLogicalAuth(a *Auth) (*logical.Auth, error) {
if a == nil {
return nil, nil
}
data := map[string]interface{}{}
err := json.Unmarshal(a.InternalData, &data)
if err != nil {
return nil, err
}
groupAliases := make([]*logical.Alias, len(a.GroupAliases))
for i, al := range a.GroupAliases {
groupAliases[i] = ProtoAliasToLogicalAlias(al)
}
lo, err := ProtoLeaseOptionsToLogicalLeaseOptions(a.LeaseOptions)
if err != nil {
return nil, err
}
return &logical.Auth{
LeaseOptions: lo,
InternalData: data,
DisplayName: a.DisplayName,
Policies: a.Policies,
Metadata: a.Metadata,
ClientToken: a.ClientToken,
Accessor: a.Accessor,
Period: time.Duration(a.Period),
NumUses: int(a.NumUses),
EntityID: a.EntityID,
Alias: ProtoAliasToLogicalAlias(a.Alias),
GroupAliases: groupAliases,
}, nil
}

View File

@@ -0,0 +1,262 @@
package pb
import (
"errors"
"reflect"
"testing"
"time"
"github.com/hashicorp/vault/helper/errutil"
"github.com/hashicorp/vault/helper/wrapping"
"github.com/hashicorp/vault/logical"
)
func TestTranslation_Errors(t *testing.T) {
errs := []error{
nil,
errors.New("test"),
errutil.UserError{Err: "test"},
errutil.InternalError{Err: "test"},
logical.CodedError(403, "test"),
&logical.StatusBadRequest{Err: "test"},
logical.ErrUnsupportedOperation,
logical.ErrUnsupportedPath,
logical.ErrInvalidRequest,
logical.ErrPermissionDenied,
logical.ErrMultiAuthzPending,
}
for _, err := range errs {
pe := ErrToProtoErr(err)
e := ProtoErrToErr(pe)
if !reflect.DeepEqual(e, err) {
t.Fatal("Errs did not match: %#v, %#v", e, err)
}
}
}
func TestTranslation_StorageEntry(t *testing.T) {
tCases := []*logical.StorageEntry{
nil,
&logical.StorageEntry{Key: "key", Value: []byte("value")},
&logical.StorageEntry{Key: "key1", Value: []byte("value1"), SealWrap: true},
&logical.StorageEntry{Key: "key1", SealWrap: true},
}
for _, c := range tCases {
p := LogicalStorageEntryToProtoStorageEntry(c)
e := ProtoStorageEntryToLogicalStorageEntry(p)
if !reflect.DeepEqual(c, e) {
t.Fatal("Entries did not match: %#v, %#v", e, c)
}
}
}
func TestTranslation_Request(t *testing.T) {
tCases := []*logical.Request{
nil,
&logical.Request{
ID: "ID",
ReplicationCluster: "RID",
Operation: logical.CreateOperation,
Path: "test/foo",
ClientToken: "token",
ClientTokenAccessor: "accessor",
DisplayName: "display",
MountPoint: "test",
MountType: "secret",
MountAccessor: "test-231234",
ClientTokenRemainingUses: 1,
EntityID: "tester",
PolicyOverride: true,
Unauthenticated: true,
},
&logical.Request{
ID: "ID",
ReplicationCluster: "RID",
Operation: logical.CreateOperation,
Path: "test/foo",
Data: map[string]interface{}{
"string": "string",
"bool": true,
"array": []interface{}{"1", "2"},
"map": map[string]interface{}{
"key": "value",
},
},
Secret: &logical.Secret{
LeaseOptions: logical.LeaseOptions{
TTL: time.Second,
Renewable: true,
Increment: time.Second,
IssueTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
},
InternalData: map[string]interface{}{
"role": "test",
},
LeaseID: "LeaseID",
},
Auth: &logical.Auth{
LeaseOptions: logical.LeaseOptions{
TTL: time.Second,
Renewable: true,
Increment: time.Second,
IssueTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
},
InternalData: map[string]interface{}{
"role": "test",
},
DisplayName: "test",
Policies: []string{"test", "Test"},
Metadata: map[string]string{
"test": "test",
},
ClientToken: "token",
Accessor: "accessor",
Period: 5 * time.Second,
NumUses: 1,
EntityID: "id",
Alias: &logical.Alias{
MountType: "type",
MountAccessor: "accessor",
Name: "name",
},
GroupAliases: []*logical.Alias{
&logical.Alias{
MountType: "type",
MountAccessor: "accessor",
Name: "name",
},
},
},
Headers: map[string][]string{
"X-Vault-Test": []string{"test"},
},
ClientToken: "token",
ClientTokenAccessor: "accessor",
DisplayName: "display",
MountPoint: "test",
MountType: "secret",
MountAccessor: "test-231234",
WrapInfo: &logical.RequestWrapInfo{
TTL: time.Second,
Format: "token",
SealWrap: true,
},
ClientTokenRemainingUses: 1,
EntityID: "tester",
PolicyOverride: true,
Unauthenticated: true,
},
}
for _, c := range tCases {
p, err := LogicalRequestToProtoRequest(c)
if err != nil {
t.Fatal(err)
}
r, err := ProtoRequestToLogicalRequest(p)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(c, r) {
t.Fatalf("Requests did not match: \n%#v, \n%#v", c, r)
}
}
}
func TestTranslation_Response(t *testing.T) {
tCases := []*logical.Response{
nil,
&logical.Response{
Data: map[string]interface{}{
"data": "blah",
},
Warnings: []string{"warning"},
},
&logical.Response{
Data: map[string]interface{}{
"string": "string",
"bool": true,
"array": []interface{}{"1", "2"},
"map": map[string]interface{}{
"key": "value",
},
},
Secret: &logical.Secret{
LeaseOptions: logical.LeaseOptions{
TTL: time.Second,
Renewable: true,
Increment: time.Second,
IssueTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
},
InternalData: map[string]interface{}{
"role": "test",
},
LeaseID: "LeaseID",
},
Auth: &logical.Auth{
LeaseOptions: logical.LeaseOptions{
TTL: time.Second,
Renewable: true,
Increment: time.Second,
IssueTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
},
InternalData: map[string]interface{}{
"role": "test",
},
DisplayName: "test",
Policies: []string{"test", "Test"},
Metadata: map[string]string{
"test": "test",
},
ClientToken: "token",
Accessor: "accessor",
Period: 5 * time.Second,
NumUses: 1,
EntityID: "id",
Alias: &logical.Alias{
MountType: "type",
MountAccessor: "accessor",
Name: "name",
},
GroupAliases: []*logical.Alias{
&logical.Alias{
MountType: "type",
MountAccessor: "accessor",
Name: "name",
},
},
},
WrapInfo: &wrapping.ResponseWrapInfo{
TTL: time.Second,
Token: "token",
Accessor: "accessor",
CreationTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
WrappedAccessor: "wrapped-accessor",
WrappedEntityID: "id",
Format: "token",
CreationPath: "test/foo",
SealWrap: true,
},
},
}
for _, c := range tCases {
p, err := LogicalResponseToProtoResponse(c)
if err != nil {
t.Fatal(err)
}
r, err := ProtoResponseToLogicalResponse(p)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(c, r) {
t.Fatalf("Requests did not match: \n%#v, \n%#v", c, r)
}
}
}

View File

@@ -4,6 +4,7 @@ import (
"crypto/ecdsa"
"crypto/rsa"
"encoding/gob"
"errors"
"fmt"
"time"
@@ -44,13 +45,13 @@ type BackendPluginClient struct {
client *plugin.Client
sync.Mutex
*backendPluginClient
logical.Backend
}
// Cleanup calls the RPC client's Cleanup() func and also calls
// the go-plugin's client Kill() func
func (b *BackendPluginClient) Cleanup() {
b.backendPluginClient.Cleanup()
b.Backend.Cleanup()
b.client.Kill()
}
@@ -122,13 +123,34 @@ func newPluginClient(sys pluginutil.RunnerUtil, pluginRunner *pluginutil.PluginR
return nil, err
}
var backend logical.Backend
var transport string
// We should have a logical backend type now. This feels like a normal interface
// implementation but is in fact over an RPC connection.
backendRPC := raw.(*backendPluginClient)
switch raw.(type) {
case *backendPluginClient:
backend = raw.(*backendPluginClient)
transport = "netRPC"
case *backendGRPCPluginClient:
backend = raw.(*backendGRPCPluginClient)
transport = "gRPC"
default:
return nil, errors.New("Unsupported plugin client type")
}
// Wrap the backend in a tracing middleware
if logger.IsTrace() {
backend = &backendTracingMiddleware{
logger: logger,
transport: transport,
typeStr: pluginRunner.Name,
next: backend,
}
}
return &BackendPluginClient{
client: client,
backendPluginClient: backendRPC,
client: client,
Backend: backend,
}, nil
}

View File

@@ -2,7 +2,9 @@ package plugin
import (
"crypto/tls"
"os"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin"
"github.com/hashicorp/vault/helper/pluginutil"
"github.com/hashicorp/vault/logical"
@@ -18,15 +20,26 @@ type TLSProdiverFunc func() (*tls.Config, error)
type ServeOpts struct {
BackendFactoryFunc BackendFactoryFunc
TLSProviderFunc TLSProdiverFunc
Logger hclog.Logger
}
// Serve is a helper function used to serve a backend plugin. This
// should be run on the plugin's main process.
func Serve(opts *ServeOpts) error {
logger := opts.Logger
if logger == nil {
logger = hclog.New(&hclog.LoggerOptions{
Level: hclog.Trace,
Output: os.Stderr,
JSONFormat: true,
})
}
// pluginMap is the map of plugins we can dispense.
var pluginMap = map[string]plugin.Plugin{
"backend": &BackendPlugin{
Factory: opts.BackendFactoryFunc,
Logger: logger,
},
}
@@ -40,6 +53,10 @@ func Serve(opts *ServeOpts) error {
HandshakeConfig: handshakeConfig,
Plugins: pluginMap,
TLSProvider: opts.TLSProviderFunc,
Logger: logger,
// A non-nil value here enables gRPC serving for this plugin...
GRPCServer: plugin.DefaultGRPCServer,
})
return nil
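
A minimal sketch of a plugin main package driving the Serve helper above. The factory below is a placeholder whose signature is assumed to match BackendFactoryFunc in this revision (a *logical.BackendConfig in, a logical.Backend and error out); a real plugin would also wire up a TLSProviderFunc via pluginutil.

```go
package main

import (
	"log"
	"os"

	hclog "github.com/hashicorp/go-hclog"
	"github.com/hashicorp/vault/logical"
	"github.com/hashicorp/vault/logical/plugin"
)

// factory is a stand-in backend factory; its signature is an assumption for
// illustration, not something defined by this change.
func factory(conf *logical.BackendConfig) (logical.Backend, error) {
	return nil, nil // a real plugin returns its constructed backend here
}

func main() {
	// Passing a logger is optional: Serve falls back to a JSON-formatted
	// hclog logger on stderr when ServeOpts.Logger is nil.
	logger := hclog.New(&hclog.LoggerOptions{
		Level:  hclog.Trace,
		Output: os.Stderr,
	})

	if err := plugin.Serve(&plugin.ServeOpts{
		BackendFactoryFunc: factory,
		Logger:             logger,
	}); err != nil {
		log.Fatal(err)
	}
}
```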

View File

@@ -3,15 +3,18 @@ package plugin
import (
"testing"
"google.golang.org/grpc"
plugin "github.com/hashicorp/go-plugin"
"github.com/hashicorp/vault/logical"
"github.com/hashicorp/vault/logical/plugin/pb"
)
func TestStorage_impl(t *testing.T) {
var _ logical.Storage = new(StorageClient)
}
func TestStorage_operations(t *testing.T) {
func TestStorage_RPC(t *testing.T) {
client, server := plugin.TestRPCConn(t)
defer client.Close()
@@ -25,3 +28,18 @@ func TestStorage_operations(t *testing.T) {
logical.TestStorage(t, testStorage)
}
func TestStorage_GRPC(t *testing.T) {
storage := &logical.InmemStorage{}
client, _ := plugin.TestGRPCConn(t, func(s *grpc.Server) {
pb.RegisterStorageServer(s, &GRPCStorageServer{
impl: storage,
})
})
defer client.Close()
testStorage := &GRPCStorageClient{client: pb.NewStorageClient(client)}
logical.TestStorage(t, testStorage)
}

View File

@@ -2,6 +2,7 @@ package plugin
import (
"bufio"
"context"
"crypto/subtle"
"crypto/tls"
"errors"
@@ -79,6 +80,7 @@ type Client struct {
client ClientProtocol
protocol Protocol
logger hclog.Logger
doneCtx context.Context
}
// ClientConfig is the configuration used to initialize a new
@@ -310,7 +312,7 @@ func (c *Client) Client() (ClientProtocol, error) {
c.client, err = newRPCClient(c)
case ProtocolGRPC:
c.client, err = newGRPCClient(c)
c.client, err = newGRPCClient(c.doneCtx, c)
default:
return nil, fmt.Errorf("unknown server protocol: %s", c.protocol)
@@ -423,6 +425,9 @@ func (c *Client) Start() (addr net.Addr, err error) {
// Create the logging channel for when we kill
c.doneLogging = make(chan struct{})
// Create a context for when we kill
var ctxCancel context.CancelFunc
c.doneCtx, ctxCancel = context.WithCancel(context.Background())
if c.config.Reattach != nil {
// Verify the process still exists. If not, then it is an error
@@ -457,6 +462,8 @@ func (c *Client) Start() (addr net.Addr, err error) {
// Close the logging channel since that doesn't work on reattach
close(c.doneLogging)
// Cancel the context
ctxCancel()
}(p.Pid)
// Set the address and process
@@ -534,6 +541,8 @@ func (c *Client) Start() (addr net.Addr, err error) {
// Mark that we exited
close(exitCh)
// Cancel the context, marking that we exited
ctxCancel()
// Set that we exited, which takes a lock
c.l.Lock()
@@ -707,18 +716,29 @@ func (c *Client) Protocol() Protocol {
return c.protocol
}
func netAddrDialer(addr net.Addr) func(string, time.Duration) (net.Conn, error) {
return func(_ string, _ time.Duration) (net.Conn, error) {
// Connect to the client
conn, err := net.Dial(addr.Network(), addr.String())
if err != nil {
return nil, err
}
if tcpConn, ok := conn.(*net.TCPConn); ok {
// Make sure to set keep alive so that the connection doesn't die
tcpConn.SetKeepAlive(true)
}
return conn, nil
}
}
// dialer is compatible with grpc.WithDialer and creates the connection
// to the plugin.
func (c *Client) dialer(_ string, timeout time.Duration) (net.Conn, error) {
// Connect to the client
conn, err := net.Dial(c.address.Network(), c.address.String())
conn, err := netAddrDialer(c.address)("", timeout)
if err != nil {
return nil, err
}
if tcpConn, ok := conn.(*net.TCPConn); ok {
// Make sure to set keep alive so that the connection doesn't die
tcpConn.SetKeepAlive(true)
}
// If we have a TLS config we wrap our connection. We only do this
// for net/rpc since gRPC uses its own mechanism for TLS.

455
vendor/github.com/hashicorp/go-plugin/grpc_broker.go generated vendored Normal file
View File

@@ -0,0 +1,455 @@
package plugin
import (
"context"
"crypto/tls"
"errors"
"fmt"
"log"
"net"
"sync"
"sync/atomic"
"time"
"github.com/oklog/run"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
// streamer interface is used in the broker to send/receive connection
// information.
type streamer interface {
Send(*ConnInfo) error
Recv() (*ConnInfo, error)
Close()
}
// sendErr is used to pass errors back during a send.
type sendErr struct {
i *ConnInfo
ch chan error
}
// gRPCBrokerServer is used by the plugin to start a stream and to send
// connection information to/from the plugin. Implements GRPCBrokerServer and
// streamer interfaces.
type gRPCBrokerServer struct {
// send is used to send connection info to the gRPC stream.
send chan *sendErr
// recv is used to receive connection info from the gRPC stream.
recv chan *ConnInfo
// quit closes down the stream.
quit chan struct{}
// o is used to ensure we close the quit channel only once.
o sync.Once
}
func newGRPCBrokerServer() *gRPCBrokerServer {
return &gRPCBrokerServer{
send: make(chan *sendErr),
recv: make(chan *ConnInfo),
quit: make(chan struct{}),
}
}
// StartStream implements the GRPCBrokerServer interface and will block until
// the quit channel is closed or the context reports Done. The stream will pass
// connection information to/from the client.
func (s *gRPCBrokerServer) StartStream(stream GRPCBroker_StartStreamServer) error {
doneCh := stream.Context().Done()
defer s.Close()
// Process send stream
go func() {
for {
select {
case <-doneCh:
return
case <-s.quit:
return
case se := <-s.send:
err := stream.Send(se.i)
se.ch <- err
}
}
}()
// Process receive stream
for {
i, err := stream.Recv()
if err != nil {
return err
}
select {
case <-doneCh:
return nil
case <-s.quit:
return nil
case s.recv <- i:
}
}
return nil
}
// Send is used by the GRPCBroker to pass connection information into the stream
// to the client.
func (s *gRPCBrokerServer) Send(i *ConnInfo) error {
ch := make(chan error)
defer close(ch)
select {
case <-s.quit:
return errors.New("broker closed")
case s.send <- &sendErr{
i: i,
ch: ch,
}:
}
return <-ch
}
// Recv is used by the GRPCBroker to pass connection information that has been
// sent from the client from the stream to the broker.
func (s *gRPCBrokerServer) Recv() (*ConnInfo, error) {
select {
case <-s.quit:
return nil, errors.New("broker closed")
case i := <-s.recv:
return i, nil
}
}
// Close closes the quit channel, shutting down the stream.
func (s *gRPCBrokerServer) Close() {
s.o.Do(func() {
close(s.quit)
})
}
// gRPCBrokerClientImpl is used by the client to start a stream and to send
// connection information to/from the client. Implements GRPCBrokerClient and
// streamer interfaces.
type gRPCBrokerClientImpl struct {
// client is the underlying GRPC client used to make calls to the server.
client GRPCBrokerClient
// send is used to send connection info to the gRPC stream.
send chan *sendErr
// recv is used to receive connection info from the gRPC stream.
recv chan *ConnInfo
// quit closes down the stream.
quit chan struct{}
// o is used to ensure we close the quit channel only once.
o sync.Once
}
func newGRPCBrokerClient(conn *grpc.ClientConn) *gRPCBrokerClientImpl {
return &gRPCBrokerClientImpl{
client: NewGRPCBrokerClient(conn),
send: make(chan *sendErr),
recv: make(chan *ConnInfo),
quit: make(chan struct{}),
}
}
// StartStream implements the GRPCBrokerClient interface and will block until
// the quit channel is closed or the context reports Done. The stream will pass
// connection information to/from the plugin.
func (s *gRPCBrokerClientImpl) StartStream() error {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
defer s.Close()
stream, err := s.client.StartStream(ctx)
if err != nil {
return err
}
doneCh := stream.Context().Done()
go func() {
for {
select {
case <-doneCh:
return
case <-s.quit:
return
case se := <-s.send:
err := stream.Send(se.i)
se.ch <- err
}
}
}()
for {
i, err := stream.Recv()
if err != nil {
return err
}
select {
case <-doneCh:
return nil
case <-s.quit:
return nil
case s.recv <- i:
}
}
return nil
}
// Send is used by the GRPCBroker to pass connection information into the stream
// to the plugin.
func (s *gRPCBrokerClientImpl) Send(i *ConnInfo) error {
ch := make(chan error)
defer close(ch)
select {
case <-s.quit:
return errors.New("broker closed")
case s.send <- &sendErr{
i: i,
ch: ch,
}:
}
return <-ch
}
// Recv is used by the GRPCBroker to pass connection information that has been
// sent from the plugin to the broker.
func (s *gRPCBrokerClientImpl) Recv() (*ConnInfo, error) {
select {
case <-s.quit:
return nil, errors.New("broker closed")
case i := <-s.recv:
return i, nil
}
}
// Close closes the quit channel, shutting down the stream.
func (s *gRPCBrokerClientImpl) Close() {
s.o.Do(func() {
close(s.quit)
})
}
// GRPCBroker is responsible for brokering connections by unique ID.
//
// It is used by plugins to create multiple gRPC connections and data
// streams between the plugin process and the host process.
//
// This allows a plugin to request a channel with a specific ID to connect to
// or accept a connection from, and the broker handles the details of
// holding these channels open while they're being negotiated.
//
// The Plugin interface has access to these for both Server and Client.
// The broker can be used by either (optionally) to reserve and connect to
// new streams. This is useful for complex args and return values,
// or anything else you might need a data stream for.
type GRPCBroker struct {
nextId uint32
streamer streamer
streams map[uint32]*gRPCBrokerPending
tls *tls.Config
doneCh chan struct{}
o sync.Once
sync.Mutex
}
type gRPCBrokerPending struct {
ch chan *ConnInfo
doneCh chan struct{}
}
func newGRPCBroker(s streamer, tls *tls.Config) *GRPCBroker {
return &GRPCBroker{
streamer: s,
streams: make(map[uint32]*gRPCBrokerPending),
tls: tls,
doneCh: make(chan struct{}),
}
}
// Accept accepts a connection by ID.
//
// This should not be called multiple times with the same ID at one time.
func (b *GRPCBroker) Accept(id uint32) (net.Listener, error) {
listener, err := serverListener()
if err != nil {
return nil, err
}
err = b.streamer.Send(&ConnInfo{
ServiceId: id,
Network: listener.Addr().Network(),
Address: listener.Addr().String(),
})
if err != nil {
return nil, err
}
return listener, nil
}
// AcceptAndServe is used to accept a specific stream ID and immediately
// serve a gRPC server on that stream ID. This is used to easily serve
// complex arguments. Each AcceptAndServe call opens a new listener socket and
// sends the connection info down the stream to the dialer. Since a new
// connection is opened every call, these calls should be used sparingly.
// Multiple gRPC server implementations can be registered to a single
// AcceptAndServe call.
func (b *GRPCBroker) AcceptAndServe(id uint32, s func([]grpc.ServerOption) *grpc.Server) {
listener, err := b.Accept(id)
if err != nil {
log.Printf("[ERR] plugin: plugin acceptAndServe error: %s", err)
return
}
defer listener.Close()
var opts []grpc.ServerOption
if b.tls != nil {
opts = []grpc.ServerOption{grpc.Creds(credentials.NewTLS(b.tls))}
}
server := s(opts)
// Here we use a run group to close this goroutine if the server is shut down
// or the broker is shut down.
var g run.Group
{
// Serve on the listener; if shutting down, call GracefulStop.
g.Add(func() error {
return server.Serve(listener)
}, func(err error) {
server.GracefulStop()
})
}
{
// Block on the closeCh or the doneCh. If we are shutting down, close the
// closeCh.
closeCh := make(chan struct{})
g.Add(func() error {
select {
case <-b.doneCh:
case <-closeCh:
}
return nil
}, func(err error) {
close(closeCh)
})
}
// Block until we are done
g.Run()
}
// Close closes the stream and all servers.
func (b *GRPCBroker) Close() error {
b.streamer.Close()
b.o.Do(func() {
close(b.doneCh)
})
return nil
}
// Dial opens a connection by ID.
func (b *GRPCBroker) Dial(id uint32) (conn *grpc.ClientConn, err error) {
var c *ConnInfo
// Open the stream
p := b.getStream(id)
select {
case c = <-p.ch:
close(p.doneCh)
case <-time.After(5 * time.Second):
return nil, fmt.Errorf("timeout waiting for connection info")
}
var addr net.Addr
switch c.Network {
case "tcp":
addr, err = net.ResolveTCPAddr("tcp", c.Address)
case "unix":
addr, err = net.ResolveUnixAddr("unix", c.Address)
default:
err = fmt.Errorf("Unknown address type: %s", c.Address)
}
if err != nil {
return nil, err
}
return dialGRPCConn(b.tls, netAddrDialer(addr))
}
// NextId returns a unique ID to use next.
//
// It is possible for very long-running plugin hosts to wrap this value,
// though it would require a very large number of calls. In practice
// we've never seen it happen.
func (m *GRPCBroker) NextId() uint32 {
return atomic.AddUint32(&m.nextId, 1)
}
// Run starts the brokering and should be executed in a goroutine, since it
// blocks forever, or until the session closes.
//
// Uses of GRPCBroker never need to call this. It is called internally by
// the plugin host/client.
func (m *GRPCBroker) Run() {
for {
stream, err := m.streamer.Recv()
if err != nil {
// Once we receive an error, just exit
break
}
// Initialize the waiter
p := m.getStream(stream.ServiceId)
select {
case p.ch <- stream:
default:
}
go m.timeoutWait(stream.ServiceId, p)
}
}
func (m *GRPCBroker) getStream(id uint32) *gRPCBrokerPending {
m.Lock()
defer m.Unlock()
p, ok := m.streams[id]
if ok {
return p
}
m.streams[id] = &gRPCBrokerPending{
ch: make(chan *ConnInfo, 1),
doneCh: make(chan struct{}),
}
return m.streams[id]
}
func (m *GRPCBroker) timeoutWait(id uint32, p *gRPCBrokerPending) {
// Wait for the stream to either be picked up and connected, or
// for a timeout.
select {
case <-p.doneCh:
case <-time.After(5 * time.Second):
}
m.Lock()
defer m.Unlock()
// Delete the stream so no one else can grab it
delete(m.streams, id)
}
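
To make the brokering flow concrete, here is a rough sketch under the assumption that the generated service registration is supplied by the caller: one side reserves an ID with NextId and serves on it with AcceptAndServe, passes the ID to its peer inside an ordinary plugin RPC message, and the peer dials that ID.

```go
package example

import (
	plugin "github.com/hashicorp/go-plugin"
	"google.golang.org/grpc"
)

// serveExtraStream reserves a stream ID, serves a gRPC server on it in the
// background, and returns the ID so it can be sent to the peer in a normal
// plugin RPC message. The register callback stands in for the plugin's own
// generated pb.RegisterXxxServer call.
func serveExtraStream(broker *plugin.GRPCBroker, register func(*grpc.Server)) uint32 {
	id := broker.NextId()
	go broker.AcceptAndServe(id, func(opts []grpc.ServerOption) *grpc.Server {
		s := grpc.NewServer(opts...)
		register(s)
		return s
	})
	return id
}

// dialExtraStream is the peer's half: it connects to the ID it was handed and
// would normally wrap the connection in a generated pb.NewXxxClient.
func dialExtraStream(broker *plugin.GRPCBroker, id uint32) (*grpc.ClientConn, error) {
	return broker.Dial(id)
}
```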

190
vendor/github.com/hashicorp/go-plugin/grpc_broker.pb.go generated vendored Normal file
View File

@@ -0,0 +1,190 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: grpc_broker.proto
/*
Package plugin is a generated protocol buffer package.
It is generated from these files:
grpc_broker.proto
It has these top-level messages:
ConnInfo
*/
package plugin
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
import (
context "golang.org/x/net/context"
grpc "google.golang.org/grpc"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type ConnInfo struct {
ServiceId uint32 `protobuf:"varint,1,opt,name=service_id,json=serviceId" json:"service_id,omitempty"`
Network string `protobuf:"bytes,2,opt,name=network" json:"network,omitempty"`
Address string `protobuf:"bytes,3,opt,name=address" json:"address,omitempty"`
}
func (m *ConnInfo) Reset() { *m = ConnInfo{} }
func (m *ConnInfo) String() string { return proto.CompactTextString(m) }
func (*ConnInfo) ProtoMessage() {}
func (*ConnInfo) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
func (m *ConnInfo) GetServiceId() uint32 {
if m != nil {
return m.ServiceId
}
return 0
}
func (m *ConnInfo) GetNetwork() string {
if m != nil {
return m.Network
}
return ""
}
func (m *ConnInfo) GetAddress() string {
if m != nil {
return m.Address
}
return ""
}
func init() {
proto.RegisterType((*ConnInfo)(nil), "plugin.ConnInfo")
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// Client API for GRPCBroker service
type GRPCBrokerClient interface {
StartStream(ctx context.Context, opts ...grpc.CallOption) (GRPCBroker_StartStreamClient, error)
}
type gRPCBrokerClient struct {
cc *grpc.ClientConn
}
func NewGRPCBrokerClient(cc *grpc.ClientConn) GRPCBrokerClient {
return &gRPCBrokerClient{cc}
}
func (c *gRPCBrokerClient) StartStream(ctx context.Context, opts ...grpc.CallOption) (GRPCBroker_StartStreamClient, error) {
stream, err := grpc.NewClientStream(ctx, &_GRPCBroker_serviceDesc.Streams[0], c.cc, "/plugin.GRPCBroker/StartStream", opts...)
if err != nil {
return nil, err
}
x := &gRPCBrokerStartStreamClient{stream}
return x, nil
}
type GRPCBroker_StartStreamClient interface {
Send(*ConnInfo) error
Recv() (*ConnInfo, error)
grpc.ClientStream
}
type gRPCBrokerStartStreamClient struct {
grpc.ClientStream
}
func (x *gRPCBrokerStartStreamClient) Send(m *ConnInfo) error {
return x.ClientStream.SendMsg(m)
}
func (x *gRPCBrokerStartStreamClient) Recv() (*ConnInfo, error) {
m := new(ConnInfo)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// Server API for GRPCBroker service
type GRPCBrokerServer interface {
StartStream(GRPCBroker_StartStreamServer) error
}
func RegisterGRPCBrokerServer(s *grpc.Server, srv GRPCBrokerServer) {
s.RegisterService(&_GRPCBroker_serviceDesc, srv)
}
func _GRPCBroker_StartStream_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(GRPCBrokerServer).StartStream(&gRPCBrokerStartStreamServer{stream})
}
type GRPCBroker_StartStreamServer interface {
Send(*ConnInfo) error
Recv() (*ConnInfo, error)
grpc.ServerStream
}
type gRPCBrokerStartStreamServer struct {
grpc.ServerStream
}
func (x *gRPCBrokerStartStreamServer) Send(m *ConnInfo) error {
return x.ServerStream.SendMsg(m)
}
func (x *gRPCBrokerStartStreamServer) Recv() (*ConnInfo, error) {
m := new(ConnInfo)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
var _GRPCBroker_serviceDesc = grpc.ServiceDesc{
ServiceName: "plugin.GRPCBroker",
HandlerType: (*GRPCBrokerServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "StartStream",
Handler: _GRPCBroker_StartStream_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
Metadata: "grpc_broker.proto",
}
func init() { proto.RegisterFile("grpc_broker.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 170 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x4c, 0x2f, 0x2a, 0x48,
0x8e, 0x4f, 0x2a, 0xca, 0xcf, 0x4e, 0x2d, 0xd2, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2b,
0xc8, 0x29, 0x4d, 0xcf, 0xcc, 0x53, 0x8a, 0xe5, 0xe2, 0x70, 0xce, 0xcf, 0xcb, 0xf3, 0xcc, 0x4b,
0xcb, 0x17, 0x92, 0xe5, 0xe2, 0x2a, 0x4e, 0x2d, 0x2a, 0xcb, 0x4c, 0x4e, 0x8d, 0xcf, 0x4c, 0x91,
0x60, 0x54, 0x60, 0xd4, 0xe0, 0x0d, 0xe2, 0x84, 0x8a, 0x78, 0xa6, 0x08, 0x49, 0x70, 0xb1, 0xe7,
0xa5, 0x96, 0x94, 0xe7, 0x17, 0x65, 0x4b, 0x30, 0x29, 0x30, 0x6a, 0x70, 0x06, 0xc1, 0xb8, 0x20,
0x99, 0xc4, 0x94, 0x94, 0xa2, 0xd4, 0xe2, 0x62, 0x09, 0x66, 0x88, 0x0c, 0x94, 0x6b, 0xe4, 0xcc,
0xc5, 0xe5, 0x1e, 0x14, 0xe0, 0xec, 0x04, 0xb6, 0x5a, 0xc8, 0x94, 0x8b, 0x3b, 0xb8, 0x24, 0xb1,
0xa8, 0x24, 0xb8, 0xa4, 0x28, 0x35, 0x31, 0x57, 0x48, 0x40, 0x0f, 0xe2, 0x08, 0x3d, 0x98, 0x0b,
0xa4, 0x30, 0x44, 0x34, 0x18, 0x0d, 0x18, 0x93, 0xd8, 0xc0, 0x4e, 0x36, 0x06, 0x04, 0x00, 0x00,
0xff, 0xff, 0x7b, 0x5d, 0xfb, 0xe1, 0xc7, 0x00, 0x00, 0x00,
}

View File

@@ -0,0 +1,14 @@
syntax = "proto3";
package plugin;
message ConnInfo {
uint32 service_id = 1;
string network = 2;
string address = 3;
}
service GRPCBroker {
rpc StartStream(stream ConnInfo) returns (stream ConnInfo);
}

View File

@@ -1,7 +1,10 @@
package plugin
import (
"crypto/tls"
"fmt"
"net"
"time"
"golang.org/x/net/context"
"google.golang.org/grpc"
@@ -9,14 +12,12 @@ import (
"google.golang.org/grpc/health/grpc_health_v1"
)
// newGRPCClient creates a new GRPCClient. The Client argument is expected
// to be successfully started already with a lock held.
func newGRPCClient(c *Client) (*GRPCClient, error) {
func dialGRPCConn(tls *tls.Config, dialer func(string, time.Duration) (net.Conn, error)) (*grpc.ClientConn, error) {
// Build dialing options.
opts := make([]grpc.DialOption, 0, 5)
// We use a custom dialer so that we can connect over unix domain sockets
opts = append(opts, grpc.WithDialer(c.dialer))
opts = append(opts, grpc.WithDialer(dialer))
// go-plugin expects to block the connection
opts = append(opts, grpc.WithBlock())
@@ -26,11 +27,11 @@ func newGRPCClient(c *Client) (*GRPCClient, error) {
// If we have no TLS configuration set, we need to explicitly tell grpc
// that we're connecting with an insecure connection.
if c.config.TLSConfig == nil {
if tls == nil {
opts = append(opts, grpc.WithInsecure())
} else {
opts = append(opts, grpc.WithTransportCredentials(
credentials.NewTLS(c.config.TLSConfig)))
credentials.NewTLS(tls)))
}
// Connect. Note the first parameter is unused because we use a custom
@@ -40,9 +41,28 @@ func newGRPCClient(c *Client) (*GRPCClient, error) {
return nil, err
}
return conn, nil
}
// newGRPCClient creates a new GRPCClient. The Client argument is expected
// to be successfully started already with a lock held.
func newGRPCClient(doneCtx context.Context, c *Client) (*GRPCClient, error) {
conn, err := dialGRPCConn(c.config.TLSConfig, c.dialer)
if err != nil {
return nil, err
}
// Start the broker.
brokerGRPCClient := newGRPCBrokerClient(conn)
broker := newGRPCBroker(brokerGRPCClient, c.config.TLSConfig)
go broker.Run()
go brokerGRPCClient.StartStream()
return &GRPCClient{
Conn: conn,
Plugins: c.config.Plugins,
doneCtx: doneCtx,
broker: broker,
}, nil
}
@@ -50,10 +70,14 @@ func newGRPCClient(c *Client) (*GRPCClient, error) {
type GRPCClient struct {
Conn *grpc.ClientConn
Plugins map[string]Plugin
doneCtx context.Context
broker *GRPCBroker
}
// ClientProtocol impl.
func (c *GRPCClient) Close() error {
c.broker.Close()
return c.Conn.Close()
}
@@ -69,7 +93,7 @@ func (c *GRPCClient) Dispense(name string) (interface{}, error) {
return nil, fmt.Errorf("plugin %q doesn't support gRPC", name)
}
return p.GRPCClient(c.Conn)
return p.GRPCClient(c.doneCtx, c.broker, c.Conn)
}
// ClientProtocol impl.

View File

@@ -51,6 +51,7 @@ type GRPCServer struct {
config GRPCServerConfig
server *grpc.Server
broker *GRPCBroker
}
// ServerProtocol impl.
@@ -68,6 +69,12 @@ func (s *GRPCServer) Init() error {
GRPCServiceName, grpc_health_v1.HealthCheckResponse_SERVING)
grpc_health_v1.RegisterHealthServer(s.server, healthCheck)
// Register the broker service
brokerServer := newGRPCBrokerServer()
RegisterGRPCBrokerServer(s.server, brokerServer)
s.broker = newGRPCBroker(brokerServer, s.TLS)
go s.broker.Run()
// Register all our plugins onto the gRPC server.
for k, raw := range s.Plugins {
p, ok := raw.(GRPCPlugin)
@@ -75,7 +82,7 @@ func (s *GRPCServer) Init() error {
return fmt.Errorf("%q is not a GRPC-compatibile plugin", k)
}
if err := p.GRPCServer(s.server); err != nil {
if err := p.GRPCServer(s.broker, s.server); err != nil {
return fmt.Errorf("error registring %q: %s", k, err)
}
}

View File

@@ -9,6 +9,7 @@
package plugin
import (
"context"
"errors"
"net/rpc"
@@ -33,11 +34,12 @@ type GRPCPlugin interface {
// GRPCServer should register this plugin for serving with the
// given GRPCServer. Unlike Plugin.Server, this is only called once
// since gRPC plugins serve singletons.
GRPCServer(*grpc.Server) error
GRPCServer(*GRPCBroker, *grpc.Server) error
// GRPCClient should return the interface implementation for the plugin
// you're serving via gRPC.
GRPCClient(*grpc.ClientConn) (interface{}, error)
// you're serving via gRPC. The provided context will be canceled by
// go-plugin in the event of the plugin process exiting.
GRPCClient(context.Context, *GRPCBroker, *grpc.ClientConn) (interface{}, error)
}
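
A hedged sketch of a type satisfying the updated GRPCPlugin interface. The KV interface and the commented-out generated pb calls are placeholders; only the method signatures come from this change. Embedding NetRPCUnsupportedPlugin (described just below) lets a gRPC-only plugin skip the net/rpc methods.

```go
package example

import (
	"context"

	plugin "github.com/hashicorp/go-plugin"
	"google.golang.org/grpc"
)

// KV is a placeholder for whatever interface the plugin actually exposes.
type KV interface {
	Get(ctx context.Context, key string) ([]byte, error)
}

// KVPlugin implements GRPCPlugin; embedding NetRPCUnsupportedPlugin opts the
// plugin out of the legacy net/rpc transport.
type KVPlugin struct {
	plugin.NetRPCUnsupportedPlugin
	Impl KV
}

func (p *KVPlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error {
	// pb.RegisterKVServer(s, &kvGRPCServer{impl: p.Impl, broker: broker})
	// would go here; the pb package is hypothetical.
	return nil
}

func (p *KVPlugin) GRPCClient(doneCtx context.Context, broker *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) {
	// doneCtx is canceled by go-plugin when the plugin process exits, so the
	// returned wrapper can use it to fail fast instead of hanging on a dead
	// connection.
	// return &kvGRPCClient{client: pb.NewKVClient(c), doneCtx: doneCtx}, nil
	return nil, nil
}
```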
// NetRPCUnsupportedPlugin implements Plugin but returns errors for the

View File

@@ -2,6 +2,7 @@ package plugin
import (
"bytes"
"context"
"net"
"net/rpc"
@@ -110,10 +111,16 @@ func TestPluginGRPCConn(t testing.T, ps map[string]Plugin) (*GRPCClient, *GRPCSe
// Connection successful, close the listener
l.Close()
brokerGRPCClient := newGRPCBrokerClient(conn)
broker := newGRPCBroker(brokerGRPCClient, nil)
go broker.Run()
go brokerGRPCClient.StartStream()
// Create the client
client := &GRPCClient{
Conn: conn,
Plugins: ps,
broker: broker,
doneCtx: context.Background(),
}
return client, server

201
vendor/github.com/oklog/run/LICENSE generated vendored Normal file
View File

@@ -0,0 +1,201 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

73
vendor/github.com/oklog/run/README.md generated vendored Normal file
View File

@@ -0,0 +1,73 @@
# run
[![GoDoc](https://godoc.org/github.com/oklog/run?status.svg)](https://godoc.org/github.com/oklog/run)
[![Build Status](https://travis-ci.org/oklog/run.svg?branch=master)](https://travis-ci.org/oklog/run)
[![Go Report Card](https://goreportcard.com/badge/github.com/oklog/run)](https://goreportcard.com/report/github.com/oklog/run)
[![Apache 2 licensed](https://img.shields.io/badge/license-Apache2-blue.svg)](https://raw.githubusercontent.com/oklog/run/master/LICENSE)
run.Group is a universal mechanism to manage goroutine lifecycles.
Create a zero-value run.Group, and then add actors to it. Actors are defined as
a pair of functions: an **execute** function, which should run synchronously;
and an **interrupt** function, which, when invoked, should cause the execute
function to return. Finally, invoke Run, which blocks until the first actor
returns. This general-purpose API allows callers to model pretty much any
runnable task, and achieve well-defined lifecycle semantics for the group.
run.Group was written to manage component lifecycles in func main for
[OK Log](https://github.com/oklog/oklog).
But it's useful in any circumstance where you need to orchestrate multiple
goroutines as a unified whole.
[Click here](https://www.youtube.com/watch?v=LHe1Cb_Ud_M&t=15m45s) to see a
video of a talk where run.Group is described.
## Examples
### context.Context
```go
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
return myProcess(ctx, ...)
}, func(error) {
cancel()
})
```
### net.Listener
```go
ln, _ := net.Listen("tcp", ":8080")
g.Add(func() error {
return http.Serve(ln, nil)
}, func(error) {
ln.Close()
})
```
### io.ReadCloser
```go
var conn io.ReadCloser = ...
g.Add(func() error {
s := bufio.NewScanner(conn)
for s.Scan() {
println(s.Text())
}
return s.Err()
}, func(error) {
conn.Close()
})
```
## Comparisons
Package run is somewhat similar to package
[errgroup](https://godoc.org/golang.org/x/sync/errgroup),
except it doesn't require actor goroutines to understand context semantics.
It's somewhat similar to package
[tomb.v1](https://godoc.org/gopkg.in/tomb.v1) or
[tomb.v2](https://godoc.org/gopkg.in/tomb.v2),
except it has a much smaller API surface, delegating e.g. staged shutdown of
goroutines to the caller.

62
vendor/github.com/oklog/run/group.go generated vendored Normal file
View File

@@ -0,0 +1,62 @@
// Package run implements an actor-runner with deterministic teardown. It is
// somewhat similar to package errgroup, except it does not require actor
// goroutines to understand context semantics. This makes it suitable for use in
// more circumstances; for example, goroutines which are handling connections
// from net.Listeners, or scanning input from a closable io.Reader.
package run
// Group collects actors (functions) and runs them concurrently.
// When one actor (function) returns, all actors are interrupted.
// The zero value of a Group is useful.
type Group struct {
actors []actor
}
// Add an actor (function) to the group. Each actor must be pre-emptable by an
// interrupt function. That is, if interrupt is invoked, execute should return.
// Also, it must be safe to call interrupt even after execute has returned.
//
// The first actor (function) to return interrupts all running actors.
// The error is passed to the interrupt functions, and is returned by Run.
func (g *Group) Add(execute func() error, interrupt func(error)) {
g.actors = append(g.actors, actor{execute, interrupt})
}
// Run all actors (functions) concurrently.
// When the first actor returns, all others are interrupted.
// Run only returns when all actors have exited.
// Run returns the error returned by the first exiting actor.
func (g *Group) Run() error {
if len(g.actors) == 0 {
return nil
}
// Run each actor.
errors := make(chan error, len(g.actors))
for _, a := range g.actors {
go func(a actor) {
errors <- a.execute()
}(a)
}
// Wait for the first actor to stop.
err := <-errors
// Signal all actors to stop.
for _, a := range g.actors {
a.interrupt(err)
}
// Wait for all actors to stop.
for i := 1; i < cap(errors); i++ {
<-errors
}
// Return the original error.
return err
}
type actor struct {
execute func() error
interrupt func(error)
}
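
A small, self-contained sketch of the pattern group.go implements: one actor serves HTTP, another waits for an interrupt signal, and whichever returns first causes Run to interrupt the other and then return that first error.

```go
package main

import (
	"fmt"
	"net"
	"net/http"
	"os"
	"os/signal"

	"github.com/oklog/run"
)

func main() {
	var g run.Group

	// Actor 1: an HTTP server, interrupted by closing its listener.
	ln, err := net.Listen("tcp", "127.0.0.1:8080")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	g.Add(func() error {
		return http.Serve(ln, nil)
	}, func(error) {
		ln.Close()
	})

	// Actor 2: wait for SIGINT, interrupted by closing a quit channel.
	quit := make(chan struct{})
	g.Add(func() error {
		sig := make(chan os.Signal, 1)
		signal.Notify(sig, os.Interrupt)
		select {
		case <-sig:
			return fmt.Errorf("received interrupt")
		case <-quit:
			return nil
		}
	}, func(error) {
		close(quit)
	})

	// Run blocks until the first actor returns, interrupts the rest, waits for
	// them all to exit, and returns the first error.
	fmt.Println(g.Run())
}
```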

12
vendor/vendor.json vendored
View File

@@ -1021,10 +1021,10 @@
"revisionTime": "2017-06-22T06:09:55Z"
},
{
"checksumSHA1": "QSH/KjHRLljyZDXMmkd9fdqbr3I=",
"checksumSHA1": "fqYiU+mDu70k2aeHdYZrZbCnUjM=",
"path": "github.com/hashicorp/go-plugin",
"revision": "e37881a3f1a07fce82b3d99ce0342a72e53386bc",
"revisionTime": "2018-01-11T18:21:30Z"
"revision": "4b3b29102a1e16e60f22e79904ef3490f210f58d",
"revisionTime": "2018-01-18T02:43:51Z"
},
{
"checksumSHA1": "yzoWV7yrS/TvOrKy5ZrdUjsYaOA=",
@@ -1376,6 +1376,12 @@
"revision": "c95c6e5c2d1a3d37fc44c8c6dc9e231c7500667d",
"revisionTime": "2017-10-19T11:44:56Z"
},
{
"checksumSHA1": "Sfxv8SV6j8m6YD+hwvlMJjq2zfg=",
"path": "github.com/oklog/run",
"revision": "4dadeb3030eda0273a12382bb2348ffc7c9d1a39",
"revisionTime": "2017-11-14T00:29:35Z"
},
{
"checksumSHA1": "OFNit1Qx2DdWhotfREKodDNUwCM=",
"path": "github.com/opencontainers/go-digest",