From cc4864ca9eb9fbd869bf6c9728ee5b6bcdb5a431 Mon Sep 17 00:00:00 2001 From: Dario Tranchitella Date: Fri, 2 Dec 2022 18:41:55 +0100 Subject: [PATCH] feat: datastore migration drivers --- go.mod | 6 +- go.sum | 4 ++ internal/datastore/connection.go | 1 + internal/datastore/etcd.go | 21 +++++++ internal/datastore/mysql.go | 56 +++++++++++++++++- internal/datastore/postgresql.go | 98 ++++++++++++++++++++++++++++---- 6 files changed, 172 insertions(+), 14 deletions(-) diff --git a/go.mod b/go.mod index 2512343..8f3fb3c 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/clastix/kamaji go 1.18 require ( + github.com/JamesStewy/go-mysqldump v0.2.2 github.com/blang/semver v3.5.1+incompatible github.com/go-logr/logr v1.2.3 github.com/go-pg/pg/v10 v10.10.6 @@ -34,6 +35,7 @@ require ( cloud.google.com/go v0.99.0 // indirect cloud.google.com/go/storage v1.18.2 // indirect github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect + github.com/DATA-DOG/go-sqlmock v1.5.0 // indirect github.com/Microsoft/go-winio v0.4.17 // indirect github.com/Microsoft/hcsshim v0.8.23 // indirect github.com/PuerkitoBio/purell v1.1.1 // indirect @@ -147,7 +149,7 @@ require ( gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/apiextensions-apiserver v0.25.0 // indirect - k8s.io/cli-runtime v0.25.0 // indirect + k8s.io/cli-runtime v0.25.4 // indirect k8s.io/component-base v0.25.4 // indirect k8s.io/kube-openapi v0.0.0-20220803162953-67bda5d908f1 // indirect k8s.io/kube-proxy v0.0.0 // indirect @@ -187,3 +189,5 @@ replace ( k8s.io/pod-security-admission => k8s.io/pod-security-admission v0.25.4 k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.25.4 ) + +replace github.com/JamesStewy/go-mysqldump => github.com/vtoma/go-mysqldump v1.0.0 diff --git a/go.sum b/go.sum index a2a7eb2..d2ecaad 100644 --- a/go.sum +++ b/go.sum @@ -67,6 +67,8 @@ github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZ github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60= +github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= github.com/Microsoft/go-winio v0.4.11/go.mod h1:VhR8bwka0BXejwEJY73c50VrPtXAaKcyvVC4A4RozmA= github.com/Microsoft/go-winio v0.4.14/go.mod h1:qXqCSQ3Xa7+6tgxaGTIe4Kpcdsi+P8jBhyzoq1bpyYA= github.com/Microsoft/go-winio v0.4.15-0.20190919025122-fc70bd9a86b5/go.mod h1:tTuCMEN+UleMWgg9dVx4Hu52b1bJo+59jBh3ajtinzw= @@ -879,6 +881,8 @@ github.com/vmihailenco/tagparser v0.1.2 h1:gnjoVuB/kljJ5wICEEOpx98oXMWPLj22G67Vb github.com/vmihailenco/tagparser v0.1.2/go.mod h1:OeAg3pn3UbLjkWt+rN9oFYB6u/cQgqMEUPoW2WPyhdI= github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= +github.com/vtoma/go-mysqldump v1.0.0 h1:TNQXlsGD4r1+P9cMyzeWnOgHy0QbP0R+XfXqu5Is0fI= +github.com/vtoma/go-mysqldump v1.0.0/go.mod h1:i9PUM5mb4MH+4D4tJktCTLWDy6CX5rDCL/nS4+f3N5w= github.com/willf/bitset v1.1.11-0.20200630133818-d5bec3311243/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= github.com/willf/bitset v1.1.11/go.mod h1:83CECat5yLh5zVOf4P1ErAgKA5UDvKtgyUABdr3+MjI= github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= diff --git a/internal/datastore/connection.go b/internal/datastore/connection.go index c386534..5601c55 100644 --- a/internal/datastore/connection.go +++ b/internal/datastore/connection.go @@ -52,4 +52,5 @@ type Connection interface { Close() error Check(ctx context.Context) error Driver() string + Migrate(ctx context.Context, tcp kamajiv1alpha1.TenantControlPlane, target Connection) error } diff --git a/internal/datastore/etcd.go b/internal/datastore/etcd.go index 7078009..d4ab6aa 100644 --- a/internal/datastore/etcd.go +++ b/internal/datastore/etcd.go @@ -173,3 +173,24 @@ func (e *EtcdClient) Driver() string { func (e *EtcdClient) buildKey(key string) string { return fmt.Sprintf("/%s/", key) } + +func (e *EtcdClient) Migrate(ctx context.Context, tcp kamajiv1alpha1.TenantControlPlane, target Connection) error { + targetClient := target.(*EtcdClient) //nolint:forcetypeassert + + if err := target.Check(ctx); err != nil { + return err + } + + response, err := e.Client.Get(ctx, e.buildKey(fmt.Sprintf("%s_%s", tcp.GetNamespace(), tcp.GetName())), etcdclient.WithRange(rangeEnd)) + if err != nil { + return err + } + + for _, kv := range response.Kvs { + if _, err = targetClient.Client.Put(ctx, string(kv.Key), string(kv.Value)); err != nil { + return err + } + } + + return nil +} diff --git a/internal/datastore/mysql.go b/internal/datastore/mysql.go index e5cee89..671a726 100644 --- a/internal/datastore/mysql.go +++ b/internal/datastore/mysql.go @@ -8,7 +8,10 @@ import ( "database/sql" "fmt" "net/url" + "os" + "time" + "github.com/JamesStewy/go-mysqldump" "github.com/go-sql-driver/mysql" kamajiv1alpha1 "github.com/clastix/kamaji/api/v1alpha1" @@ -22,7 +25,7 @@ const ( const ( mysqlFetchUserStatement = "SELECT User FROM mysql.user WHERE User= ? LIMIT 1" - mysqlFetchDBStatement = "SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME= ? LIMIT 1" + mysqlFetchDBStatement = "SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME=? LIMIT 1" mysqlShowGrantsStatement = "SHOW GRANTS FOR `%s`@`%%`" mysqlCreateDBStatement = "CREATE DATABASE IF NOT EXISTS %s" mysqlCreateUserStatement = "CREATE USER `%s`@`%%` IDENTIFIED BY '%s'" @@ -37,6 +40,57 @@ type MySQLConnection struct { connector ConnectionEndpoint } +func (c *MySQLConnection) Migrate(ctx context.Context, tcp kamajiv1alpha1.TenantControlPlane, target Connection) error { + // Ensuring the connection is working as expected + if err := target.Check(ctx); err != nil { + return err + } + // Creating the target schema if it doesn't exist + if ok, _ := target.DBExists(ctx, tcp.Status.Storage.Setup.Schema); !ok { + if err := target.CreateDB(ctx, tcp.Status.Storage.Setup.Schema); err != nil { + return err + } + } + // Dumping the old datastore in a local file + dir, err := os.MkdirTemp("", string(tcp.GetUID())) + if err != nil { + return fmt.Errorf("unable to create temp directory for MySQL migration: %w", err) + } + defer os.RemoveAll(dir) + + if _, err = c.db.ExecContext(ctx, fmt.Sprintf("USE %s_%s", tcp.GetNamespace(), tcp.GetName())); err != nil { + return fmt.Errorf("unable to switch DB for MySQL migration: %w", err) + } + + dumper, err := mysqldump.Register(c.db, dir, fmt.Sprintf("%d", time.Now().Unix())) + if err != nil { + return fmt.Errorf("unable to create MySQL dumper: %w", err) + } + defer dumper.Close() + + dumpFile, err := dumper.Dump() + if err != nil { + return fmt.Errorf("unable to dump from MySQL: %w", err) + } + + statements, err := os.ReadFile(dumpFile) + if err != nil { + return fmt.Errorf("cannot read dump file for MySQL: %w", err) + } + // Executing the import to the target datastore + targetClient := target.(*MySQLConnection) //nolint:forcetypeassert + + if _, err = targetClient.db.ExecContext(ctx, fmt.Sprintf("USE %s_%s", tcp.GetNamespace(), tcp.GetName())); err != nil { + return fmt.Errorf("unable to switch DB for MySQL migration: %w", err) + } + + if _, err = targetClient.db.ExecContext(ctx, string(statements)); err != nil { + return fmt.Errorf("cannot execute dump statements for MySQL: %w", err) + } + + return nil +} + func (c *MySQLConnection) Driver() string { return string(kamajiv1alpha1.KineMySQLDriver) } diff --git a/internal/datastore/postgresql.go b/internal/datastore/postgresql.go index f5fe73f..32baf8c 100644 --- a/internal/datastore/postgresql.go +++ b/internal/datastore/postgresql.go @@ -4,6 +4,7 @@ package datastore import ( + "bytes" "context" "fmt" "strings" @@ -15,20 +16,82 @@ import ( ) const ( - postgresqlFetchDBStatement = "SELECT FROM pg_database WHERE datname = ?" - postgresqlCreateDBStatement = "CREATE DATABASE %s" - postgresqlUserExists = "SELECT 1 FROM pg_roles WHERE rolname = ?" - postgresqlCreateUserStatement = "CREATE ROLE %s LOGIN PASSWORD ?" - postgresqlShowGrantsStatement = "SELECT has_database_privilege(rolname, ?, 'create') from pg_roles where rolcanlogin and rolname = ?" - postgresqlGrantPrivilegesStatement = "GRANT ALL PRIVILEGES ON DATABASE %s TO %s" - postgresqlRevokePrivilegesStatement = "REVOKE ALL PRIVILEGES ON DATABASE %s FROM %s" - postgresqlDropRoleStatement = "DROP ROLE %s" - postgresqlDropDBStatement = "DROP DATABASE %s WITH (FORCE)" + postgresqlFetchDBStatement = "SELECT FROM pg_database WHERE datname = ?" + postgresqlCreateDBStatement = "CREATE DATABASE %s" + postgresqlUserExists = "SELECT 1 FROM pg_roles WHERE rolname = ?" + postgresqlCreateUserStatement = "CREATE ROLE %s LOGIN PASSWORD ?" + postgresqlShowGrantsStatement = "SELECT has_database_privilege(rolname, ?, 'create') from pg_roles where rolcanlogin and rolname = ?" + postgresqlShowOwnershipStatement = "SELECT 't' FROM pg_catalog.pg_database AS d WHERE d.datname = ? AND pg_catalog.pg_get_userbyid(d.datdba) = ?" + postgresqlShowTableOwnershipStatement = "SELECT 't' from pg_tables where tableowner = ? AND tablename = ?" + postgresqlGrantPrivilegesStatement = "GRANT ALL PRIVILEGES ON DATABASE %s TO %s" + postgresqlChangeOwnerStatement = "ALTER DATABASE %s OWNER TO %s" + postgresqlRevokePrivilegesStatement = "REVOKE ALL PRIVILEGES ON DATABASE %s FROM %s" + postgresqlDropRoleStatement = "DROP ROLE %s" + postgresqlDropDBStatement = "DROP DATABASE %s WITH (FORCE)" ) type PostgreSQLConnection struct { - db *pg.DB - connection ConnectionEndpoint + db *pg.DB + connection ConnectionEndpoint + switchDatabaseFn func(dbName string) *pg.DB +} + +func (r *PostgreSQLConnection) Migrate(ctx context.Context, tcp kamajiv1alpha1.TenantControlPlane, target Connection) error { + // Ensuring the connection is working as expected + if err := target.Check(ctx); err != nil { + return fmt.Errorf("unable to check target datastore: %w", err) + } + // Creating the target schema if it doesn't exist + if ok, _ := target.DBExists(ctx, tcp.Status.Storage.Setup.Schema); !ok { + if err := target.CreateDB(ctx, tcp.Status.Storage.Setup.Schema); err != nil { + return err + } + } + + targetConn := target.(*PostgreSQLConnection).switchDatabaseFn(tcp.Status.Storage.Setup.Schema) //nolint:forcetypeassert + + err := targetConn.RunInTransaction(ctx, func(tx *pg.Tx) error { + for _, stm := range []string{ + `CREATE TABLE IF NOT EXISTS kine ( + id SERIAL PRIMARY KEY, + name VARCHAR(630), + created INTEGER, + deleted INTEGER, + create_revision INTEGER, + prev_revision INTEGER, + lease INTEGER, + value bytea, + old_value bytea + )`, + `TRUNCATE TABLE kine`, + `CREATE INDEX IF NOT EXISTS kine_name_index ON kine (name)`, + `CREATE INDEX IF NOT EXISTS kine_name_id_index ON kine (name,id)`, + `CREATE INDEX IF NOT EXISTS kine_id_deleted_index ON kine (id,deleted)`, + `CREATE INDEX IF NOT EXISTS kine_prev_revision_index ON kine (prev_revision)`, + `CREATE UNIQUE INDEX IF NOT EXISTS kine_name_prev_revision_uindex ON kine (name, prev_revision)`, + } { + if _, err := tx.ExecContext(ctx, stm); err != nil { + return fmt.Errorf("unable to perform schema creation: %w", err) + } + } + // Dumping the old datastore in a local buffer + var buf bytes.Buffer + + if _, err := r.switchDatabaseFn(tcp.Status.Storage.Setup.Schema).WithContext(ctx).CopyTo(&buf, "COPY kine TO STDOUT"); err != nil { //nolint:contextcheck + return fmt.Errorf("unable to copy from the origin datastore: %w", err) + } + + if _, err := tx.CopyFrom(&buf, "COPY kine FROM STDIN"); err != nil { + return fmt.Errorf("unable to copy to the target datastore: %w", err) + } + + return nil + }) + if err != nil { + return fmt.Errorf("unable to perform migration transaction: %w", err) + } + + return nil } func NewPostgreSQLConnection(config ConnectionConfig) (Connection, error) { @@ -40,7 +103,18 @@ func NewPostgreSQLConnection(config ConnectionConfig) (Connection, error) { TLSConfig: config.TLSConfig, } - return &PostgreSQLConnection{db: pg.Connect(opt), connection: config.Endpoints[0]}, nil + fn := func(dbName string) *pg.DB { + o := opt + o.Database = dbName + + return pg.Connect(o) + } + + return &PostgreSQLConnection{ + db: pg.Connect(opt), + switchDatabaseFn: fn, + connection: config.Endpoints[0], + }, nil } func (r *PostgreSQLConnection) Driver() string {