Support MongoDB session-wide write concern (#3646)

* Initial work on write concern support, set for the lifetime of the session

* Add base64 encoded value support, include docs and tests

* Handle error from json.Unmarshal, fix test and docs

* Remove writeConcern struct, move JSON unmarshal to Initialize

* Return error on empty mapping of write_concern into mgo.Safe struct
This commit is contained in:
Calvin Leung Huang
2017-12-05 15:31:01 -05:00
committed by GitHub
parent 208dc55830
commit a9e7dbb7b4
3 changed files with 89 additions and 5 deletions

View File

@@ -2,6 +2,8 @@ package mongodb
import (
"crypto/tls"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"net"
@@ -21,10 +23,12 @@ import (
// interface for databases to make connections.
type mongoDBConnectionProducer struct {
ConnectionURL string `json:"connection_url" structs:"connection_url" mapstructure:"connection_url"`
WriteConcern string `json:"write_concern" structs:"write_concern" mapstructure:"write_concern"`
Initialized bool
Type string
session *mgo.Session
safe *mgo.Safe
sync.Mutex
}
@@ -42,6 +46,30 @@ func (c *mongoDBConnectionProducer) Initialize(conf map[string]interface{}, veri
return fmt.Errorf("connection_url cannot be empty")
}
if c.WriteConcern != "" {
input := c.WriteConcern
// Try to base64 decode the input. If successful, consider the decoded
// value as input.
inputBytes, err := base64.StdEncoding.DecodeString(input)
if err == nil {
input = string(inputBytes)
}
concern := &mgo.Safe{}
err = json.Unmarshal([]byte(input), concern)
if err != nil {
return fmt.Errorf("error mashalling write_concern: %s", err)
}
// Guard against empty, non-nil mgo.Safe object; we don't want to pass that
// into mgo.SetSafe in Connection().
if (mgo.Safe{} == *concern) {
return fmt.Errorf("provided write_concern values did not map to any mgo.Safe fields")
}
c.safe = concern
}
// Set initialized to true at this point since all fields are set,
// and the connection can be established at a later time.
c.Initialized = true
@@ -78,6 +106,11 @@ func (c *mongoDBConnectionProducer) Connection() (interface{}, error) {
if err != nil {
return nil, err
}
if c.safe != nil {
c.session.SetSafe(c.safe)
}
c.session.SetSyncTimeout(1 * time.Minute)
c.session.SetSocketTimeout(1 * time.Minute)

View File

@@ -16,6 +16,8 @@ import (
const testMongoDBRole = `{ "db": "admin", "roles": [ { "role": "readWrite" } ] }`
const testMongoDBWriteConcern = `{ "wmode": "majority", "wtimeout": 5000 }`
func prepareMongoDBTestContainer(t *testing.T) (cleanup func(), retURL string) {
if os.Getenv("MONGODB_URL") != "" {
return func() {}, os.Getenv("MONGODB_URL")
@@ -129,6 +131,44 @@ func TestMongoDB_CreateUser(t *testing.T) {
}
}
func TestMongoDB_CreateUser_writeConcern(t *testing.T) {
cleanup, connURL := prepareMongoDBTestContainer(t)
defer cleanup()
connectionDetails := map[string]interface{}{
"connection_url": connURL,
"write_concern": testMongoDBWriteConcern,
}
dbRaw, err := New()
if err != nil {
t.Fatalf("err: %s", err)
}
db := dbRaw.(*MongoDB)
err = db.Initialize(connectionDetails, true)
if err != nil {
t.Fatalf("err: %s", err)
}
statements := dbplugin.Statements{
CreationStatements: testMongoDBRole,
}
usernameConfig := dbplugin.UsernameConfig{
DisplayName: "test",
RoleName: "test",
}
username, password, err := db.CreateUser(statements, usernameConfig, time.Now().Add(time.Minute))
if err != nil {
t.Fatalf("err: %s", err)
}
if err := testCredsExist(t, connURL, username, password); err != nil {
t.Fatalf("Could not connect with new credentials: %s", err)
}
}
func TestMongoDB_RevokeUser(t *testing.T) {
cleanup, connURL := prepareMongoDBTestContainer(t)
defer cleanup()