feat: datastore migration drivers

This commit is contained in:
Dario Tranchitella
2022-12-02 18:41:55 +01:00
parent ece1a4e7ee
commit cc4864ca9e
6 changed files with 172 additions and 14 deletions

6
go.mod
View File

@@ -3,6 +3,7 @@ module github.com/clastix/kamaji
go 1.18
require (
github.com/JamesStewy/go-mysqldump v0.2.2
github.com/blang/semver v3.5.1+incompatible
github.com/go-logr/logr v1.2.3
github.com/go-pg/pg/v10 v10.10.6
@@ -34,6 +35,7 @@ require (
cloud.google.com/go v0.99.0 // indirect
cloud.google.com/go/storage v1.18.2 // indirect
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
github.com/DATA-DOG/go-sqlmock v1.5.0 // indirect
github.com/Microsoft/go-winio v0.4.17 // indirect
github.com/Microsoft/hcsshim v0.8.23 // indirect
github.com/PuerkitoBio/purell v1.1.1 // indirect
@@ -147,7 +149,7 @@ require (
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apiextensions-apiserver v0.25.0 // indirect
k8s.io/cli-runtime v0.25.0 // indirect
k8s.io/cli-runtime v0.25.4 // indirect
k8s.io/component-base v0.25.4 // indirect
k8s.io/kube-openapi v0.0.0-20220803162953-67bda5d908f1 // indirect
k8s.io/kube-proxy v0.0.0 // indirect
@@ -187,3 +189,5 @@ replace (
k8s.io/pod-security-admission => k8s.io/pod-security-admission v0.25.4
k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.25.4
)
replace github.com/JamesStewy/go-mysqldump => github.com/vtoma/go-mysqldump v1.0.0

4
go.sum
View File

@@ -67,6 +67,8 @@ github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZ
github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60=
github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
github.com/Microsoft/go-winio v0.4.11/go.mod h1:VhR8bwka0BXejwEJY73c50VrPtXAaKcyvVC4A4RozmA=
github.com/Microsoft/go-winio v0.4.14/go.mod h1:qXqCSQ3Xa7+6tgxaGTIe4Kpcdsi+P8jBhyzoq1bpyYA=
github.com/Microsoft/go-winio v0.4.15-0.20190919025122-fc70bd9a86b5/go.mod h1:tTuCMEN+UleMWgg9dVx4Hu52b1bJo+59jBh3ajtinzw=
@@ -879,6 +881,8 @@ github.com/vmihailenco/tagparser v0.1.2 h1:gnjoVuB/kljJ5wICEEOpx98oXMWPLj22G67Vb
github.com/vmihailenco/tagparser v0.1.2/go.mod h1:OeAg3pn3UbLjkWt+rN9oFYB6u/cQgqMEUPoW2WPyhdI=
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
github.com/vtoma/go-mysqldump v1.0.0 h1:TNQXlsGD4r1+P9cMyzeWnOgHy0QbP0R+XfXqu5Is0fI=
github.com/vtoma/go-mysqldump v1.0.0/go.mod h1:i9PUM5mb4MH+4D4tJktCTLWDy6CX5rDCL/nS4+f3N5w=
github.com/willf/bitset v1.1.11-0.20200630133818-d5bec3311243/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
github.com/willf/bitset v1.1.11/go.mod h1:83CECat5yLh5zVOf4P1ErAgKA5UDvKtgyUABdr3+MjI=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=

View File

@@ -52,4 +52,5 @@ type Connection interface {
Close() error
Check(ctx context.Context) error
Driver() string
Migrate(ctx context.Context, tcp kamajiv1alpha1.TenantControlPlane, target Connection) error
}

View File

@@ -173,3 +173,24 @@ func (e *EtcdClient) Driver() string {
func (e *EtcdClient) buildKey(key string) string {
return fmt.Sprintf("/%s/", key)
}
func (e *EtcdClient) Migrate(ctx context.Context, tcp kamajiv1alpha1.TenantControlPlane, target Connection) error {
targetClient := target.(*EtcdClient) //nolint:forcetypeassert
if err := target.Check(ctx); err != nil {
return err
}
response, err := e.Client.Get(ctx, e.buildKey(fmt.Sprintf("%s_%s", tcp.GetNamespace(), tcp.GetName())), etcdclient.WithRange(rangeEnd))
if err != nil {
return err
}
for _, kv := range response.Kvs {
if _, err = targetClient.Client.Put(ctx, string(kv.Key), string(kv.Value)); err != nil {
return err
}
}
return nil
}

View File

@@ -8,7 +8,10 @@ import (
"database/sql"
"fmt"
"net/url"
"os"
"time"
"github.com/JamesStewy/go-mysqldump"
"github.com/go-sql-driver/mysql"
kamajiv1alpha1 "github.com/clastix/kamaji/api/v1alpha1"
@@ -22,7 +25,7 @@ const (
const (
mysqlFetchUserStatement = "SELECT User FROM mysql.user WHERE User= ? LIMIT 1"
mysqlFetchDBStatement = "SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME= ? LIMIT 1"
mysqlFetchDBStatement = "SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME=? LIMIT 1"
mysqlShowGrantsStatement = "SHOW GRANTS FOR `%s`@`%%`"
mysqlCreateDBStatement = "CREATE DATABASE IF NOT EXISTS %s"
mysqlCreateUserStatement = "CREATE USER `%s`@`%%` IDENTIFIED BY '%s'"
@@ -37,6 +40,57 @@ type MySQLConnection struct {
connector ConnectionEndpoint
}
func (c *MySQLConnection) Migrate(ctx context.Context, tcp kamajiv1alpha1.TenantControlPlane, target Connection) error {
// Ensuring the connection is working as expected
if err := target.Check(ctx); err != nil {
return err
}
// Creating the target schema if it doesn't exist
if ok, _ := target.DBExists(ctx, tcp.Status.Storage.Setup.Schema); !ok {
if err := target.CreateDB(ctx, tcp.Status.Storage.Setup.Schema); err != nil {
return err
}
}
// Dumping the old datastore in a local file
dir, err := os.MkdirTemp("", string(tcp.GetUID()))
if err != nil {
return fmt.Errorf("unable to create temp directory for MySQL migration: %w", err)
}
defer os.RemoveAll(dir)
if _, err = c.db.ExecContext(ctx, fmt.Sprintf("USE %s_%s", tcp.GetNamespace(), tcp.GetName())); err != nil {
return fmt.Errorf("unable to switch DB for MySQL migration: %w", err)
}
dumper, err := mysqldump.Register(c.db, dir, fmt.Sprintf("%d", time.Now().Unix()))
if err != nil {
return fmt.Errorf("unable to create MySQL dumper: %w", err)
}
defer dumper.Close()
dumpFile, err := dumper.Dump()
if err != nil {
return fmt.Errorf("unable to dump from MySQL: %w", err)
}
statements, err := os.ReadFile(dumpFile)
if err != nil {
return fmt.Errorf("cannot read dump file for MySQL: %w", err)
}
// Executing the import to the target datastore
targetClient := target.(*MySQLConnection) //nolint:forcetypeassert
if _, err = targetClient.db.ExecContext(ctx, fmt.Sprintf("USE %s_%s", tcp.GetNamespace(), tcp.GetName())); err != nil {
return fmt.Errorf("unable to switch DB for MySQL migration: %w", err)
}
if _, err = targetClient.db.ExecContext(ctx, string(statements)); err != nil {
return fmt.Errorf("cannot execute dump statements for MySQL: %w", err)
}
return nil
}
func (c *MySQLConnection) Driver() string {
return string(kamajiv1alpha1.KineMySQLDriver)
}

View File

@@ -4,6 +4,7 @@
package datastore
import (
"bytes"
"context"
"fmt"
"strings"
@@ -15,20 +16,82 @@ import (
)
const (
postgresqlFetchDBStatement = "SELECT FROM pg_database WHERE datname = ?"
postgresqlCreateDBStatement = "CREATE DATABASE %s"
postgresqlUserExists = "SELECT 1 FROM pg_roles WHERE rolname = ?"
postgresqlCreateUserStatement = "CREATE ROLE %s LOGIN PASSWORD ?"
postgresqlShowGrantsStatement = "SELECT has_database_privilege(rolname, ?, 'create') from pg_roles where rolcanlogin and rolname = ?"
postgresqlGrantPrivilegesStatement = "GRANT ALL PRIVILEGES ON DATABASE %s TO %s"
postgresqlRevokePrivilegesStatement = "REVOKE ALL PRIVILEGES ON DATABASE %s FROM %s"
postgresqlDropRoleStatement = "DROP ROLE %s"
postgresqlDropDBStatement = "DROP DATABASE %s WITH (FORCE)"
postgresqlFetchDBStatement = "SELECT FROM pg_database WHERE datname = ?"
postgresqlCreateDBStatement = "CREATE DATABASE %s"
postgresqlUserExists = "SELECT 1 FROM pg_roles WHERE rolname = ?"
postgresqlCreateUserStatement = "CREATE ROLE %s LOGIN PASSWORD ?"
postgresqlShowGrantsStatement = "SELECT has_database_privilege(rolname, ?, 'create') from pg_roles where rolcanlogin and rolname = ?"
postgresqlShowOwnershipStatement = "SELECT 't' FROM pg_catalog.pg_database AS d WHERE d.datname = ? AND pg_catalog.pg_get_userbyid(d.datdba) = ?"
postgresqlShowTableOwnershipStatement = "SELECT 't' from pg_tables where tableowner = ? AND tablename = ?"
postgresqlGrantPrivilegesStatement = "GRANT ALL PRIVILEGES ON DATABASE %s TO %s"
postgresqlChangeOwnerStatement = "ALTER DATABASE %s OWNER TO %s"
postgresqlRevokePrivilegesStatement = "REVOKE ALL PRIVILEGES ON DATABASE %s FROM %s"
postgresqlDropRoleStatement = "DROP ROLE %s"
postgresqlDropDBStatement = "DROP DATABASE %s WITH (FORCE)"
)
type PostgreSQLConnection struct {
db *pg.DB
connection ConnectionEndpoint
db *pg.DB
connection ConnectionEndpoint
switchDatabaseFn func(dbName string) *pg.DB
}
func (r *PostgreSQLConnection) Migrate(ctx context.Context, tcp kamajiv1alpha1.TenantControlPlane, target Connection) error {
// Ensuring the connection is working as expected
if err := target.Check(ctx); err != nil {
return fmt.Errorf("unable to check target datastore: %w", err)
}
// Creating the target schema if it doesn't exist
if ok, _ := target.DBExists(ctx, tcp.Status.Storage.Setup.Schema); !ok {
if err := target.CreateDB(ctx, tcp.Status.Storage.Setup.Schema); err != nil {
return err
}
}
targetConn := target.(*PostgreSQLConnection).switchDatabaseFn(tcp.Status.Storage.Setup.Schema) //nolint:forcetypeassert
err := targetConn.RunInTransaction(ctx, func(tx *pg.Tx) error {
for _, stm := range []string{
`CREATE TABLE IF NOT EXISTS kine (
id SERIAL PRIMARY KEY,
name VARCHAR(630),
created INTEGER,
deleted INTEGER,
create_revision INTEGER,
prev_revision INTEGER,
lease INTEGER,
value bytea,
old_value bytea
)`,
`TRUNCATE TABLE kine`,
`CREATE INDEX IF NOT EXISTS kine_name_index ON kine (name)`,
`CREATE INDEX IF NOT EXISTS kine_name_id_index ON kine (name,id)`,
`CREATE INDEX IF NOT EXISTS kine_id_deleted_index ON kine (id,deleted)`,
`CREATE INDEX IF NOT EXISTS kine_prev_revision_index ON kine (prev_revision)`,
`CREATE UNIQUE INDEX IF NOT EXISTS kine_name_prev_revision_uindex ON kine (name, prev_revision)`,
} {
if _, err := tx.ExecContext(ctx, stm); err != nil {
return fmt.Errorf("unable to perform schema creation: %w", err)
}
}
// Dumping the old datastore in a local buffer
var buf bytes.Buffer
if _, err := r.switchDatabaseFn(tcp.Status.Storage.Setup.Schema).WithContext(ctx).CopyTo(&buf, "COPY kine TO STDOUT"); err != nil { //nolint:contextcheck
return fmt.Errorf("unable to copy from the origin datastore: %w", err)
}
if _, err := tx.CopyFrom(&buf, "COPY kine FROM STDIN"); err != nil {
return fmt.Errorf("unable to copy to the target datastore: %w", err)
}
return nil
})
if err != nil {
return fmt.Errorf("unable to perform migration transaction: %w", err)
}
return nil
}
func NewPostgreSQLConnection(config ConnectionConfig) (Connection, error) {
@@ -40,7 +103,18 @@ func NewPostgreSQLConnection(config ConnectionConfig) (Connection, error) {
TLSConfig: config.TLSConfig,
}
return &PostgreSQLConnection{db: pg.Connect(opt), connection: config.Endpoints[0]}, nil
fn := func(dbName string) *pg.DB {
o := opt
o.Database = dbName
return pg.Connect(o)
}
return &PostgreSQLConnection{
db: pg.Connect(opt),
switchDatabaseFn: fn,
connection: config.Endpoints[0],
}, nil
}
func (r *PostgreSQLConnection) Driver() string {