feat: support for tcp specific data store

This commit is contained in:
Dario Tranchitella
2022-08-29 14:38:14 +02:00
parent 7602d5d803
commit d59f494a69
9 changed files with 178 additions and 45 deletions

View File

@@ -75,10 +75,11 @@ type DataStoreSetupStatus struct {
// StorageStatus defines the observed state of StorageStatus.
type StorageStatus struct {
Driver string `json:"driver,omitempty"`
Config DataStoreConfigStatus `json:"config,omitempty"`
Setup DataStoreSetupStatus `json:"setup,omitempty"`
Certificate DataStoreCertificateStatus `json:"certificate,omitempty"`
Driver string `json:"driver,omitempty"`
DataStoreName string `json:"dataStoreName,omitempty"`
Config DataStoreConfigStatus `json:"config,omitempty"`
Setup DataStoreSetupStatus `json:"setup,omitempty"`
Certificate DataStoreCertificateStatus `json:"certificate,omitempty"`
}
// KubeconfigStatus contains information about the generated kubeconfig.

View File

@@ -144,6 +144,10 @@ type AddonsSpec struct {
// TenantControlPlaneSpec defines the desired state of TenantControlPlane.
type TenantControlPlaneSpec struct {
// DataStore allows to specify a DataStore that should be used to store the Kubernetes data for the given Tenant Control Plane.
// This parameter is optional and acts as an override over the default one which is used by the Kamaji Operator.
// Migration from a different DataStore to another one is not yet supported and the reconciliation will be blocked.
DataStore string `json:"dataStore,omitempty"`
ControlPlane ControlPlane `json:"controlPlane"`
// Kubernetes specification for tenant control plane
Kubernetes KubernetesSpec `json:"kubernetes"`

View File

@@ -8,15 +8,27 @@ import (
"github.com/pkg/errors"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/fields"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
kamajiv1alpha1 "github.com/clastix/kamaji/api/v1alpha1"
"github.com/clastix/kamaji/indexers"
)
const (
dataStoreFinalizer = "finalizer.kamaji.clastix.io/datastore"
)
type DataStore struct {
@@ -25,22 +37,43 @@ type DataStore struct {
// if a Data Source is updated we have to be sure that the reconciliation of the certificates content
// for each Tenant Control Plane is put in place properly.
TenantControlPlaneTrigger TenantControlPlaneChannel
// ResourceName is the DataStore object that should be watched for changes.
ResourceName string
}
//+kubebuilder:rbac:groups=kamaji.clastix.io,resources=datastores,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=kamaji.clastix.io,resources=datastores/status,verbs=get;update;patch
func (r *DataStore) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
ds := kamajiv1alpha1.DataStore{}
if err := r.client.Get(ctx, request.NamespacedName, &ds); err != nil {
log := log.FromContext(ctx)
ds := &kamajiv1alpha1.DataStore{}
if err := r.client.Get(ctx, request.NamespacedName, ds); err != nil {
if k8serrors.IsNotFound(err) {
return reconcile.Result{}, nil
}
return reconcile.Result{}, err
}
// Managing the finalizer, required to don't drop a DataSource if this is still used by a Tenant Control Plane.
switch {
case ds.DeletionTimestamp != nil && controllerutil.ContainsFinalizer(ds, dataStoreFinalizer):
log.Info("marked for deletion, checking conditions")
if len(ds.Status.UsedBy) == 0 {
log.Info("resource is no more used by any Tenant Control Plane")
controllerutil.RemoveFinalizer(ds, dataStoreFinalizer)
return reconcile.Result{}, r.client.Update(ctx, ds)
}
log.Info("DataStore is still used by some Tenant Control Planes, cannot be removed")
case ds.DeletionTimestamp == nil && !controllerutil.ContainsFinalizer(ds, dataStoreFinalizer):
log.Info("the resource is missing the required finalizer, adding it")
controllerutil.AddFinalizer(ds, dataStoreFinalizer)
return reconcile.Result{}, r.client.Update(ctx, ds)
}
// A Data Source can trigger several Tenant Control Planes and requires a minimum validation:
// we have to ensure the data provided by the Data Source is valid and referencing an existing Secret object.
if _, err := ds.Spec.TLSConfig.CertificateAuthority.Certificate.GetContent(ctx, r.client); err != nil {
@@ -57,7 +90,9 @@ func (r *DataStore) Reconcile(ctx context.Context, request reconcile.Request) (r
tcpList := kamajiv1alpha1.TenantControlPlaneList{}
if err := r.client.List(ctx, &tcpList); err != nil {
if err := r.client.List(ctx, &tcpList, client.MatchingFieldsSelector{
Selector: fields.OneTermEqualSelector(indexers.TenantControlPlaneUsedDataStoreKey, ds.GetName()),
}); err != nil {
return reconcile.Result{}, err
}
// Updating the status with the list of Tenant Control Plane using the following Data Source
@@ -68,7 +103,7 @@ func (r *DataStore) Reconcile(ctx context.Context, request reconcile.Request) (r
ds.Status.UsedBy = tcpSets.List()
if err := r.client.Status().Update(ctx, &ds); err != nil {
if err := r.client.Status().Update(ctx, ds); err != nil {
return reconcile.Result{}, err
}
// Triggering the reconciliation of the Tenant Control Plane upon a Secret change
@@ -88,9 +123,31 @@ func (r *DataStore) InjectClient(client client.Client) error {
}
func (r *DataStore) SetupWithManager(mgr controllerruntime.Manager) error {
enqueueFn := func(tcp *kamajiv1alpha1.TenantControlPlane, limitingInterface workqueue.RateLimitingInterface) {
if dataStoreName := tcp.Status.Storage.DataStoreName; len(dataStoreName) > 0 {
limitingInterface.AddRateLimited(reconcile.Request{
NamespacedName: k8stypes.NamespacedName{
Name: dataStoreName,
},
})
}
}
//nolint:forcetypeassert
return controllerruntime.NewControllerManagedBy(mgr).
For(&kamajiv1alpha1.DataStore{}, builder.WithPredicates(predicate.NewPredicateFuncs(func(object client.Object) bool {
return object.GetName() == r.ResourceName
}))).
For(&kamajiv1alpha1.DataStore{}, builder.WithPredicates(
predicate.ResourceVersionChangedPredicate{},
)).
Watches(&source.Kind{Type: &kamajiv1alpha1.TenantControlPlane{}}, handler.Funcs{
CreateFunc: func(createEvent event.CreateEvent, limitingInterface workqueue.RateLimitingInterface) {
enqueueFn(createEvent.Object.(*kamajiv1alpha1.TenantControlPlane), limitingInterface)
},
UpdateFunc: func(updateEvent event.UpdateEvent, limitingInterface workqueue.RateLimitingInterface) {
enqueueFn(updateEvent.ObjectOld.(*kamajiv1alpha1.TenantControlPlane), limitingInterface)
enqueueFn(updateEvent.ObjectNew.(*kamajiv1alpha1.TenantControlPlane), limitingInterface)
},
DeleteFunc: func(deleteEvent event.DeleteEvent, limitingInterface workqueue.RateLimitingInterface) {
enqueueFn(deleteEvent.Object.(*kamajiv1alpha1.TenantControlPlane), limitingInterface)
},
}).
Complete(r)
}

View File

@@ -175,7 +175,7 @@ func getKubernetesStorageResources(c client.Client, dbConnection datastore.Conne
&ds.Config{
Client: c,
ConnString: dbConnection.GetConnectionString(),
Driver: dbConnection.Driver(),
DataStore: datastore,
},
&ds.Setup{
Client: c,

View File

@@ -29,7 +29,7 @@ import (
)
const (
finalizer = "finalizer.kamaji.clastix.io"
tenantControlPlaneFinalizer = "finalizer.kamaji.clastix.io"
)
// TenantControlPlaneReconciler reconciles a TenantControlPlane object.
@@ -42,9 +42,9 @@ type TenantControlPlaneReconciler struct {
// TenantControlPlaneReconcilerConfig gives the necessary configuration for TenantControlPlaneReconciler.
type TenantControlPlaneReconcilerConfig struct {
DataStoreName string
KineContainerImage string
TmpBaseDirectory string
DefaultDataStoreName string
KineContainerImage string
TmpBaseDirectory string
}
//+kubebuilder:rbac:groups=kamaji.clastix.io,resources=tenantcontrolplanes,verbs=get;list;watch;create;update;patch;delete
@@ -69,18 +69,18 @@ func (r *TenantControlPlaneReconciler) Reconcile(ctx context.Context, req ctrl.R
}
markedToBeDeleted := tenantControlPlane.GetDeletionTimestamp() != nil
hasFinalizer := hasFinalizer(*tenantControlPlane)
hasFinalizer := controllerutil.ContainsFinalizer(tenantControlPlane, tenantControlPlaneFinalizer)
if markedToBeDeleted && !hasFinalizer {
return ctrl.Result{}, nil
}
ds := kamajiv1alpha1.DataStore{}
if err = r.Client.Get(ctx, k8stypes.NamespacedName{Name: r.Config.DataStoreName}, &ds); err != nil {
return ctrl.Result{}, errors.Wrap(err, "cannot retrieve kamajiv1alpha.DataStore object")
// Retrieving the DataStore to use for the current reconciliation
ds, err := r.dataStore(ctx, tenantControlPlane)
if err != nil {
return ctrl.Result{}, err
}
dsConnection, err := r.getStorageConnection(ctx, ds)
dsConnection, err := r.getStorageConnection(ctx, *ds)
if err != nil {
return ctrl.Result{}, err
}
@@ -126,7 +126,7 @@ func (r *TenantControlPlaneReconciler) Reconcile(ctx context.Context, req ctrl.R
log: log,
tcpReconcilerConfig: r.Config,
tenantControlPlane: *tenantControlPlane,
DataStore: ds,
DataStore: *ds,
Connection: dsConnection,
}
registeredResources := GetResources(groupResourceBuilderConfiguration)
@@ -215,24 +215,34 @@ func (r *TenantControlPlaneReconciler) updateStatus(ctx context.Context, namespa
return nil
}
func hasFinalizer(tenantControlPlane kamajiv1alpha1.TenantControlPlane) bool {
for _, f := range tenantControlPlane.GetFinalizers() {
if f == finalizer {
return true
}
}
return false
}
func (r *TenantControlPlaneReconciler) AddFinalizer(ctx context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) error {
controllerutil.AddFinalizer(tenantControlPlane, finalizer)
controllerutil.AddFinalizer(tenantControlPlane, tenantControlPlaneFinalizer)
return r.Update(ctx, tenantControlPlane)
}
func (r *TenantControlPlaneReconciler) RemoveFinalizer(ctx context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) error {
controllerutil.RemoveFinalizer(tenantControlPlane, finalizer)
controllerutil.RemoveFinalizer(tenantControlPlane, tenantControlPlaneFinalizer)
return r.Update(ctx, tenantControlPlane)
}
// dataStore retrieves the override DataStore for the given Tenant Control Plane if specified,
// otherwise fallback to the default one specified in the Kamaji setup.
func (r *TenantControlPlaneReconciler) dataStore(ctx context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) (*kamajiv1alpha1.DataStore, error) {
dataStoreName := tenantControlPlane.Spec.DataStore
if len(dataStoreName) == 0 {
dataStoreName = r.Config.DefaultDataStoreName
}
if statusDataStore := tenantControlPlane.Status.Storage.DataStoreName; len(statusDataStore) > 0 && dataStoreName != statusDataStore {
return nil, fmt.Errorf("migration from a DataStore (current: %s) to another one (desired: %s) is not yet supported", statusDataStore, dataStoreName)
}
ds := &kamajiv1alpha1.DataStore{}
if err := r.Client.Get(ctx, k8stypes.NamespacedName{Name: dataStoreName}, ds); err != nil {
return nil, errors.Wrap(err, "cannot retrieve *kamajiv1alpha.DataStore object")
}
return ds, nil
}

12
indexers/indexer.go Normal file
View File

@@ -0,0 +1,12 @@
// Copyright 2022 Clastix Labs
// SPDX-License-Identifier: Apache-2.0
package indexers
import "sigs.k8s.io/controller-runtime/pkg/client"
type Indexer interface {
Object() client.Object
Field() string
ExtractValue() client.IndexerFunc
}

View File

@@ -0,0 +1,40 @@
// Copyright 2022 Clastix Labs
// SPDX-License-Identifier: Apache-2.0
package indexers
import (
"context"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
kamajiv1alpha1 "github.com/clastix/kamaji/api/v1alpha1"
)
const (
TenantControlPlaneUsedDataStoreKey = "status.storage.dataStoreName"
)
type TenantControlPlaneStatusDataStore struct{}
func (t *TenantControlPlaneStatusDataStore) Object() client.Object {
return &kamajiv1alpha1.TenantControlPlane{}
}
func (t *TenantControlPlaneStatusDataStore) Field() string {
return TenantControlPlaneUsedDataStoreKey
}
func (t *TenantControlPlaneStatusDataStore) ExtractValue() client.IndexerFunc {
return func(object client.Object) []string {
//nolint:forcetypeassert
tcp := object.(*kamajiv1alpha1.TenantControlPlane)
return []string{tcp.Status.Storage.DataStoreName}
}
}
func (t *TenantControlPlaneStatusDataStore) SetupWithManager(ctx context.Context, mgr controllerruntime.Manager) error {
return mgr.GetFieldIndexer().IndexField(ctx, t.Object(), t.Field(), t.ExtractValue())
}

View File

@@ -20,12 +20,12 @@ type Config struct {
resource *corev1.Secret
Client client.Client
ConnString string
Driver string
DataStore kamajiv1alpha1.DataStore
}
func (r *Config) ShouldStatusBeUpdated(_ context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) bool {
return tenantControlPlane.Status.Storage.Config.Checksum != r.resource.GetAnnotations()["checksum"] ||
tenantControlPlane.Status.Storage.Driver != r.Driver
tenantControlPlane.Status.Storage.DataStoreName != r.DataStore.GetName()
}
func (r *Config) ShouldCleanup(*kamajiv1alpha1.TenantControlPlane) bool {
@@ -64,7 +64,8 @@ func (r *Config) GetName() string {
}
func (r *Config) UpdateTenantControlPlaneStatus(_ context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) error {
tenantControlPlane.Status.Storage.Driver = r.Driver
tenantControlPlane.Status.Storage.Driver = string(r.DataStore.Spec.Driver)
tenantControlPlane.Status.Storage.DataStoreName = r.DataStore.GetName()
tenantControlPlane.Status.Storage.Config.SecretName = r.resource.GetName()
tenantControlPlane.Status.Storage.Config.Checksum = r.resource.GetAnnotations()["checksum"]

18
main.go
View File

@@ -18,6 +18,7 @@ import (
kamajiv1alpha1 "github.com/clastix/kamaji/api/v1alpha1"
"github.com/clastix/kamaji/controllers"
"github.com/clastix/kamaji/indexers"
"github.com/clastix/kamaji/internal"
"github.com/clastix/kamaji/internal/config"
)
@@ -35,6 +36,8 @@ func init() {
}
func main() {
ctx := ctrl.SetupSignalHandler()
conf, err := config.InitConfig()
if err != nil {
log.Fatalf("Error reading configuration.")
@@ -61,7 +64,7 @@ func main() {
tcpChannel := make(controllers.TenantControlPlaneChannel)
if err = (&controllers.DataStore{TenantControlPlaneTrigger: tcpChannel, ResourceName: conf.GetString("datastore")}).SetupWithManager(mgr); err != nil {
if err = (&controllers.DataStore{TenantControlPlaneTrigger: tcpChannel}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "DataStore")
os.Exit(1)
}
@@ -70,9 +73,9 @@ func main() {
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Config: controllers.TenantControlPlaneReconcilerConfig{
DataStoreName: conf.GetString("datastore"),
KineContainerImage: conf.GetString("kine-image"),
TmpBaseDirectory: conf.GetString("tmp-directory"),
DefaultDataStoreName: conf.GetString("datastore"),
KineContainerImage: conf.GetString("kine-image"),
TmpBaseDirectory: conf.GetString("tmp-directory"),
},
TriggerChan: tcpChannel,
}
@@ -82,6 +85,11 @@ func main() {
os.Exit(1)
}
if err = (&indexers.TenantControlPlaneStatusDataStore{}).SetupWithManager(ctx, mgr); err != nil {
setupLog.Error(err, "unable to create indexer", "indexer", "TenantControlPlaneStatusDataStore")
os.Exit(1)
}
//+kubebuilder:scaffold:builder
if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
@@ -94,7 +102,7 @@ func main() {
}
setupLog.Info("starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
if err := mgr.Start(ctx); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}