Influxdb secret engine built-in plugin (#5924)

* intial work for influxdb secret plugin

* fixed typo

* added comment

* added documentation

* added tests

* fixed tests

* added vendoring

* minor testing issue with hardcoded values

* minor fixes
This commit is contained in:
Giacomo Tirabassi
2019-01-09 02:26:16 +01:00
committed by Brian Kassouf
parent 8017af2504
commit 726aa02038
32 changed files with 7502 additions and 0 deletions

View File

@@ -23,6 +23,7 @@ import (
dbCass "github.com/hashicorp/vault/plugins/database/cassandra"
dbHana "github.com/hashicorp/vault/plugins/database/hana"
dbInflux "github.com/hashicorp/vault/plugins/database/influxdb"
dbMongo "github.com/hashicorp/vault/plugins/database/mongodb"
dbMssql "github.com/hashicorp/vault/plugins/database/mssql"
dbMysql "github.com/hashicorp/vault/plugins/database/mysql"
@@ -89,6 +90,7 @@ func newRegistry() *registry {
"cassandra-database-plugin": dbCass.New,
"mongodb-database-plugin": dbMongo.New,
"hana-database-plugin": dbHana.New,
"influxdb-database-plugin": dbInflux.New,
},
logicalBackends: map[string]logical.Factory{
"ad": logicalAd.Factory,

View File

@@ -0,0 +1,263 @@
package influxdb
import (
"context"
"crypto/tls"
"fmt"
"sync"
"time"
"github.com/hashicorp/errwrap"
"github.com/hashicorp/vault/helper/certutil"
"github.com/hashicorp/vault/helper/parseutil"
"github.com/hashicorp/vault/helper/tlsutil"
"github.com/hashicorp/vault/plugins/helper/database/connutil"
influx "github.com/influxdata/influxdb/client/v2"
"github.com/mitchellh/mapstructure"
)
// influxdbConnectionProducer implements ConnectionProducer and provides an
// interface for influxdb databases to make connections.
type influxdbConnectionProducer struct {
Host string `json:"host" structs:"host" mapstructure:"host"`
Username string `json:"username" structs:"username" mapstructure:"username"`
Password string `json:"password" structs:"password" mapstructure:"password"`
Port string `json:"port" structs:"port" mapstructure:"port"` //default to 8086
TLS bool `json:"tls" structs:"tls" mapstructure:"tls"`
InsecureTLS bool `json:"insecure_tls" structs:"insecure_tls" mapstructure:"insecure_tls"`
ConnectTimeoutRaw interface{} `json:"connect_timeout" structs:"connect_timeout" mapstructure:"connect_timeout"`
TLSMinVersion string `json:"tls_min_version" structs:"tls_min_version" mapstructure:"tls_min_version"`
PemBundle string `json:"pem_bundle" structs:"pem_bundle" mapstructure:"pem_bundle"`
PemJSON string `json:"pem_json" structs:"pem_json" mapstructure:"pem_json"`
connectTimeout time.Duration
certificate string
privateKey string
issuingCA string
rawConfig map[string]interface{}
Initialized bool
Type string
client influx.Client
sync.Mutex
}
func (i *influxdbConnectionProducer) Initialize(ctx context.Context, conf map[string]interface{}, verifyConnection bool) error {
_, err := i.Init(ctx, conf, verifyConnection)
return err
}
func (i *influxdbConnectionProducer) Init(ctx context.Context, conf map[string]interface{}, verifyConnection bool) (map[string]interface{}, error) {
i.Lock()
defer i.Unlock()
i.rawConfig = conf
err := mapstructure.WeakDecode(conf, i)
if err != nil {
return nil, err
}
if i.ConnectTimeoutRaw == nil {
i.ConnectTimeoutRaw = "0s"
}
if i.Port == "" {
i.Port = "8086"
}
i.connectTimeout, err = parseutil.ParseDurationSecond(i.ConnectTimeoutRaw)
if err != nil {
return nil, errwrap.Wrapf("invalid connect_timeout: {{err}}", err)
}
switch {
case len(i.Host) == 0:
return nil, fmt.Errorf("host cannot be empty")
case len(i.Username) == 0:
return nil, fmt.Errorf("username cannot be empty")
case len(i.Password) == 0:
return nil, fmt.Errorf("password cannot be empty")
}
var certBundle *certutil.CertBundle
var parsedCertBundle *certutil.ParsedCertBundle
switch {
case len(i.PemJSON) != 0:
parsedCertBundle, err = certutil.ParsePKIJSON([]byte(i.PemJSON))
if err != nil {
return nil, errwrap.Wrapf("could not parse given JSON; it must be in the format of the output of the PKI backend certificate issuing command: {{err}}", err)
}
certBundle, err = parsedCertBundle.ToCertBundle()
if err != nil {
return nil, errwrap.Wrapf("Error marshaling PEM information: {{err}}", err)
}
i.certificate = certBundle.Certificate
i.privateKey = certBundle.PrivateKey
i.issuingCA = certBundle.IssuingCA
i.TLS = true
case len(i.PemBundle) != 0:
parsedCertBundle, err = certutil.ParsePEMBundle(i.PemBundle)
if err != nil {
return nil, errwrap.Wrapf("Error parsing the given PEM information: {{err}}", err)
}
certBundle, err = parsedCertBundle.ToCertBundle()
if err != nil {
return nil, errwrap.Wrapf("Error marshaling PEM information: {{err}}", err)
}
i.certificate = certBundle.Certificate
i.privateKey = certBundle.PrivateKey
i.issuingCA = certBundle.IssuingCA
i.TLS = true
}
// Set initialized to true at this point since all fields are set,
// and the connection can be established at a later time.
i.Initialized = true
if verifyConnection {
if _, err := i.Connection(ctx); err != nil {
return nil, errwrap.Wrapf("error verifying connection: {{err}}", err)
}
}
return conf, nil
}
func (i *influxdbConnectionProducer) Connection(_ context.Context) (interface{}, error) {
if !i.Initialized {
return nil, connutil.ErrNotInitialized
}
// If we already have a DB, return it
if i.client != nil {
return i.client, nil
}
cli, err := i.createClient()
if err != nil {
return nil, err
}
// Store the session in backend for reuse
i.client = cli
return cli, nil
}
func (i *influxdbConnectionProducer) Close() error {
// Grab the write lock
i.Lock()
defer i.Unlock()
if i.client != nil {
i.client.Close()
}
i.client = nil
return nil
}
func (i *influxdbConnectionProducer) createClient() (influx.Client, error) {
clientConfig := influx.HTTPConfig{
Addr: fmt.Sprintf("http://%s:%s", i.Host, i.Port),
Username: i.Username,
Password: i.Password,
UserAgent: "vault-influxdb-plugin",
Timeout: i.connectTimeout,
}
if i.TLS {
var tlsConfig *tls.Config
if len(i.certificate) > 0 || len(i.issuingCA) > 0 {
if len(i.certificate) > 0 && len(i.privateKey) == 0 {
return nil, fmt.Errorf("found certificate for TLS authentication but no private key")
}
certBundle := &certutil.CertBundle{}
if len(i.certificate) > 0 {
certBundle.Certificate = i.certificate
certBundle.PrivateKey = i.privateKey
}
if len(i.issuingCA) > 0 {
certBundle.IssuingCA = i.issuingCA
}
parsedCertBundle, err := certBundle.ToParsedCertBundle()
if err != nil {
return nil, errwrap.Wrapf("failed to parse certificate bundle: {{err}}", err)
}
tlsConfig, err = parsedCertBundle.GetTLSConfig(certutil.TLSClient)
if err != nil || tlsConfig == nil {
return nil, errwrap.Wrapf(fmt.Sprintf("failed to get TLS configuration: tlsConfig:%#v err:{{err}}", tlsConfig), err)
}
tlsConfig.InsecureSkipVerify = i.InsecureTLS
if i.TLSMinVersion != "" {
var ok bool
tlsConfig.MinVersion, ok = tlsutil.TLSLookup[i.TLSMinVersion]
if !ok {
return nil, fmt.Errorf("invalid 'tls_min_version' in config")
}
} else {
// MinVersion was not being set earlier. Reset it to
// zero to gracefully handle upgrades.
tlsConfig.MinVersion = 0
}
}
clientConfig.TLSConfig = tlsConfig
clientConfig.Addr = fmt.Sprintf("https://%s:%s", i.Host, i.Port)
}
cli, err := influx.NewHTTPClient(clientConfig)
if err != nil {
return nil, errwrap.Wrapf("error creating client: {{err}}", err)
}
// Checking server status
_, _, err = cli.Ping(i.connectTimeout)
if err != nil {
return nil, errwrap.Wrapf("error checking cluster status: {{err}}", err)
}
// verifying infos about the connection
isAdmin, err := isUserAdmin(cli, i.Username)
if err != nil {
return nil, errwrap.Wrapf("error getting if provided username is admin: {{err}}", err)
}
if !isAdmin {
return nil, fmt.Errorf("the provided user is not an admin of the influxDB server")
}
return cli, nil
}
func (i *influxdbConnectionProducer) secretValues() map[string]interface{} {
return map[string]interface{}{
i.Password: "[password]",
i.PemBundle: "[pem_bundle]",
i.PemJSON: "[pem_json]",
}
}
func isUserAdmin(cli influx.Client, user string) (bool, error) {
q := influx.NewQuery("SHOW USERS", "", "")
response, err := cli.Query(q)
if err != nil {
return false, err
}
if response.Error() != nil {
return false, response.Error()
}
for _, res := range response.Results {
for _, serie := range res.Series {
for _, val := range serie.Values {
if val[0].(string) == user && val[1].(bool) == true {
return true, nil
}
}
}
}
return false, fmt.Errorf("the provided username is not a valid user in the influxdb")
}

View File

@@ -0,0 +1,21 @@
package main
import (
"log"
"os"
"github.com/hashicorp/vault/helper/pluginutil"
"github.com/hashicorp/vault/plugins/database/influxdb"
)
func main() {
apiClientMeta := &pluginutil.APIClientMeta{}
flags := apiClientMeta.FlagSet()
flags.Parse(os.Args[1:])
err := influxdb.Run(apiClientMeta.GetTLSConfig())
if err != nil {
log.Println(err)
os.Exit(1)
}
}

View File

@@ -0,0 +1,246 @@
package influxdb
import (
"context"
"strings"
"time"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/vault/api"
"github.com/hashicorp/vault/builtin/logical/database/dbplugin"
"github.com/hashicorp/vault/helper/strutil"
"github.com/hashicorp/vault/plugins"
"github.com/hashicorp/vault/plugins/helper/database/credsutil"
"github.com/hashicorp/vault/plugins/helper/database/dbutil"
influx "github.com/influxdata/influxdb/client/v2"
)
const (
defaultUserCreationIFQL = `CREATE USER "{{username}}" WITH PASSWORD '{{password}}';`
defaultUserDeletionIFQL = `DROP USER "{{username}}";`
defaultRootCredentialRotationIFQL = `SET PASSWORD FOR "{{username}}" = '{{password}}';`
influxdbTypeName = "influxdb"
)
var _ dbplugin.Database = &Influxdb{}
// Influxdb is an implementation of Database interface
type Influxdb struct {
*influxdbConnectionProducer
credsutil.CredentialsProducer
}
// New returns a new Cassandra instance
func New() (interface{}, error) {
db := new()
dbType := dbplugin.NewDatabaseErrorSanitizerMiddleware(db, db.secretValues)
return dbType, nil
}
func new() *Influxdb {
connProducer := &influxdbConnectionProducer{}
connProducer.Type = influxdbTypeName
credsProducer := &credsutil.SQLCredentialsProducer{
DisplayNameLen: 15,
RoleNameLen: 15,
UsernameLen: 100,
Separator: "_",
}
return &Influxdb{
influxdbConnectionProducer: connProducer,
CredentialsProducer: credsProducer,
}
}
// Run instantiates a Influxdb object, and runs the RPC server for the plugin
func Run(apiTLSConfig *api.TLSConfig) error {
dbType, err := New()
if err != nil {
return err
}
plugins.Serve(dbType.(dbplugin.Database), apiTLSConfig)
return nil
}
// Type returns the TypeName for this backend
func (i *Influxdb) Type() (string, error) {
return influxdbTypeName, nil
}
func (i *Influxdb) getConnection(ctx context.Context) (influx.Client, error) {
cli, err := i.Connection(ctx)
if err != nil {
return nil, err
}
return cli.(influx.Client), nil
}
// CreateUser generates the username/password on the underlying Influxdb secret backend as instructed by
// the CreationStatement provided.
func (i *Influxdb) CreateUser(ctx context.Context, statements dbplugin.Statements, usernameConfig dbplugin.UsernameConfig, expiration time.Time) (username string, password string, err error) {
// Grab the lock
i.Lock()
defer i.Unlock()
statements = dbutil.StatementCompatibilityHelper(statements)
// Get the connection
cli, err := i.getConnection(ctx)
if err != nil {
return "", "", err
}
creationIFQL := statements.Creation
if len(creationIFQL) == 0 {
creationIFQL = []string{defaultUserCreationIFQL}
}
rollbackIFQL := statements.Rollback
if len(rollbackIFQL) == 0 {
rollbackIFQL = []string{defaultUserDeletionIFQL}
}
username, err = i.GenerateUsername(usernameConfig)
username = strings.Replace(username, "-", "_", -1)
if err != nil {
return "", "", err
}
username = strings.ToLower(username)
password, err = i.GeneratePassword()
if err != nil {
return "", "", err
}
// Execute each query
for _, stmt := range creationIFQL {
for _, query := range strutil.ParseArbitraryStringSlice(stmt, ";") {
query = strings.TrimSpace(query)
if len(query) == 0 {
continue
}
q := influx.NewQuery(dbutil.QueryHelper(query, map[string]string{
"username": username,
"password": password,
}), "", "")
response, err := cli.Query(q)
if err != nil && response.Error() != nil {
for _, stmt := range rollbackIFQL {
for _, query := range strutil.ParseArbitraryStringSlice(stmt, ";") {
query = strings.TrimSpace(query)
if len(query) == 0 {
continue
}
q := influx.NewQuery(dbutil.QueryHelper(query, map[string]string{
"username": username,
}), "", "")
response, err := cli.Query(q)
if err != nil && response.Error() != nil {
return "", "", err
}
}
}
return "", "", err
}
}
}
return username, password, nil
}
// RenewUser is not supported on Influxdb, so this is a no-op.
// TO CHECK THIS FIRST
func (i *Influxdb) RenewUser(ctx context.Context, statements dbplugin.Statements, username string, expiration time.Time) error {
// NOOP
return nil
}
// RevokeUser attempts to drop the specified user.
func (i *Influxdb) RevokeUser(ctx context.Context, statements dbplugin.Statements, username string) error {
// Grab the lock
i.Lock()
defer i.Unlock()
statements = dbutil.StatementCompatibilityHelper(statements)
cli, err := i.getConnection(ctx)
if err != nil {
return err
}
revocationIFQL := statements.Revocation
if len(revocationIFQL) == 0 {
revocationIFQL = []string{defaultUserDeletionIFQL}
}
var result *multierror.Error
for _, stmt := range revocationIFQL {
for _, query := range strutil.ParseArbitraryStringSlice(stmt, ";") {
query = strings.TrimSpace(query)
if len(query) == 0 {
continue
}
q := influx.NewQuery(dbutil.QueryHelper(query, map[string]string{
"username": username,
}), "", "")
response, err := cli.Query(q)
result = multierror.Append(result, err)
result = multierror.Append(result, response.Error())
}
}
return result.ErrorOrNil()
}
// RotateRootCredentials is useful when we try to change root credential
func (i *Influxdb) RotateRootCredentials(ctx context.Context, statements []string) (map[string]interface{}, error) {
// Grab the lock
i.Lock()
defer i.Unlock()
cli, err := i.getConnection(ctx)
if err != nil {
return nil, err
}
rotateIFQL := statements
if len(rotateIFQL) == 0 {
rotateIFQL = []string{defaultRootCredentialRotationIFQL}
}
password, err := i.GeneratePassword()
if err != nil {
return nil, err
}
var result *multierror.Error
for _, stmt := range rotateIFQL {
for _, query := range strutil.ParseArbitraryStringSlice(stmt, ";") {
query = strings.TrimSpace(query)
if len(query) == 0 {
continue
}
q := influx.NewQuery(dbutil.QueryHelper(query, map[string]string{
"username": i.Username,
"password": password,
}), "", "")
response, err := cli.Query(q)
result = multierror.Append(result, err)
result = multierror.Append(result, response.Error())
}
}
err = result.ErrorOrNil()
if err != nil {
return nil, err
}
i.rawConfig["password"] = password
return i.rawConfig, nil
}

View File

@@ -0,0 +1,310 @@
package influxdb
import (
"context"
"fmt"
"os"
"strconv"
"testing"
"time"
"github.com/hashicorp/errwrap"
"github.com/hashicorp/vault/builtin/logical/database/dbplugin"
influx "github.com/influxdata/influxdb/client/v2"
"github.com/ory/dockertest"
)
const testInfluxRole = `CREATE USER "{{username}}" WITH PASSWORD '{{password}}';GRANT ALL ON "vault" TO "{{username}}";`
func prepareInfluxdbTestContainer(t *testing.T) (func(), string, int) {
if os.Getenv("INFLUXDB_HOST") != "" {
return func() {}, os.Getenv("INFLUXDB_HOST"), 0
}
pool, err := dockertest.NewPool("")
if err != nil {
t.Fatalf("Failed to connect to docker: %s", err)
}
ro := &dockertest.RunOptions{
Repository: "influxdb",
Tag: "alpine",
Env: []string{"INFLUXDB_DB=vault", "INFLUXDB_ADMIN_USER=influx-root", "INFLUXDB_ADMIN_PASSWORD=influx-root", "INFLUXDB_HTTP_AUTH_ENABLED=true"},
}
resource, err := pool.RunWithOptions(ro)
if err != nil {
t.Fatalf("Could not start local influxdb docker container: %s", err)
}
cleanup := func() {
err := pool.Purge(resource)
if err != nil {
t.Fatalf("Failed to cleanup local container: %s", err)
}
}
port, _ := strconv.Atoi(resource.GetPort("8086/tcp"))
address := "127.0.0.1"
// exponential backoff-retry
if err = pool.Retry(func() error {
cli, err := influx.NewHTTPClient(influx.HTTPConfig{
Addr: fmt.Sprintf("http://%s:%d", address, port),
Username: "influx-root",
Password: "influx-root",
})
if err != nil {
return errwrap.Wrapf("Error creating InfluxDB Client: ", err)
}
defer cli.Close()
_, _, err = cli.Ping(1)
if err != nil {
return errwrap.Wrapf("error checking cluster status: {{err}}", err)
}
return nil
}); err != nil {
cleanup()
t.Fatalf("Could not connect to influxdb docker container: %s", err)
}
return cleanup, address, port
}
func TestInfluxdb_Initialize(t *testing.T) {
if os.Getenv("VAULT_ACC") == "" {
t.SkipNow()
}
cleanup, address, port := prepareInfluxdbTestContainer(t)
defer cleanup()
connectionDetails := map[string]interface{}{
"host": address,
"port": port,
"username": "influx-root",
"password": "influx-root",
}
db := new()
_, err := db.Init(context.Background(), connectionDetails, true)
if err != nil {
t.Fatalf("err: %s", err)
}
if !db.Initialized {
t.Fatal("Database should be initalized")
}
err = db.Close()
if err != nil {
t.Fatalf("err: %s", err)
}
// test a string protocol
connectionDetails = map[string]interface{}{
"host": address,
"port": strconv.Itoa(port),
"username": "influx-root",
"password": "influx-root",
}
_, err = db.Init(context.Background(), connectionDetails, true)
if err != nil {
t.Fatalf("err: %s", err)
}
}
func TestInfluxdb_CreateUser(t *testing.T) {
if os.Getenv("VAULT_ACC") == "" {
t.SkipNow()
}
cleanup, address, port := prepareInfluxdbTestContainer(t)
defer cleanup()
connectionDetails := map[string]interface{}{
"host": address,
"port": port,
"username": "influx-root",
"password": "influx-root",
}
db := new()
_, err := db.Init(context.Background(), connectionDetails, true)
if err != nil {
t.Fatalf("err: %s", err)
}
statements := dbplugin.Statements{
Creation: []string{testInfluxRole},
}
usernameConfig := dbplugin.UsernameConfig{
DisplayName: "test",
RoleName: "test",
}
username, password, err := db.CreateUser(context.Background(), statements, usernameConfig, time.Now().Add(time.Minute))
if err != nil {
t.Fatalf("err: %s", err)
}
if err := testCredsExist(t, address, port, username, password); err != nil {
t.Fatalf("Could not connect with new credentials: %s", err)
}
}
func TestMyInfluxdb_RenewUser(t *testing.T) {
if os.Getenv("VAULT_ACC") == "" {
t.SkipNow()
}
cleanup, address, port := prepareInfluxdbTestContainer(t)
defer cleanup()
connectionDetails := map[string]interface{}{
"host": address,
"port": port,
"username": "influx-root",
"password": "influx-root",
}
db := new()
_, err := db.Init(context.Background(), connectionDetails, true)
if err != nil {
t.Fatalf("err: %s", err)
}
statements := dbplugin.Statements{
Creation: []string{testInfluxRole},
}
usernameConfig := dbplugin.UsernameConfig{
DisplayName: "test",
RoleName: "test",
}
username, password, err := db.CreateUser(context.Background(), statements, usernameConfig, time.Now().Add(time.Minute))
if err != nil {
t.Fatalf("err: %s", err)
}
if err := testCredsExist(t, address, port, username, password); err != nil {
t.Fatalf("Could not connect with new credentials: %s", err)
}
err = db.RenewUser(context.Background(), statements, username, time.Now().Add(time.Minute))
if err != nil {
t.Fatalf("err: %s", err)
}
}
func TestInfluxdb_RevokeUser(t *testing.T) {
if os.Getenv("VAULT_ACC") == "" {
t.SkipNow()
}
cleanup, address, port := prepareInfluxdbTestContainer(t)
defer cleanup()
connectionDetails := map[string]interface{}{
"host": address,
"port": port,
"username": "influx-root",
"password": "influx-root",
}
db := new()
_, err := db.Init(context.Background(), connectionDetails, true)
if err != nil {
t.Fatalf("err: %s", err)
}
statements := dbplugin.Statements{
Creation: []string{testInfluxRole},
}
usernameConfig := dbplugin.UsernameConfig{
DisplayName: "test",
RoleName: "test",
}
username, password, err := db.CreateUser(context.Background(), statements, usernameConfig, time.Now().Add(time.Minute))
if err != nil {
t.Fatalf("err: %s", err)
}
if err = testCredsExist(t, address, port, username, password); err != nil {
t.Fatalf("Could not connect with new credentials: %s", err)
}
// Test default revoke statements
err = db.RevokeUser(context.Background(), statements, username)
if err != nil {
t.Fatalf("err: %s", err)
}
if err = testCredsExist(t, address, port, username, password); err == nil {
t.Fatal("Credentials were not revoked")
}
}
func TestInfluxdb_RotateRootCredentials(t *testing.T) {
if os.Getenv("VAULT_ACC") == "" {
t.SkipNow()
}
cleanup, address, port := prepareInfluxdbTestContainer(t)
defer cleanup()
connectionDetails := map[string]interface{}{
"host": address,
"port": port,
"username": "influx-root",
"password": "influx-root",
}
db := new()
connProducer := db.influxdbConnectionProducer
_, err := db.Init(context.Background(), connectionDetails, true)
if err != nil {
t.Fatalf("err: %s", err)
}
if !connProducer.Initialized {
t.Fatal("Database should be initialized")
}
newConf, err := db.RotateRootCredentials(context.Background(), nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if newConf["password"] == "influx-root" {
t.Fatal("password was not updated")
}
err = db.Close()
if err != nil {
t.Fatalf("err: %s", err)
}
}
func testCredsExist(t testing.TB, address string, port int, username, password string) error {
cli, err := influx.NewHTTPClient(influx.HTTPConfig{
Addr: fmt.Sprintf("http://%s:%d", address, port),
Username: username,
Password: password,
})
if err != nil {
return errwrap.Wrapf("Error creating InfluxDB Client: ", err)
}
defer cli.Close()
_, _, err = cli.Ping(1)
if err != nil {
return errwrap.Wrapf("error checking server ping: {{err}}", err)
}
q := influx.NewQuery("SHOW SERIES ON vault", "", "")
response, err := cli.Query(q)
if err != nil {
return errwrap.Wrapf("error querying influxdb server: {{err}}", err)
}
if response.Error() != nil {
return errwrap.Wrapf("error using the correct influx database: {{err}}", response.Error())
}
return nil
}

View File

@@ -1549,6 +1549,7 @@ func (m *mockBuiltinRegistry) Keys(pluginType consts.PluginType) []string {
"cassandra-database-plugin",
"mongodb-database-plugin",
"hana-database-plugin",
"influxdb-database-plugin",
}
}

20
vendor/github.com/influxdata/influxdb/LICENSE generated vendored Normal file
View File

@@ -0,0 +1,20 @@
The MIT License (MIT)
Copyright (c) 2013-2018 InfluxData Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

View File

@@ -0,0 +1,722 @@
// Package client (v2) is the current official Go client for InfluxDB.
package client // import "github.com/influxdata/influxdb/client/v2"
import (
"bytes"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"mime"
"net/http"
"net/url"
"path"
"strconv"
"strings"
"time"
"github.com/influxdata/influxdb/models"
)
// HTTPConfig is the config data needed to create an HTTP Client.
type HTTPConfig struct {
// Addr should be of the form "http://host:port"
// or "http://[ipv6-host%zone]:port".
Addr string
// Username is the influxdb username, optional.
Username string
// Password is the influxdb password, optional.
Password string
// UserAgent is the http User Agent, defaults to "InfluxDBClient".
UserAgent string
// Timeout for influxdb writes, defaults to no timeout.
Timeout time.Duration
// InsecureSkipVerify gets passed to the http client, if true, it will
// skip https certificate verification. Defaults to false.
InsecureSkipVerify bool
// TLSConfig allows the user to set their own TLS config for the HTTP
// Client. If set, this option overrides InsecureSkipVerify.
TLSConfig *tls.Config
// Proxy configures the Proxy function on the HTTP client.
Proxy func(req *http.Request) (*url.URL, error)
}
// BatchPointsConfig is the config data needed to create an instance of the BatchPoints struct.
type BatchPointsConfig struct {
// Precision is the write precision of the points, defaults to "ns".
Precision string
// Database is the database to write points to.
Database string
// RetentionPolicy is the retention policy of the points.
RetentionPolicy string
// Write consistency is the number of servers required to confirm write.
WriteConsistency string
}
// Client is a client interface for writing & querying the database.
type Client interface {
// Ping checks that status of cluster, and will always return 0 time and no
// error for UDP clients.
Ping(timeout time.Duration) (time.Duration, string, error)
// Write takes a BatchPoints object and writes all Points to InfluxDB.
Write(bp BatchPoints) error
// Query makes an InfluxDB Query on the database. This will fail if using
// the UDP client.
Query(q Query) (*Response, error)
// QueryAsChunk makes an InfluxDB Query on the database. This will fail if using
// the UDP client.
QueryAsChunk(q Query) (*ChunkedResponse, error)
// Close releases any resources a Client may be using.
Close() error
}
// NewHTTPClient returns a new Client from the provided config.
// Client is safe for concurrent use by multiple goroutines.
func NewHTTPClient(conf HTTPConfig) (Client, error) {
if conf.UserAgent == "" {
conf.UserAgent = "InfluxDBClient"
}
u, err := url.Parse(conf.Addr)
if err != nil {
return nil, err
} else if u.Scheme != "http" && u.Scheme != "https" {
m := fmt.Sprintf("Unsupported protocol scheme: %s, your address"+
" must start with http:// or https://", u.Scheme)
return nil, errors.New(m)
}
tr := &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: conf.InsecureSkipVerify,
},
Proxy: conf.Proxy,
}
if conf.TLSConfig != nil {
tr.TLSClientConfig = conf.TLSConfig
}
return &client{
url: *u,
username: conf.Username,
password: conf.Password,
useragent: conf.UserAgent,
httpClient: &http.Client{
Timeout: conf.Timeout,
Transport: tr,
},
transport: tr,
}, nil
}
// Ping will check to see if the server is up with an optional timeout on waiting for leader.
// Ping returns how long the request took, the version of the server it connected to, and an error if one occurred.
func (c *client) Ping(timeout time.Duration) (time.Duration, string, error) {
now := time.Now()
u := c.url
u.Path = path.Join(u.Path, "ping")
req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return 0, "", err
}
req.Header.Set("User-Agent", c.useragent)
if c.username != "" {
req.SetBasicAuth(c.username, c.password)
}
if timeout > 0 {
params := req.URL.Query()
params.Set("wait_for_leader", fmt.Sprintf("%.0fs", timeout.Seconds()))
req.URL.RawQuery = params.Encode()
}
resp, err := c.httpClient.Do(req)
if err != nil {
return 0, "", err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return 0, "", err
}
if resp.StatusCode != http.StatusNoContent {
var err = errors.New(string(body))
return 0, "", err
}
version := resp.Header.Get("X-Influxdb-Version")
return time.Since(now), version, nil
}
// Close releases the client's resources.
func (c *client) Close() error {
c.transport.CloseIdleConnections()
return nil
}
// client is safe for concurrent use as the fields are all read-only
// once the client is instantiated.
type client struct {
// N.B - if url.UserInfo is accessed in future modifications to the
// methods on client, you will need to synchronize access to url.
url url.URL
username string
password string
useragent string
httpClient *http.Client
transport *http.Transport
}
// BatchPoints is an interface into a batched grouping of points to write into
// InfluxDB together. BatchPoints is NOT thread-safe, you must create a separate
// batch for each goroutine.
type BatchPoints interface {
// AddPoint adds the given point to the Batch of points.
AddPoint(p *Point)
// AddPoints adds the given points to the Batch of points.
AddPoints(ps []*Point)
// Points lists the points in the Batch.
Points() []*Point
// Precision returns the currently set precision of this Batch.
Precision() string
// SetPrecision sets the precision of this batch.
SetPrecision(s string) error
// Database returns the currently set database of this Batch.
Database() string
// SetDatabase sets the database of this Batch.
SetDatabase(s string)
// WriteConsistency returns the currently set write consistency of this Batch.
WriteConsistency() string
// SetWriteConsistency sets the write consistency of this Batch.
SetWriteConsistency(s string)
// RetentionPolicy returns the currently set retention policy of this Batch.
RetentionPolicy() string
// SetRetentionPolicy sets the retention policy of this Batch.
SetRetentionPolicy(s string)
}
// NewBatchPoints returns a BatchPoints interface based on the given config.
func NewBatchPoints(conf BatchPointsConfig) (BatchPoints, error) {
if conf.Precision == "" {
conf.Precision = "ns"
}
if _, err := time.ParseDuration("1" + conf.Precision); err != nil {
return nil, err
}
bp := &batchpoints{
database: conf.Database,
precision: conf.Precision,
retentionPolicy: conf.RetentionPolicy,
writeConsistency: conf.WriteConsistency,
}
return bp, nil
}
type batchpoints struct {
points []*Point
database string
precision string
retentionPolicy string
writeConsistency string
}
func (bp *batchpoints) AddPoint(p *Point) {
bp.points = append(bp.points, p)
}
func (bp *batchpoints) AddPoints(ps []*Point) {
bp.points = append(bp.points, ps...)
}
func (bp *batchpoints) Points() []*Point {
return bp.points
}
func (bp *batchpoints) Precision() string {
return bp.precision
}
func (bp *batchpoints) Database() string {
return bp.database
}
func (bp *batchpoints) WriteConsistency() string {
return bp.writeConsistency
}
func (bp *batchpoints) RetentionPolicy() string {
return bp.retentionPolicy
}
func (bp *batchpoints) SetPrecision(p string) error {
if _, err := time.ParseDuration("1" + p); err != nil {
return err
}
bp.precision = p
return nil
}
func (bp *batchpoints) SetDatabase(db string) {
bp.database = db
}
func (bp *batchpoints) SetWriteConsistency(wc string) {
bp.writeConsistency = wc
}
func (bp *batchpoints) SetRetentionPolicy(rp string) {
bp.retentionPolicy = rp
}
// Point represents a single data point.
type Point struct {
pt models.Point
}
// NewPoint returns a point with the given timestamp. If a timestamp is not
// given, then data is sent to the database without a timestamp, in which case
// the server will assign local time upon reception. NOTE: it is recommended to
// send data with a timestamp.
func NewPoint(
name string,
tags map[string]string,
fields map[string]interface{},
t ...time.Time,
) (*Point, error) {
var T time.Time
if len(t) > 0 {
T = t[0]
}
pt, err := models.NewPoint(name, models.NewTags(tags), fields, T)
if err != nil {
return nil, err
}
return &Point{
pt: pt,
}, nil
}
// String returns a line-protocol string of the Point.
func (p *Point) String() string {
return p.pt.String()
}
// PrecisionString returns a line-protocol string of the Point,
// with the timestamp formatted for the given precision.
func (p *Point) PrecisionString(precision string) string {
return p.pt.PrecisionString(precision)
}
// Name returns the measurement name of the point.
func (p *Point) Name() string {
return string(p.pt.Name())
}
// Tags returns the tags associated with the point.
func (p *Point) Tags() map[string]string {
return p.pt.Tags().Map()
}
// Time return the timestamp for the point.
func (p *Point) Time() time.Time {
return p.pt.Time()
}
// UnixNano returns timestamp of the point in nanoseconds since Unix epoch.
func (p *Point) UnixNano() int64 {
return p.pt.UnixNano()
}
// Fields returns the fields for the point.
func (p *Point) Fields() (map[string]interface{}, error) {
return p.pt.Fields()
}
// NewPointFrom returns a point from the provided models.Point.
func NewPointFrom(pt models.Point) *Point {
return &Point{pt: pt}
}
func (c *client) Write(bp BatchPoints) error {
var b bytes.Buffer
for _, p := range bp.Points() {
if p == nil {
continue
}
if _, err := b.WriteString(p.pt.PrecisionString(bp.Precision())); err != nil {
return err
}
if err := b.WriteByte('\n'); err != nil {
return err
}
}
u := c.url
u.Path = path.Join(u.Path, "write")
req, err := http.NewRequest("POST", u.String(), &b)
if err != nil {
return err
}
req.Header.Set("Content-Type", "")
req.Header.Set("User-Agent", c.useragent)
if c.username != "" {
req.SetBasicAuth(c.username, c.password)
}
params := req.URL.Query()
params.Set("db", bp.Database())
params.Set("rp", bp.RetentionPolicy())
params.Set("precision", bp.Precision())
params.Set("consistency", bp.WriteConsistency())
req.URL.RawQuery = params.Encode()
resp, err := c.httpClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK {
var err = errors.New(string(body))
return err
}
return nil
}
// Query defines a query to send to the server.
type Query struct {
Command string
Database string
RetentionPolicy string
Precision string
Chunked bool
ChunkSize int
Parameters map[string]interface{}
}
// NewQuery returns a query object.
// The database and precision arguments can be empty strings if they are not needed for the query.
func NewQuery(command, database, precision string) Query {
return Query{
Command: command,
Database: database,
Precision: precision,
Parameters: make(map[string]interface{}),
}
}
// NewQueryWithRP returns a query object.
// The database, retention policy, and precision arguments can be empty strings if they are not needed
// for the query. Setting the retention policy only works on InfluxDB versions 1.6 or greater.
func NewQueryWithRP(command, database, retentionPolicy, precision string) Query {
return Query{
Command: command,
Database: database,
RetentionPolicy: retentionPolicy,
Precision: precision,
Parameters: make(map[string]interface{}),
}
}
// NewQueryWithParameters returns a query object.
// The database and precision arguments can be empty strings if they are not needed for the query.
// parameters is a map of the parameter names used in the command to their values.
func NewQueryWithParameters(command, database, precision string, parameters map[string]interface{}) Query {
return Query{
Command: command,
Database: database,
Precision: precision,
Parameters: parameters,
}
}
// Response represents a list of statement results.
type Response struct {
Results []Result
Err string `json:"error,omitempty"`
}
// Error returns the first error from any statement.
// It returns nil if no errors occurred on any statements.
func (r *Response) Error() error {
if r.Err != "" {
return errors.New(r.Err)
}
for _, result := range r.Results {
if result.Err != "" {
return errors.New(result.Err)
}
}
return nil
}
// Message represents a user message.
type Message struct {
Level string
Text string
}
// Result represents a resultset returned from a single statement.
type Result struct {
Series []models.Row
Messages []*Message
Err string `json:"error,omitempty"`
}
// Query sends a command to the server and returns the Response.
func (c *client) Query(q Query) (*Response, error) {
req, err := c.createDefaultRequest(q)
if err != nil {
return nil, err
}
params := req.URL.Query()
if q.Chunked {
params.Set("chunked", "true")
if q.ChunkSize > 0 {
params.Set("chunk_size", strconv.Itoa(q.ChunkSize))
}
req.URL.RawQuery = params.Encode()
}
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if err := checkResponse(resp); err != nil {
return nil, err
}
var response Response
if q.Chunked {
cr := NewChunkedResponse(resp.Body)
for {
r, err := cr.NextResponse()
if err != nil {
if err == io.EOF {
break
}
// If we got an error while decoding the response, send that back.
return nil, err
}
if r == nil {
break
}
response.Results = append(response.Results, r.Results...)
if r.Err != "" {
response.Err = r.Err
break
}
}
} else {
dec := json.NewDecoder(resp.Body)
dec.UseNumber()
decErr := dec.Decode(&response)
// ignore this error if we got an invalid status code
if decErr != nil && decErr.Error() == "EOF" && resp.StatusCode != http.StatusOK {
decErr = nil
}
// If we got a valid decode error, send that back
if decErr != nil {
return nil, fmt.Errorf("unable to decode json: received status code %d err: %s", resp.StatusCode, decErr)
}
}
// If we don't have an error in our json response, and didn't get statusOK
// then send back an error
if resp.StatusCode != http.StatusOK && response.Error() == nil {
return &response, fmt.Errorf("received status code %d from server", resp.StatusCode)
}
return &response, nil
}
// QueryAsChunk sends a command to the server and returns the Response.
func (c *client) QueryAsChunk(q Query) (*ChunkedResponse, error) {
req, err := c.createDefaultRequest(q)
if err != nil {
return nil, err
}
params := req.URL.Query()
params.Set("chunked", "true")
if q.ChunkSize > 0 {
params.Set("chunk_size", strconv.Itoa(q.ChunkSize))
}
req.URL.RawQuery = params.Encode()
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, err
}
if err := checkResponse(resp); err != nil {
return nil, err
}
return NewChunkedResponse(resp.Body), nil
}
func checkResponse(resp *http.Response) error {
// If we lack a X-Influxdb-Version header, then we didn't get a response from influxdb
// but instead some other service. If the error code is also a 500+ code, then some
// downstream loadbalancer/proxy/etc had an issue and we should report that.
if resp.Header.Get("X-Influxdb-Version") == "" && resp.StatusCode >= http.StatusInternalServerError {
body, err := ioutil.ReadAll(resp.Body)
if err != nil || len(body) == 0 {
return fmt.Errorf("received status code %d from downstream server", resp.StatusCode)
}
return fmt.Errorf("received status code %d from downstream server, with response body: %q", resp.StatusCode, body)
}
// If we get an unexpected content type, then it is also not from influx direct and therefore
// we want to know what we received and what status code was returned for debugging purposes.
if cType, _, _ := mime.ParseMediaType(resp.Header.Get("Content-Type")); cType != "application/json" {
// Read up to 1kb of the body to help identify downstream errors and limit the impact of things
// like downstream serving a large file
body, err := ioutil.ReadAll(io.LimitReader(resp.Body, 1024))
if err != nil || len(body) == 0 {
return fmt.Errorf("expected json response, got empty body, with status: %v", resp.StatusCode)
}
return fmt.Errorf("expected json response, got %q, with status: %v and response body: %q", cType, resp.StatusCode, body)
}
return nil
}
func (c *client) createDefaultRequest(q Query) (*http.Request, error) {
u := c.url
u.Path = path.Join(u.Path, "query")
jsonParameters, err := json.Marshal(q.Parameters)
if err != nil {
return nil, err
}
req, err := http.NewRequest("POST", u.String(), nil)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "")
req.Header.Set("User-Agent", c.useragent)
if c.username != "" {
req.SetBasicAuth(c.username, c.password)
}
params := req.URL.Query()
params.Set("q", q.Command)
params.Set("db", q.Database)
if q.RetentionPolicy != "" {
params.Set("rp", q.RetentionPolicy)
}
params.Set("params", string(jsonParameters))
if q.Precision != "" {
params.Set("epoch", q.Precision)
}
req.URL.RawQuery = params.Encode()
return req, nil
}
// duplexReader reads responses and writes it to another writer while
// satisfying the reader interface.
type duplexReader struct {
r io.ReadCloser
w io.Writer
}
func (r *duplexReader) Read(p []byte) (n int, err error) {
n, err = r.r.Read(p)
if err == nil {
r.w.Write(p[:n])
}
return n, err
}
// Close closes the response.
func (r *duplexReader) Close() error {
return r.r.Close()
}
// ChunkedResponse represents a response from the server that
// uses chunking to stream the output.
type ChunkedResponse struct {
dec *json.Decoder
duplex *duplexReader
buf bytes.Buffer
}
// NewChunkedResponse reads a stream and produces responses from the stream.
func NewChunkedResponse(r io.Reader) *ChunkedResponse {
rc, ok := r.(io.ReadCloser)
if !ok {
rc = ioutil.NopCloser(r)
}
resp := &ChunkedResponse{}
resp.duplex = &duplexReader{r: rc, w: &resp.buf}
resp.dec = json.NewDecoder(resp.duplex)
resp.dec.UseNumber()
return resp
}
// NextResponse reads the next line of the stream and returns a response.
func (r *ChunkedResponse) NextResponse() (*Response, error) {
var response Response
if err := r.dec.Decode(&response); err != nil {
if err == io.EOF {
return nil, err
}
// A decoding error happened. This probably means the server crashed
// and sent a last-ditch error message to us. Ensure we have read the
// entirety of the connection to get any remaining error text.
io.Copy(ioutil.Discard, r.duplex)
return nil, errors.New(strings.TrimSpace(r.buf.String()))
}
r.buf.Reset()
return &response, nil
}
// Close closes the response.
func (r *ChunkedResponse) Close() error {
return r.duplex.Close()
}

116
vendor/github.com/influxdata/influxdb/client/v2/udp.go generated vendored Normal file
View File

@@ -0,0 +1,116 @@
package client
import (
"fmt"
"io"
"net"
"time"
)
const (
// UDPPayloadSize is a reasonable default payload size for UDP packets that
// could be travelling over the internet.
UDPPayloadSize = 512
)
// UDPConfig is the config data needed to create a UDP Client.
type UDPConfig struct {
// Addr should be of the form "host:port"
// or "[ipv6-host%zone]:port".
Addr string
// PayloadSize is the maximum size of a UDP client message, optional
// Tune this based on your network. Defaults to UDPPayloadSize.
PayloadSize int
}
// NewUDPClient returns a client interface for writing to an InfluxDB UDP
// service from the given config.
func NewUDPClient(conf UDPConfig) (Client, error) {
var udpAddr *net.UDPAddr
udpAddr, err := net.ResolveUDPAddr("udp", conf.Addr)
if err != nil {
return nil, err
}
conn, err := net.DialUDP("udp", nil, udpAddr)
if err != nil {
return nil, err
}
payloadSize := conf.PayloadSize
if payloadSize == 0 {
payloadSize = UDPPayloadSize
}
return &udpclient{
conn: conn,
payloadSize: payloadSize,
}, nil
}
// Close releases the udpclient's resources.
func (uc *udpclient) Close() error {
return uc.conn.Close()
}
type udpclient struct {
conn io.WriteCloser
payloadSize int
}
func (uc *udpclient) Write(bp BatchPoints) error {
var b = make([]byte, 0, uc.payloadSize) // initial buffer size, it will grow as needed
var d, _ = time.ParseDuration("1" + bp.Precision())
var delayedError error
var checkBuffer = func(n int) {
if len(b) > 0 && len(b)+n > uc.payloadSize {
if _, err := uc.conn.Write(b); err != nil {
delayedError = err
}
b = b[:0]
}
}
for _, p := range bp.Points() {
p.pt.Round(d)
pointSize := p.pt.StringSize() + 1 // include newline in size
//point := p.pt.RoundedString(d) + "\n"
checkBuffer(pointSize)
if p.Time().IsZero() || pointSize <= uc.payloadSize {
b = p.pt.AppendString(b)
b = append(b, '\n')
continue
}
points := p.pt.Split(uc.payloadSize - 1) // account for newline character
for _, sp := range points {
checkBuffer(sp.StringSize() + 1)
b = sp.AppendString(b)
b = append(b, '\n')
}
}
if len(b) > 0 {
if _, err := uc.conn.Write(b); err != nil {
return err
}
}
return delayedError
}
func (uc *udpclient) Query(q Query) (*Response, error) {
return nil, fmt.Errorf("Querying via UDP is not supported")
}
func (uc *udpclient) QueryAsChunk(q Query) (*ChunkedResponse, error) {
return nil, fmt.Errorf("Querying via UDP is not supported")
}
func (uc *udpclient) Ping(timeout time.Duration) (time.Duration, string, error) {
return 0, "", nil
}

View File

@@ -0,0 +1,48 @@
package models
import (
"errors"
"strings"
)
// ConsistencyLevel represent a required replication criteria before a write can
// be returned as successful.
//
// The consistency level is handled in open-source InfluxDB but only applicable to clusters.
type ConsistencyLevel int
const (
// ConsistencyLevelAny allows for hinted handoff, potentially no write happened yet.
ConsistencyLevelAny ConsistencyLevel = iota
// ConsistencyLevelOne requires at least one data node acknowledged a write.
ConsistencyLevelOne
// ConsistencyLevelQuorum requires a quorum of data nodes to acknowledge a write.
ConsistencyLevelQuorum
// ConsistencyLevelAll requires all data nodes to acknowledge a write.
ConsistencyLevelAll
)
var (
// ErrInvalidConsistencyLevel is returned when parsing the string version
// of a consistency level.
ErrInvalidConsistencyLevel = errors.New("invalid consistency level")
)
// ParseConsistencyLevel converts a consistency level string to the corresponding ConsistencyLevel const.
func ParseConsistencyLevel(level string) (ConsistencyLevel, error) {
switch strings.ToLower(level) {
case "any":
return ConsistencyLevelAny, nil
case "one":
return ConsistencyLevelOne, nil
case "quorum":
return ConsistencyLevelQuorum, nil
case "all":
return ConsistencyLevelAll, nil
default:
return 0, ErrInvalidConsistencyLevel
}
}

View File

@@ -0,0 +1,32 @@
package models // import "github.com/influxdata/influxdb/models"
// from stdlib hash/fnv/fnv.go
const (
prime64 = 1099511628211
offset64 = 14695981039346656037
)
// InlineFNV64a is an alloc-free port of the standard library's fnv64a.
// See https://en.wikipedia.org/wiki/Fowler%E2%80%93Noll%E2%80%93Vo_hash_function.
type InlineFNV64a uint64
// NewInlineFNV64a returns a new instance of InlineFNV64a.
func NewInlineFNV64a() InlineFNV64a {
return offset64
}
// Write adds data to the running hash.
func (s *InlineFNV64a) Write(data []byte) (int, error) {
hash := uint64(*s)
for _, c := range data {
hash ^= uint64(c)
hash *= prime64
}
*s = InlineFNV64a(hash)
return len(data), nil
}
// Sum64 returns the uint64 of the current resulting hash.
func (s *InlineFNV64a) Sum64() uint64 {
return uint64(*s)
}

View File

@@ -0,0 +1,44 @@
package models // import "github.com/influxdata/influxdb/models"
import (
"reflect"
"strconv"
"unsafe"
)
// parseIntBytes is a zero-alloc wrapper around strconv.ParseInt.
func parseIntBytes(b []byte, base int, bitSize int) (i int64, err error) {
s := unsafeBytesToString(b)
return strconv.ParseInt(s, base, bitSize)
}
// parseUintBytes is a zero-alloc wrapper around strconv.ParseUint.
func parseUintBytes(b []byte, base int, bitSize int) (i uint64, err error) {
s := unsafeBytesToString(b)
return strconv.ParseUint(s, base, bitSize)
}
// parseFloatBytes is a zero-alloc wrapper around strconv.ParseFloat.
func parseFloatBytes(b []byte, bitSize int) (float64, error) {
s := unsafeBytesToString(b)
return strconv.ParseFloat(s, bitSize)
}
// parseBoolBytes is a zero-alloc wrapper around strconv.ParseBool.
func parseBoolBytes(b []byte) (bool, error) {
return strconv.ParseBool(unsafeBytesToString(b))
}
// unsafeBytesToString converts a []byte to a string without a heap allocation.
//
// It is unsafe, and is intended to prepare input to short-lived functions
// that require strings.
func unsafeBytesToString(in []byte) string {
src := *(*reflect.SliceHeader)(unsafe.Pointer(&in))
dst := reflect.StringHeader{
Data: src.Data,
Len: src.Len,
}
s := *(*string)(unsafe.Pointer(&dst))
return s
}

2203
vendor/github.com/influxdata/influxdb/models/points.go generated vendored Normal file

File diff suppressed because it is too large Load Diff

62
vendor/github.com/influxdata/influxdb/models/rows.go generated vendored Normal file
View File

@@ -0,0 +1,62 @@
package models
import (
"sort"
)
// Row represents a single row returned from the execution of a statement.
type Row struct {
Name string `json:"name,omitempty"`
Tags map[string]string `json:"tags,omitempty"`
Columns []string `json:"columns,omitempty"`
Values [][]interface{} `json:"values,omitempty"`
Partial bool `json:"partial,omitempty"`
}
// SameSeries returns true if r contains values for the same series as o.
func (r *Row) SameSeries(o *Row) bool {
return r.tagsHash() == o.tagsHash() && r.Name == o.Name
}
// tagsHash returns a hash of tag key/value pairs.
func (r *Row) tagsHash() uint64 {
h := NewInlineFNV64a()
keys := r.tagsKeys()
for _, k := range keys {
h.Write([]byte(k))
h.Write([]byte(r.Tags[k]))
}
return h.Sum64()
}
// tagKeys returns a sorted list of tag keys.
func (r *Row) tagsKeys() []string {
a := make([]string, 0, len(r.Tags))
for k := range r.Tags {
a = append(a, k)
}
sort.Strings(a)
return a
}
// Rows represents a collection of rows. Rows implements sort.Interface.
type Rows []*Row
// Len implements sort.Interface.
func (p Rows) Len() int { return len(p) }
// Less implements sort.Interface.
func (p Rows) Less(i, j int) bool {
// Sort by name first.
if p[i].Name != p[j].Name {
return p[i].Name < p[j].Name
}
// Sort by tag set hash. Tags don't have a meaningful sort order so we
// just compute a hash and sort by that instead. This allows the tests
// to receive rows in a predictable order every time.
return p[i].tagsHash() < p[j].tagsHash()
}
// Swap implements sort.Interface.
func (p Rows) Swap(i, j int) { p[i], p[j] = p[j], p[i] }

View File

@@ -0,0 +1,42 @@
package models
// Statistic is the representation of a statistic used by the monitoring service.
type Statistic struct {
Name string `json:"name"`
Tags map[string]string `json:"tags"`
Values map[string]interface{} `json:"values"`
}
// NewStatistic returns an initialized Statistic.
func NewStatistic(name string) Statistic {
return Statistic{
Name: name,
Tags: make(map[string]string),
Values: make(map[string]interface{}),
}
}
// StatisticTags is a map that can be merged with others without causing
// mutations to either map.
type StatisticTags map[string]string
// Merge creates a new map containing the merged contents of tags and t.
// If both tags and the receiver map contain the same key, the value in tags
// is used in the resulting map.
//
// Merge always returns a usable map.
func (t StatisticTags) Merge(tags map[string]string) map[string]string {
// Add everything in tags to the result.
out := make(map[string]string, len(tags))
for k, v := range tags {
out[k] = v
}
// Only add values from t that don't appear in tags.
for k, v := range t {
if _, ok := tags[k]; !ok {
out[k] = v
}
}
return out
}

74
vendor/github.com/influxdata/influxdb/models/time.go generated vendored Normal file
View File

@@ -0,0 +1,74 @@
package models
// Helper time methods since parsing time can easily overflow and we only support a
// specific time range.
import (
"fmt"
"math"
"time"
)
const (
// MinNanoTime is the minumum time that can be represented.
//
// 1677-09-21 00:12:43.145224194 +0000 UTC
//
// The two lowest minimum integers are used as sentinel values. The
// minimum value needs to be used as a value lower than any other value for
// comparisons and another separate value is needed to act as a sentinel
// default value that is unusable by the user, but usable internally.
// Because these two values need to be used for a special purpose, we do
// not allow users to write points at these two times.
MinNanoTime = int64(math.MinInt64) + 2
// MaxNanoTime is the maximum time that can be represented.
//
// 2262-04-11 23:47:16.854775806 +0000 UTC
//
// The highest time represented by a nanosecond needs to be used for an
// exclusive range in the shard group, so the maximum time needs to be one
// less than the possible maximum number of nanoseconds representable by an
// int64 so that we don't lose a point at that one time.
MaxNanoTime = int64(math.MaxInt64) - 1
)
var (
minNanoTime = time.Unix(0, MinNanoTime).UTC()
maxNanoTime = time.Unix(0, MaxNanoTime).UTC()
// ErrTimeOutOfRange gets returned when time is out of the representable range using int64 nanoseconds since the epoch.
ErrTimeOutOfRange = fmt.Errorf("time outside range %d - %d", MinNanoTime, MaxNanoTime)
)
// SafeCalcTime safely calculates the time given. Will return error if the time is outside the
// supported range.
func SafeCalcTime(timestamp int64, precision string) (time.Time, error) {
mult := GetPrecisionMultiplier(precision)
if t, ok := safeSignedMult(timestamp, mult); ok {
tme := time.Unix(0, t).UTC()
return tme, CheckTime(tme)
}
return time.Time{}, ErrTimeOutOfRange
}
// CheckTime checks that a time is within the safe range.
func CheckTime(t time.Time) error {
if t.Before(minNanoTime) || t.After(maxNanoTime) {
return ErrTimeOutOfRange
}
return nil
}
// Perform the multiplication and check to make sure it didn't overflow.
func safeSignedMult(a, b int64) (int64, bool) {
if a == 0 || b == 0 || a == 1 || b == 1 {
return a * b, true
}
if a == MinNanoTime || b == MaxNanoTime {
return 0, false
}
c := a * b
return c, c/b == a
}

View File

@@ -0,0 +1,7 @@
// +build uint uint64
package models
func init() {
EnableUintSupport()
}

View File

@@ -0,0 +1,115 @@
// Package escape contains utilities for escaping parts of InfluxQL
// and InfluxDB line protocol.
package escape // import "github.com/influxdata/influxdb/pkg/escape"
import (
"bytes"
"strings"
)
// Codes is a map of bytes to be escaped.
var Codes = map[byte][]byte{
',': []byte(`\,`),
'"': []byte(`\"`),
' ': []byte(`\ `),
'=': []byte(`\=`),
}
// Bytes escapes characters on the input slice, as defined by Codes.
func Bytes(in []byte) []byte {
for b, esc := range Codes {
in = bytes.Replace(in, []byte{b}, esc, -1)
}
return in
}
const escapeChars = `," =`
// IsEscaped returns whether b has any escaped characters,
// i.e. whether b seems to have been processed by Bytes.
func IsEscaped(b []byte) bool {
for len(b) > 0 {
i := bytes.IndexByte(b, '\\')
if i < 0 {
return false
}
if i+1 < len(b) && strings.IndexByte(escapeChars, b[i+1]) >= 0 {
return true
}
b = b[i+1:]
}
return false
}
// AppendUnescaped appends the unescaped version of src to dst
// and returns the resulting slice.
func AppendUnescaped(dst, src []byte) []byte {
var pos int
for len(src) > 0 {
next := bytes.IndexByte(src[pos:], '\\')
if next < 0 || pos+next+1 >= len(src) {
return append(dst, src...)
}
if pos+next+1 < len(src) && strings.IndexByte(escapeChars, src[pos+next+1]) >= 0 {
if pos+next > 0 {
dst = append(dst, src[:pos+next]...)
}
src = src[pos+next+1:]
pos = 0
} else {
pos += next + 1
}
}
return dst
}
// Unescape returns a new slice containing the unescaped version of in.
func Unescape(in []byte) []byte {
if len(in) == 0 {
return nil
}
if bytes.IndexByte(in, '\\') == -1 {
return in
}
i := 0
inLen := len(in)
// The output size will be no more than inLen. Preallocating the
// capacity of the output is faster and uses less memory than
// letting append() do its own (over)allocation.
out := make([]byte, 0, inLen)
for {
if i >= inLen {
break
}
if in[i] == '\\' && i+1 < inLen {
switch in[i+1] {
case ',':
out = append(out, ',')
i += 2
continue
case '"':
out = append(out, '"')
i += 2
continue
case ' ':
out = append(out, ' ')
i += 2
continue
case '=':
out = append(out, '=')
i += 2
continue
}
}
out = append(out, in[i])
i += 1
}
return out
}

View File

@@ -0,0 +1,21 @@
package escape
import "strings"
var (
escaper = strings.NewReplacer(`,`, `\,`, `"`, `\"`, ` `, `\ `, `=`, `\=`)
unescaper = strings.NewReplacer(`\,`, `,`, `\"`, `"`, `\ `, ` `, `\=`, `=`)
)
// UnescapeString returns unescaped version of in.
func UnescapeString(in string) string {
if strings.IndexByte(in, '\\') == -1 {
return in
}
return unescaper.Replace(in)
}
// String returns the escaped version of in.
func String(in string) string {
return escaper.Replace(in)
}

21
vendor/github.com/influxdata/platform/LICENSE generated vendored Normal file
View File

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2018 InfluxData
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@@ -0,0 +1,32 @@
package models // import "github.com/influxdata/platform/models"
// from stdlib hash/fnv/fnv.go
const (
prime64 = 1099511628211
offset64 = 14695981039346656037
)
// InlineFNV64a is an alloc-free port of the standard library's fnv64a.
// See https://en.wikipedia.org/wiki/Fowler%E2%80%93Noll%E2%80%93Vo_hash_function.
type InlineFNV64a uint64
// NewInlineFNV64a returns a new instance of InlineFNV64a.
func NewInlineFNV64a() InlineFNV64a {
return offset64
}
// Write adds data to the running hash.
func (s *InlineFNV64a) Write(data []byte) (int, error) {
hash := uint64(*s)
for _, c := range data {
hash ^= uint64(c)
hash *= prime64
}
*s = InlineFNV64a(hash)
return len(data), nil
}
// Sum64 returns the uint64 of the current resulting hash.
func (s *InlineFNV64a) Sum64() uint64 {
return uint64(*s)
}

View File

@@ -0,0 +1,44 @@
package models // import "github.com/influxdata/platform/models"
import (
"reflect"
"strconv"
"unsafe"
)
// parseIntBytes is a zero-alloc wrapper around strconv.ParseInt.
func parseIntBytes(b []byte, base int, bitSize int) (i int64, err error) {
s := unsafeBytesToString(b)
return strconv.ParseInt(s, base, bitSize)
}
// parseUintBytes is a zero-alloc wrapper around strconv.ParseUint.
func parseUintBytes(b []byte, base int, bitSize int) (i uint64, err error) {
s := unsafeBytesToString(b)
return strconv.ParseUint(s, base, bitSize)
}
// parseFloatBytes is a zero-alloc wrapper around strconv.ParseFloat.
func parseFloatBytes(b []byte, bitSize int) (float64, error) {
s := unsafeBytesToString(b)
return strconv.ParseFloat(s, bitSize)
}
// parseBoolBytes is a zero-alloc wrapper around strconv.ParseBool.
func parseBoolBytes(b []byte) (bool, error) {
return strconv.ParseBool(unsafeBytesToString(b))
}
// unsafeBytesToString converts a []byte to a string without a heap allocation.
//
// It is unsafe, and is intended to prepare input to short-lived functions
// that require strings.
func unsafeBytesToString(in []byte) string {
src := *(*reflect.SliceHeader)(unsafe.Pointer(&in))
dst := reflect.StringHeader{
Data: src.Data,
Len: src.Len,
}
s := *(*string)(unsafe.Pointer(&dst))
return s
}

2490
vendor/github.com/influxdata/platform/models/points.go generated vendored Normal file

File diff suppressed because it is too large Load Diff

62
vendor/github.com/influxdata/platform/models/rows.go generated vendored Normal file
View File

@@ -0,0 +1,62 @@
package models
import (
"sort"
)
// Row represents a single row returned from the execution of a statement.
type Row struct {
Name string `json:"name,omitempty"`
Tags map[string]string `json:"tags,omitempty"`
Columns []string `json:"columns,omitempty"`
Values [][]interface{} `json:"values,omitempty"`
Partial bool `json:"partial,omitempty"`
}
// SameSeries returns true if r contains values for the same series as o.
func (r *Row) SameSeries(o *Row) bool {
return r.tagsHash() == o.tagsHash() && r.Name == o.Name
}
// tagsHash returns a hash of tag key/value pairs.
func (r *Row) tagsHash() uint64 {
h := NewInlineFNV64a()
keys := r.tagsKeys()
for _, k := range keys {
h.Write([]byte(k))
h.Write([]byte(r.Tags[k]))
}
return h.Sum64()
}
// tagKeys returns a sorted list of tag keys.
func (r *Row) tagsKeys() []string {
a := make([]string, 0, len(r.Tags))
for k := range r.Tags {
a = append(a, k)
}
sort.Strings(a)
return a
}
// Rows represents a collection of rows. Rows implements sort.Interface.
type Rows []*Row
// Len implements sort.Interface.
func (p Rows) Len() int { return len(p) }
// Less implements sort.Interface.
func (p Rows) Less(i, j int) bool {
// Sort by name first.
if p[i].Name != p[j].Name {
return p[i].Name < p[j].Name
}
// Sort by tag set hash. Tags don't have a meaningful sort order so we
// just compute a hash and sort by that instead. This allows the tests
// to receive rows in a predictable order every time.
return p[i].tagsHash() < p[j].tagsHash()
}
// Swap implements sort.Interface.
func (p Rows) Swap(i, j int) { p[i], p[j] = p[j], p[i] }

View File

@@ -0,0 +1,42 @@
package models
// Statistic is the representation of a statistic used by the monitoring service.
type Statistic struct {
Name string `json:"name"`
Tags map[string]string `json:"tags"`
Values map[string]interface{} `json:"values"`
}
// NewStatistic returns an initialized Statistic.
func NewStatistic(name string) Statistic {
return Statistic{
Name: name,
Tags: make(map[string]string),
Values: make(map[string]interface{}),
}
}
// StatisticTags is a map that can be merged with others without causing
// mutations to either map.
type StatisticTags map[string]string
// Merge creates a new map containing the merged contents of tags and t.
// If both tags and the receiver map contain the same key, the value in tags
// is used in the resulting map.
//
// Merge always returns a usable map.
func (t StatisticTags) Merge(tags map[string]string) map[string]string {
// Add everything in tags to the result.
out := make(map[string]string, len(tags))
for k, v := range tags {
out[k] = v
}
// Only add values from t that don't appear in tags.
for k, v := range t {
if _, ok := tags[k]; !ok {
out[k] = v
}
}
return out
}

74
vendor/github.com/influxdata/platform/models/time.go generated vendored Normal file
View File

@@ -0,0 +1,74 @@
package models
// Helper time methods since parsing time can easily overflow and we only support a
// specific time range.
import (
"fmt"
"math"
"time"
)
const (
// MinNanoTime is the minumum time that can be represented.
//
// 1677-09-21 00:12:43.145224194 +0000 UTC
//
// The two lowest minimum integers are used as sentinel values. The
// minimum value needs to be used as a value lower than any other value for
// comparisons and another separate value is needed to act as a sentinel
// default value that is unusable by the user, but usable internally.
// Because these two values need to be used for a special purpose, we do
// not allow users to write points at these two times.
MinNanoTime = int64(math.MinInt64) + 2
// MaxNanoTime is the maximum time that can be represented.
//
// 2262-04-11 23:47:16.854775806 +0000 UTC
//
// The highest time represented by a nanosecond needs to be used for an
// exclusive range in the shard group, so the maximum time needs to be one
// less than the possible maximum number of nanoseconds representable by an
// int64 so that we don't lose a point at that one time.
MaxNanoTime = int64(math.MaxInt64) - 1
)
var (
minNanoTime = time.Unix(0, MinNanoTime).UTC()
maxNanoTime = time.Unix(0, MaxNanoTime).UTC()
// ErrTimeOutOfRange gets returned when time is out of the representable range using int64 nanoseconds since the epoch.
ErrTimeOutOfRange = fmt.Errorf("time outside range %d - %d", MinNanoTime, MaxNanoTime)
)
// SafeCalcTime safely calculates the time given. Will return error if the time is outside the
// supported range.
func SafeCalcTime(timestamp int64, precision string) (time.Time, error) {
mult := GetPrecisionMultiplier(precision)
if t, ok := safeSignedMult(timestamp, mult); ok {
tme := time.Unix(0, t).UTC()
return tme, CheckTime(tme)
}
return time.Time{}, ErrTimeOutOfRange
}
// CheckTime checks that a time is within the safe range.
func CheckTime(t time.Time) error {
if t.Before(minNanoTime) || t.After(maxNanoTime) {
return ErrTimeOutOfRange
}
return nil
}
// Perform the multiplication and check to make sure it didn't overflow.
func safeSignedMult(a, b int64) (int64, bool) {
if a == 0 || b == 0 || a == 1 || b == 1 {
return a * b, true
}
if a == MinNanoTime || b == MaxNanoTime {
return 0, false
}
c := a * b
return c, c/b == a
}

View File

@@ -0,0 +1,7 @@
// +build uint uint64
package models
func init() {
EnableUintSupport()
}

View File

@@ -0,0 +1,115 @@
// Package escape contains utilities for escaping parts of InfluxQL
// and InfluxDB line protocol.
package escape // import "github.com/influxdata/platform/pkg/escape"
import (
"bytes"
"strings"
)
// Codes is a map of bytes to be escaped.
var Codes = map[byte][]byte{
',': []byte(`\,`),
'"': []byte(`\"`),
' ': []byte(`\ `),
'=': []byte(`\=`),
}
// Bytes escapes characters on the input slice, as defined by Codes.
func Bytes(in []byte) []byte {
for b, esc := range Codes {
in = bytes.Replace(in, []byte{b}, esc, -1)
}
return in
}
const escapeChars = `," =`
// IsEscaped returns whether b has any escaped characters,
// i.e. whether b seems to have been processed by Bytes.
func IsEscaped(b []byte) bool {
for len(b) > 0 {
i := bytes.IndexByte(b, '\\')
if i < 0 {
return false
}
if i+1 < len(b) && strings.IndexByte(escapeChars, b[i+1]) >= 0 {
return true
}
b = b[i+1:]
}
return false
}
// AppendUnescaped appends the unescaped version of src to dst
// and returns the resulting slice.
func AppendUnescaped(dst, src []byte) []byte {
var pos int
for len(src) > 0 {
next := bytes.IndexByte(src[pos:], '\\')
if next < 0 || pos+next+1 >= len(src) {
return append(dst, src...)
}
if pos+next+1 < len(src) && strings.IndexByte(escapeChars, src[pos+next+1]) >= 0 {
if pos+next > 0 {
dst = append(dst, src[:pos+next]...)
}
src = src[pos+next+1:]
pos = 0
} else {
pos += next + 1
}
}
return dst
}
// Unescape returns a new slice containing the unescaped version of in.
func Unescape(in []byte) []byte {
if len(in) == 0 {
return nil
}
if bytes.IndexByte(in, '\\') == -1 {
return in
}
i := 0
inLen := len(in)
// The output size will be no more than inLen. Preallocating the
// capacity of the output is faster and uses less memory than
// letting append() do its own (over)allocation.
out := make([]byte, 0, inLen)
for {
if i >= inLen {
break
}
if in[i] == '\\' && i+1 < inLen {
switch in[i+1] {
case ',':
out = append(out, ',')
i += 2
continue
case '"':
out = append(out, '"')
i += 2
continue
case ' ':
out = append(out, ' ')
i += 2
continue
case '=':
out = append(out, '=')
i += 2
continue
}
}
out = append(out, in[i])
i += 1
}
return out
}

View File

@@ -0,0 +1,21 @@
package escape
import "strings"
var (
escaper = strings.NewReplacer(`,`, `\,`, `"`, `\"`, ` `, `\ `, `=`, `\=`)
unescaper = strings.NewReplacer(`\,`, `,`, `\"`, `"`, `\ `, ` `, `\=`, `=`)
)
// UnescapeString returns unescaped version of in.
func UnescapeString(in string) string {
if strings.IndexByte(in, '\\') == -1 {
return in
}
return unescaper.Replace(in)
}
// String returns the escaped version of in.
func String(in string) string {
return escaper.Replace(in)
}

32
vendor/vendor.json vendored
View File

@@ -1490,6 +1490,38 @@
"revision": "2f1d1f20f75d5404f53b9edf6b53ed5505508675",
"revisionTime": "2018-10-12T17:50:58Z"
},
{
"checksumSHA1": "MUEiIhsVEYI4xI2TPCPElVLicEw=",
"path": "github.com/influxdata/influxdb/client/v2",
"revision": "cb03c542a4054f0f4d3dc13919d31c456bdb5c39",
"revisionTime": "2018-11-14T18:54:52Z",
"version": "v1.7.1",
"versionExact": "v1.7.1"
},
{
"checksumSHA1": "/SC4ZMeKnNqhuYDhWLZp+z4yeG8=",
"path": "github.com/influxdata/influxdb/models",
"revision": "9dff5f072fb7b0aab643c9e2338b52763f59b7b6",
"revisionTime": "2018-12-07T16:01:49Z"
},
{
"checksumSHA1": "Z0Bb5PWa5WL/j5Dm2KJCLGn1l7U=",
"path": "github.com/influxdata/influxdb/pkg/escape",
"revision": "9dff5f072fb7b0aab643c9e2338b52763f59b7b6",
"revisionTime": "2018-12-07T16:01:49Z"
},
{
"checksumSHA1": "lDHmXNdRAl7aVdjAiPM8Y0SP5Vo=",
"path": "github.com/influxdata/platform/models",
"revision": "df0b084543160564b4afa50a944d3d20145f3120",
"revisionTime": "2018-12-08T01:46:35Z"
},
{
"checksumSHA1": "Z1jNHNivahCME1jnd0nsJ/rUpfc=",
"path": "github.com/influxdata/platform/pkg/escape",
"revision": "df0b084543160564b4afa50a944d3d20145f3120",
"revisionTime": "2018-12-08T01:46:35Z"
},
{
"checksumSHA1": "FHpNxGUqXNu02lBFaei16tXFhU0=",
"path": "github.com/jackc/pgx",

View File

@@ -0,0 +1,131 @@
---
layout: "api"
page_title: "Influxdb - Database - Secrets Engines - HTTP API"
sidebar_title: "Influxdb"
sidebar_current: "api-http-secret-databases-influxdb"
description: |-
The Influxdb plugin for Vault's database secrets engine generates database credentials to access Influxdb servers.
---
# Influxdb Database Plugin HTTP API
The Influxdb database plugin is one of the supported plugins for the database
secrets engine. This plugin generates database credentials dynamically based on
configured roles for the Influxdb database.
## Configure Connection
In addition to the parameters defined by the [Database
Secrets Engine](/api/secret/databases/index.html#configure-connection), this plugin
has a number of parameters to further configure a connection.
| Method | Path | Produces |
| :------- | :--------------------------- | :--------------------- |
| `POST` | `/database/config/:name` | `204 (empty body)` |
### Parameters
- `host` `(string: <required>)` Specifies a Influxdb
host to connect to.
- `port` `(int: 8086)` Specifies the default port to use if none is provided
as part of the host URI. Defaults to Influxdb's default transport port, 8086.
- `username` `(string: <required>)` Specifies the username to use for
superuser access.
- `password` `(string: <required>)` Specifies the password corresponding to
the given username.
- `tls` `(bool: true)` Specifies whether to use TLS when connecting to
Influxdb.
- `insecure_tls` `(bool: false)` Specifies whether to skip verification of the
server certificate when using TLS.
- `pem_bundle` `(string: "")` Specifies concatenated PEM blocks containing a
certificate and private key; a certificate, private key, and issuing CA
certificate; or just a CA certificate.
- `pem_json` `(string: "")` Specifies JSON containing a certificate and
private key; a certificate, private key, and issuing CA certificate; or just a
CA certificate. For convenience format is the same as the output of the
`issue` command from the `pki` secrets engine; see
[the pki documentation](/docs/secrets/pki/index.html).
- `connect_timeout` `(string: "5s")` Specifies the connection timeout to use.
TLS works as follows:
- If `tls` is set to true, the connection will use TLS; this happens
automatically if `pem_bundle`, `pem_json`, or `insecure_tls` is set
- If `insecure_tls` is set to true, the connection will not perform verification
of the server certificate; this also sets `tls` to true
- If only `issuing_ca` is set in `pem_json`, or the only certificate in
`pem_bundle` is a CA certificate, the given CA certificate will be used for
server certificate verification; otherwise the system CA certificates will be
used
- If `certificate` and `private_key` are set in `pem_bundle` or `pem_json`,
client auth will be turned on for the connection
`pem_bundle` should be a PEM-concatenated bundle of a private key + client
certificate, an issuing CA certificate, or both. `pem_json` should contain the
same information; for convenience, the JSON format is the same as that output by
the issue command from the PKI secrets engine.
### Sample Payload
```json
{
"plugin_name": "influxdb-database-plugin",
"allowed_roles": "readonly",
"host": "influxdb1.local",
"username": "user",
"password": "pass"
}
```
### Sample Request
```
$ curl \
--header "X-Vault-Token: ..." \
--request POST \
--data @payload.json \
http://127.0.0.1:8200/v1/influxdb/config/connection
```
## Statements
Statements are configured during role creation and are used by the plugin to
determine what is sent to the database on user creation, renewing, and
revocation. For more information on configuring roles see the [Role
API](/api/secret/databases/index.html#create-role) in the database secrets engine docs.
### Parameters
The following are the statements used by this plugin. If not mentioned in this
list the plugin does not support that statement type.
- `creation_statements` `(list: [])` Specifies the database
statements executed to create and configure a user. Must be a
semicolon-separated string, a base64-encoded semicolon-separated string, a
serialized JSON string array, or a base64-encoded serialized JSON string
array. The '{{username}}' and '{{password}}' values will be substituted. If not
provided, defaults to a generic create user statements that creates a
non-superuser.
- `revocation_statements` `(list: [])` Specifies the database statements to
be executed to revoke a user. Must be a semicolon-separated string, a
base64-encoded semicolon-separated string, a serialized JSON string array, or
a base64-encoded serialized JSON string array. The '{{username}}' value will be
substituted. If not provided defaults to a generic drop user statement.
- `rollback_statements` `(list: [])` Specifies the database statements to be
executed to rollback a create operation in the event of an error. Must be a
semicolon-separated string, a base64-encoded semicolon-separated string, a
serialized JSON string array, or a base64-encoded serialized JSON string
array. The '{{username}}' value will be substituted. If not provided, defaults to
a generic drop user statement

View File

@@ -0,0 +1,82 @@
---
layout: "docs"
page_title: "Influxdb - Database - Secrets Engines"
sidebar_title: "Influxdb"
sidebar_current: "docs-secrets-databases-influxdb"
description: |-
Influxdb is one of the supported plugins for the database secrets engine.
This plugin generates database credentials dynamically based on configured
roles for the Influxdb database.
---
# Influxdb Database Secrets Engine
Influxdb is one of the supported plugins for the database secrets engine. This
plugin generates database credentials dynamically based on configured roles for
the Influxdb database.
See the [database secrets engine](/docs/secrets/databases/index.html) docs for
more information about setting up the database secrets engine.
## Setup
1. Enable the database secrets engine if it is not already enabled:
```text
$ vault secrets enable database
Success! Enabled the database secrets engine at: database/
```
By default, the secrets engine will enable at the name of the engine. To
enable the secrets engine at a different path, use the `-path` argument.
1. Configure Vault with the proper plugin and connection information:
```text
$ vault write database/config/my-influxdb-database \
plugin_name="influxdb-database-plugin" \
host=127.0.0.1 \
username=influx-root \
password=influx-toor \
allowed_roles=my-role
```
1. Configure a role that maps a name in Vault to an SQL statement to execute to
create the database credential:
```text
$ vault write database/roles/my-role \
db_name=my-influxdb-database \
creation_statements=CREATE USER "{{username}}" WITH PASSWORD '{{password}}'; \
GRANT ALL ON "vault" TO "{{username}}";" \
default_ttl="1h" \
max_ttl="24h"
Success! Data written to: database/roles/my-role
```
## Usage
After the secrets engine is configured and a user/machine has a Vault token with
the proper permission, it can generate credentials.
1. Generate a new credential by reading from the `/creds` endpoint with the name
of the role:
```text
$ vault read database/creds/my-role
Key Value
--- -----
lease_id database/creds/my-role/2f6a614c-4aa2-7b19-24b9-ad944a8d4de6
lease_duration 1h
lease_renewable true
password 8cab931c-d62e-a73d-60d3-5ee85139cd66
username v-root-e2978cd0-
```
## API
The full list of configurable options can be seen in the [Influxdb database
plugin API](/api/secret/databases/influxdb.html) page.
For more information on the database secrets engine's HTTP API please see the [Database secret
secrets engine API](/api/secret/databases/index.html) page.