Fix cassandra tests, explicitly set cluster port if provided (#3296)

* Fix cassandra tests, explicitly set cluster port if provided

* Update cassandra.yml test-fixture

* Add port as part of the config option, fix tests

* Remove hostport splitting in cassandraConnectionProducer.createSession

* Include port in API docs
This commit is contained in:
Calvin Leung Huang
2017-09-07 23:04:40 -04:00
committed by GitHub
parent ced217e699
commit 38be34423c
5 changed files with 50 additions and 30 deletions

View File

@@ -64,11 +64,15 @@ func prepareCassandraTestContainer(t *testing.T) (func(), string) {
t.Fatalf("cassandra: failed to connect to docker: %s", err)
}
resource, err := pool.Run("cassandra", "3.11", nil)
resource, err := pool.Run("cassandra", "3.11", []string{"CASSANDRA_BROADCAST_ADDRESS=127.0.0.1"})
if err != nil {
t.Fatalf("cassandra: could not start container: %s", err)
}
cleanup := func() {
pool.Purge(resource)
}
setup := func() error {
cluster := gocql.NewCluster("127.0.0.1")
p, _ := strconv.Atoi(resource.GetPort("9042/tcp"))
@@ -78,6 +82,7 @@ func prepareCassandraTestContainer(t *testing.T) (func(), string) {
if err != nil {
return err
}
defer sess.Close()
// Create keyspace
q := sess.Query(`CREATE KEYSPACE "vault" WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };`)
@@ -99,11 +104,9 @@ func prepareCassandraTestContainer(t *testing.T) (func(), string) {
return nil
}
if pool.Retry(setup); err != nil {
cleanup()
t.Fatalf("cassandra: could not setup container: %s", err)
}
cleanup := func() {
pool.Purge(resource)
}
return cleanup, fmt.Sprintf("127.0.0.1:%s", resource.GetPort("9042/tcp"))
}

View File

@@ -13,9 +13,9 @@ import (
dockertest "gopkg.in/ory-am/dockertest.v3"
)
func prepareCassandraTestContainer(t *testing.T) (cleanup func(), retURL string) {
func prepareCassandraTestContainer(t *testing.T) (func(), string, int) {
if os.Getenv("CASSANDRA_HOST") != "" {
return func() {}, os.Getenv("CASSANDRA_HOST")
return func() {}, os.Getenv("CASSANDRA_HOST"), 0
}
pool, err := dockertest.NewPool("")
@@ -29,6 +29,7 @@ func prepareCassandraTestContainer(t *testing.T) (cleanup func(), retURL string)
ro := &dockertest.RunOptions{
Repository: "cassandra",
Tag: "latest",
Env: []string{"CASSANDRA_BROADCAST_ADDRESS=127.0.0.1"},
Mounts: []string{cassandraMountPath},
}
resource, err := pool.RunWithOptions(ro)
@@ -36,19 +37,19 @@ func prepareCassandraTestContainer(t *testing.T) (cleanup func(), retURL string)
t.Fatalf("Could not start local cassandra docker container: %s", err)
}
cleanup = func() {
cleanup := func() {
err := pool.Purge(resource)
if err != nil {
t.Fatalf("Failed to cleanup local container: %s", err)
}
}
retURL = fmt.Sprintf("localhost:%s", resource.GetPort("9042/tcp"))
port, _ := strconv.Atoi(resource.GetPort("9042/tcp"))
address := fmt.Sprintf("127.0.0.1:%d", port)
// exponential backoff-retry
if err = pool.Retry(func() error {
clusterConfig := gocql.NewCluster(retURL)
clusterConfig := gocql.NewCluster(address)
clusterConfig.Authenticator = gocql.PasswordAuthenticator{
Username: "cassandra",
Password: "cassandra",
@@ -63,20 +64,22 @@ func prepareCassandraTestContainer(t *testing.T) (cleanup func(), retURL string)
defer session.Close()
return nil
}); err != nil {
cleanup()
t.Fatalf("Could not connect to cassandra docker container: %s", err)
}
return
return cleanup, address, port
}
func TestCassandra_Initialize(t *testing.T) {
if os.Getenv("TRAVIS") != "true" {
t.SkipNow()
}
cleanup, connURL := prepareCassandraTestContainer(t)
cleanup, address, port := prepareCassandraTestContainer(t)
defer cleanup()
connectionDetails := map[string]interface{}{
"hosts": connURL,
"hosts": address,
"port": port,
"username": "cassandra",
"password": "cassandra",
"protocol_version": 4,
@@ -102,7 +105,8 @@ func TestCassandra_Initialize(t *testing.T) {
// test a string protocol
connectionDetails = map[string]interface{}{
"hosts": connURL,
"hosts": address,
"port": strconv.Itoa(port),
"username": "cassandra",
"password": "cassandra",
"protocol_version": "4",
@@ -118,11 +122,12 @@ func TestCassandra_CreateUser(t *testing.T) {
if os.Getenv("TRAVIS") != "true" {
t.SkipNow()
}
cleanup, connURL := prepareCassandraTestContainer(t)
cleanup, address, port := prepareCassandraTestContainer(t)
defer cleanup()
connectionDetails := map[string]interface{}{
"hosts": connURL,
"hosts": address,
"port": port,
"username": "cassandra",
"password": "cassandra",
"protocol_version": 4,
@@ -149,7 +154,7 @@ func TestCassandra_CreateUser(t *testing.T) {
t.Fatalf("err: %s", err)
}
if err := testCredsExist(t, connURL, username, password); err != nil {
if err := testCredsExist(t, address, port, username, password); err != nil {
t.Fatalf("Could not connect with new credentials: %s", err)
}
}
@@ -158,11 +163,12 @@ func TestMyCassandra_RenewUser(t *testing.T) {
if os.Getenv("TRAVIS") != "true" {
t.SkipNow()
}
cleanup, connURL := prepareCassandraTestContainer(t)
cleanup, address, port := prepareCassandraTestContainer(t)
defer cleanup()
connectionDetails := map[string]interface{}{
"hosts": connURL,
"hosts": address,
"port": port,
"username": "cassandra",
"password": "cassandra",
"protocol_version": 4,
@@ -189,7 +195,7 @@ func TestMyCassandra_RenewUser(t *testing.T) {
t.Fatalf("err: %s", err)
}
if err := testCredsExist(t, connURL, username, password); err != nil {
if err := testCredsExist(t, address, port, username, password); err != nil {
t.Fatalf("Could not connect with new credentials: %s", err)
}
@@ -203,11 +209,12 @@ func TestCassandra_RevokeUser(t *testing.T) {
if os.Getenv("TRAVIS") != "true" {
t.SkipNow()
}
cleanup, connURL := prepareCassandraTestContainer(t)
cleanup, address, port := prepareCassandraTestContainer(t)
defer cleanup()
connectionDetails := map[string]interface{}{
"hosts": connURL,
"hosts": address,
"port": port,
"username": "cassandra",
"password": "cassandra",
"protocol_version": 4,
@@ -234,7 +241,7 @@ func TestCassandra_RevokeUser(t *testing.T) {
t.Fatalf("err: %s", err)
}
if err = testCredsExist(t, connURL, username, password); err != nil {
if err = testCredsExist(t, address, port, username, password); err != nil {
t.Fatalf("Could not connect with new credentials: %s", err)
}
@@ -244,18 +251,19 @@ func TestCassandra_RevokeUser(t *testing.T) {
t.Fatalf("err: %s", err)
}
if err = testCredsExist(t, connURL, username, password); err == nil {
if err = testCredsExist(t, address, port, username, password); err == nil {
t.Fatal("Credentials were not revoked")
}
}
func testCredsExist(t testing.TB, connURL, username, password string) error {
clusterConfig := gocql.NewCluster(connURL)
func testCredsExist(t testing.TB, address string, port int, username, password string) error {
clusterConfig := gocql.NewCluster(address)
clusterConfig.Authenticator = gocql.PasswordAuthenticator{
Username: username,
Password: password,
}
clusterConfig.ProtoVersion = 4
clusterConfig.Port = port
session, err := clusterConfig.CreateSession()
if err != nil {

View File

@@ -20,6 +20,7 @@ import (
// interface for cassandra databases to make connections.
type cassandraConnectionProducer struct {
Hosts string `json:"hosts" structs:"hosts" mapstructure:"hosts"`
Port int `json:"port" structs:"port" mapstructure:"port"`
Username string `json:"username" structs:"username" mapstructure:"username"`
Password string `json:"password" structs:"password" mapstructure:"password"`
TLS bool `json:"tls" structs:"tls" mapstructure:"tls"`
@@ -149,12 +150,17 @@ func (c *cassandraConnectionProducer) Close() error {
}
func (c *cassandraConnectionProducer) createSession() (*gocql.Session, error) {
clusterConfig := gocql.NewCluster(strings.Split(c.Hosts, ",")...)
hosts := strings.Split(c.Hosts, ",")
clusterConfig := gocql.NewCluster(hosts...)
clusterConfig.Authenticator = gocql.PasswordAuthenticator{
Username: c.Username,
Password: c.Password,
}
if c.Port != 0 {
clusterConfig.Port = c.Port
}
clusterConfig.ProtoVersion = c.ProtocolVersion
if clusterConfig.ProtoVersion == 0 {
clusterConfig.ProtoVersion = 2

View File

@@ -421,7 +421,7 @@ seed_provider:
parameters:
# seeds is actually a comma-delimited list of addresses.
# Ex: "<ip1>,<ip2>,<ip3>"
- seeds: "172.17.0.4"
- seeds: "127.0.0.1"
# For workloads with more data than can fit in memory, Cassandra's
# bottleneck will be reads that need to fetch data from
@@ -572,7 +572,7 @@ ssl_storage_port: 7001
#
# Setting listen_address to 0.0.0.0 is always wrong.
#
listen_address: 172.17.0.4
listen_address: 172.17.0.5
# Set listen_address OR listen_interface, not both. Interfaces must correspond
# to a single address, IP aliasing is not supported.
@@ -586,7 +586,7 @@ listen_address: 172.17.0.4
# Address to broadcast to other Cassandra nodes
# Leaving this blank will set it to the same value as listen_address
broadcast_address: 172.17.0.4
broadcast_address: 127.0.0.1
# When using multiple physical network interfaces, set this
# to true to listen on broadcast_address in addition to
@@ -668,7 +668,7 @@ rpc_port: 9160
# be set to 0.0.0.0. If left blank, this will be set to the value of
# rpc_address. If rpc_address is set to 0.0.0.0, broadcast_rpc_address must
# be set.
broadcast_rpc_address: 172.17.0.4
broadcast_rpc_address: 127.0.0.1
# enable or disable keepalive on rpc/native connections
rpc_keepalive: true

View File

@@ -26,6 +26,9 @@ has a number of parameters to further configure a connection.
- `hosts` `(string: <required>)` Specifies a set of comma-delineated Cassandra
hosts to connect to.
- `port` `(int: 9042)` Specifies the default port to use if none is provided
as part of the host URI. Defaults to Cassandra's default transport port, 9042.
- `username` `(string: <required>)` Specifies the username to use for
superuser access.