Database gRPC plugins (#3666)

* Start work on context aware backends

* Start work on moving the database plugins to gRPC in order to pass context

* Add context to builtin database plugins

* use byte slice instead of string

* Context all the things

* Move proto messages to the dbplugin package

* Add a grpc mechanism for running backend plugins

* Serve the GRPC plugin

* Add backwards compatibility to the database plugins

* Remove backend plugin changes

* Remove backend plugin changes

* Cleanup the transport implementations

* If grpc connection is in an unexpected state restart the plugin

* Fix tests

* Fix tests

* Remove context from the request object, replace it with context.TODO

* Add a test to verify netRPC plugins still work

* Remove unused mapstructure call

* Code review fixes

* Code review fixes

* Code review fixes
This commit is contained in:
Brian Kassouf
2017-12-14 14:03:11 -08:00
committed by GitHub
parent 2ce0984e00
commit a401cc7cb5
35 changed files with 1425 additions and 390 deletions

View File

@@ -1,6 +1,7 @@
package cassandra
import (
"context"
"strings"
"time"
@@ -21,6 +22,8 @@ const (
cassandraTypeName = "cassandra"
)
var _ dbplugin.Database = &Cassandra{}
// Cassandra is an implementation of Database interface
type Cassandra struct {
connutil.ConnectionProducer
@@ -64,8 +67,8 @@ func (c *Cassandra) Type() (string, error) {
return cassandraTypeName, nil
}
func (c *Cassandra) getConnection() (*gocql.Session, error) {
session, err := c.Connection()
func (c *Cassandra) getConnection(ctx context.Context) (*gocql.Session, error) {
session, err := c.Connection(ctx)
if err != nil {
return nil, err
}
@@ -75,13 +78,13 @@ func (c *Cassandra) getConnection() (*gocql.Session, error) {
// CreateUser generates the username/password on the underlying Cassandra secret backend as instructed by
// the CreationStatement provided.
func (c *Cassandra) CreateUser(statements dbplugin.Statements, usernameConfig dbplugin.UsernameConfig, expiration time.Time) (username string, password string, err error) {
func (c *Cassandra) CreateUser(ctx context.Context, statements dbplugin.Statements, usernameConfig dbplugin.UsernameConfig, expiration time.Time) (username string, password string, err error) {
// Grab the lock
c.Lock()
defer c.Unlock()
// Get the connection
session, err := c.getConnection()
session, err := c.getConnection(ctx)
if err != nil {
return "", "", err
}
@@ -138,18 +141,18 @@ func (c *Cassandra) CreateUser(statements dbplugin.Statements, usernameConfig db
}
// RenewUser is not supported on Cassandra, so this is a no-op.
func (c *Cassandra) RenewUser(statements dbplugin.Statements, username string, expiration time.Time) error {
func (c *Cassandra) RenewUser(ctx context.Context, statements dbplugin.Statements, username string, expiration time.Time) error {
// NOOP
return nil
}
// RevokeUser attempts to drop the specified user.
func (c *Cassandra) RevokeUser(statements dbplugin.Statements, username string) error {
func (c *Cassandra) RevokeUser(ctx context.Context, statements dbplugin.Statements, username string) error {
// Grab the lock
c.Lock()
defer c.Unlock()
session, err := c.getConnection()
session, err := c.getConnection(ctx)
if err != nil {
return err
}

View File

@@ -1,6 +1,7 @@
package cassandra
import (
"context"
"os"
"strconv"
"testing"
@@ -89,7 +90,7 @@ func TestCassandra_Initialize(t *testing.T) {
db := dbRaw.(*Cassandra)
connProducer := db.ConnectionProducer.(*cassandraConnectionProducer)
err := db.Initialize(connectionDetails, true)
err := db.Initialize(context.Background(), connectionDetails, true)
if err != nil {
t.Fatalf("err: %s", err)
}
@@ -112,7 +113,7 @@ func TestCassandra_Initialize(t *testing.T) {
"protocol_version": "4",
}
err = db.Initialize(connectionDetails, true)
err = db.Initialize(context.Background(), connectionDetails, true)
if err != nil {
t.Fatalf("err: %s", err)
}
@@ -135,7 +136,7 @@ func TestCassandra_CreateUser(t *testing.T) {
dbRaw, _ := New()
db := dbRaw.(*Cassandra)
err := db.Initialize(connectionDetails, true)
err := db.Initialize(context.Background(), connectionDetails, true)
if err != nil {
t.Fatalf("err: %s", err)
}
@@ -149,7 +150,7 @@ func TestCassandra_CreateUser(t *testing.T) {
RoleName: "test",
}
username, password, err := db.CreateUser(statements, usernameConfig, time.Now().Add(time.Minute))
username, password, err := db.CreateUser(context.Background(), statements, usernameConfig, time.Now().Add(time.Minute))
if err != nil {
t.Fatalf("err: %s", err)
}
@@ -176,7 +177,7 @@ func TestMyCassandra_RenewUser(t *testing.T) {
dbRaw, _ := New()
db := dbRaw.(*Cassandra)
err := db.Initialize(connectionDetails, true)
err := db.Initialize(context.Background(), connectionDetails, true)
if err != nil {
t.Fatalf("err: %s", err)
}
@@ -190,7 +191,7 @@ func TestMyCassandra_RenewUser(t *testing.T) {
RoleName: "test",
}
username, password, err := db.CreateUser(statements, usernameConfig, time.Now().Add(time.Minute))
username, password, err := db.CreateUser(context.Background(), statements, usernameConfig, time.Now().Add(time.Minute))
if err != nil {
t.Fatalf("err: %s", err)
}
@@ -199,7 +200,7 @@ func TestMyCassandra_RenewUser(t *testing.T) {
t.Fatalf("Could not connect with new credentials: %s", err)
}
err = db.RenewUser(statements, username, time.Now().Add(time.Minute))
err = db.RenewUser(context.Background(), statements, username, time.Now().Add(time.Minute))
if err != nil {
t.Fatalf("err: %s", err)
}
@@ -222,7 +223,7 @@ func TestCassandra_RevokeUser(t *testing.T) {
dbRaw, _ := New()
db := dbRaw.(*Cassandra)
err := db.Initialize(connectionDetails, true)
err := db.Initialize(context.Background(), connectionDetails, true)
if err != nil {
t.Fatalf("err: %s", err)
}
@@ -236,7 +237,7 @@ func TestCassandra_RevokeUser(t *testing.T) {
RoleName: "test",
}
username, password, err := db.CreateUser(statements, usernameConfig, time.Now().Add(time.Minute))
username, password, err := db.CreateUser(context.Background(), statements, usernameConfig, time.Now().Add(time.Minute))
if err != nil {
t.Fatalf("err: %s", err)
}
@@ -246,7 +247,7 @@ func TestCassandra_RevokeUser(t *testing.T) {
}
// Test default revoke statememts
err = db.RevokeUser(statements, username)
err = db.RevokeUser(context.Background(), statements, username)
if err != nil {
t.Fatalf("err: %s", err)
}

View File

@@ -1,6 +1,7 @@
package cassandra
import (
"context"
"crypto/tls"
"fmt"
"strings"
@@ -43,7 +44,7 @@ type cassandraConnectionProducer struct {
sync.Mutex
}
func (c *cassandraConnectionProducer) Initialize(conf map[string]interface{}, verifyConnection bool) error {
func (c *cassandraConnectionProducer) Initialize(ctx context.Context, conf map[string]interface{}, verifyConnection bool) error {
c.Lock()
defer c.Unlock()
@@ -106,7 +107,7 @@ func (c *cassandraConnectionProducer) Initialize(conf map[string]interface{}, ve
c.Initialized = true
if verifyConnection {
if _, err := c.Connection(); err != nil {
if _, err := c.Connection(ctx); err != nil {
return fmt.Errorf("error verifying connection: %s", err)
}
}
@@ -114,7 +115,7 @@ func (c *cassandraConnectionProducer) Initialize(conf map[string]interface{}, ve
return nil
}
func (c *cassandraConnectionProducer) Connection() (interface{}, error) {
func (c *cassandraConnectionProducer) Connection(_ context.Context) (interface{}, error) {
if !c.Initialized {
return nil, connutil.ErrNotInitialized
}