mirror of
https://github.com/outbackdingo/kamaji.git
synced 2026-01-27 10:19:29 +00:00
* feat: add support for multiple Datastores * docs: add guide for datastore overrides * feat(datastore): add e2e test for dataStoreOverrides * ci: reclaim disk space from runner to fix flaky tests
405 lines
15 KiB
Go
405 lines
15 KiB
Go
// Copyright 2022 Clastix Labs
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
package controllers
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/juju/mutex/v2"
|
|
"github.com/pkg/errors"
|
|
appsv1 "k8s.io/api/apps/v1"
|
|
batchv1 "k8s.io/api/batch/v1"
|
|
corev1 "k8s.io/api/core/v1"
|
|
networkingv1 "k8s.io/api/networking/v1"
|
|
k8serrors "k8s.io/apimachinery/pkg/api/errors"
|
|
k8stypes "k8s.io/apimachinery/pkg/types"
|
|
"k8s.io/client-go/discovery"
|
|
"k8s.io/client-go/util/workqueue"
|
|
"k8s.io/utils/clock"
|
|
ctrl "sigs.k8s.io/controller-runtime"
|
|
"sigs.k8s.io/controller-runtime/pkg/builder"
|
|
"sigs.k8s.io/controller-runtime/pkg/client"
|
|
"sigs.k8s.io/controller-runtime/pkg/controller"
|
|
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
|
|
"sigs.k8s.io/controller-runtime/pkg/event"
|
|
"sigs.k8s.io/controller-runtime/pkg/handler"
|
|
"sigs.k8s.io/controller-runtime/pkg/log"
|
|
"sigs.k8s.io/controller-runtime/pkg/predicate"
|
|
"sigs.k8s.io/controller-runtime/pkg/reconcile"
|
|
"sigs.k8s.io/controller-runtime/pkg/source"
|
|
gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"
|
|
gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
|
|
|
|
kamajiv1alpha1 "github.com/clastix/kamaji/api/v1alpha1"
|
|
"github.com/clastix/kamaji/controllers/finalizers"
|
|
"github.com/clastix/kamaji/controllers/utils"
|
|
controlplanebuilder "github.com/clastix/kamaji/internal/builders/controlplane"
|
|
"github.com/clastix/kamaji/internal/datastore"
|
|
kamajierrors "github.com/clastix/kamaji/internal/errors"
|
|
"github.com/clastix/kamaji/internal/resources"
|
|
"github.com/clastix/kamaji/internal/utilities"
|
|
)
|
|
|
|
// TenantControlPlaneReconciler reconciles a TenantControlPlane object.
|
|
type TenantControlPlaneReconciler struct {
|
|
Client client.Client
|
|
APIReader client.Reader
|
|
Config TenantControlPlaneReconcilerConfig
|
|
TriggerChan chan event.GenericEvent
|
|
KamajiNamespace string
|
|
KamajiServiceAccount string
|
|
KamajiService string
|
|
KamajiMigrateImage string
|
|
MaxConcurrentReconciles int
|
|
ReconcileTimeout time.Duration
|
|
DiscoveryClient discovery.DiscoveryInterface
|
|
// CertificateChan is the channel used by the CertificateLifecycleController that is checking for
|
|
// certificates and kubeconfig user certs validity: a generic event for the given TCP will be triggered
|
|
// once the validity threshold for the given certificate is reached.
|
|
CertificateChan chan event.GenericEvent
|
|
|
|
clock mutex.Clock
|
|
}
|
|
|
|
// TenantControlPlaneReconcilerConfig gives the necessary configuration for TenantControlPlaneReconciler.
// The whole struct is handed over to the resource builders on every reconciliation.
type TenantControlPlaneReconcilerConfig struct {
	// DefaultDataStoreName is the DataStore used when a TenantControlPlane does not name
	// one in its specification; leaving both empty yields ErrMissingDataStore.
	DefaultDataStoreName string
	// KineContainerImage is not read in this file.
	// NOTE(review): presumably the image of the kine container, confirm against the builders.
	KineContainerImage string
	// TmpBaseDirectory is not read in this file.
	// NOTE(review): presumably a scratch directory for generated artifacts, confirm against the builders.
	TmpBaseDirectory string
	// CertExpirationThreshold is not read in this file.
	// NOTE(review): presumably the remaining validity below which certificates are rotated, confirm.
	CertExpirationThreshold time.Duration
}
|
|
|
|
//+kubebuilder:rbac:groups=kamaji.clastix.io,resources=tenantcontrolplanes,verbs=get;list;watch;create;update;patch;delete
|
|
//+kubebuilder:rbac:groups=kamaji.clastix.io,resources=tenantcontrolplanes/status,verbs=get;update;patch
|
|
//+kubebuilder:rbac:groups=kamaji.clastix.io,resources=tenantcontrolplanes/finalizers,verbs=update
|
|
//+kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch;create;update;patch;delete
|
|
//+kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete
|
|
//+kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
|
|
//+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
|
|
//+kubebuilder:rbac:groups=networking.k8s.io,resources=ingresses,verbs=get;list;watch;create;update;patch;delete
|
|
//+kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;delete
|
|
//+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=httproutes,verbs=get;list;watch;create;update;patch;delete
|
|
//+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=grpcroutes,verbs=get;list;watch;create;update;patch;delete
|
|
//+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=tlsroutes,verbs=get;list;watch;create;update;patch;delete
|
|
//+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=gateways,verbs=get;list;watch
|
|
|
|
func (r *TenantControlPlaneReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
|
|
log := log.FromContext(ctx)
|
|
|
|
var cancelFn context.CancelFunc
|
|
ctx, cancelFn = context.WithTimeout(ctx, r.ReconcileTimeout)
|
|
defer cancelFn()
|
|
|
|
tenantControlPlane, err := r.getTenantControlPlane(ctx, req.NamespacedName)()
|
|
if k8serrors.IsNotFound(err) {
|
|
log.Info("resource may have been deleted, skipping")
|
|
|
|
return reconcile.Result{}, nil
|
|
}
|
|
if err != nil {
|
|
log.Error(err, "cannot retrieve the required resource")
|
|
|
|
return reconcile.Result{}, err
|
|
}
|
|
|
|
if utils.IsPaused(tenantControlPlane) {
|
|
log.Info("paused reconciliation, no further actions")
|
|
|
|
return ctrl.Result{}, nil
|
|
}
|
|
|
|
releaser, err := mutex.Acquire(r.mutexSpec(tenantControlPlane))
|
|
if err != nil {
|
|
switch {
|
|
case errors.As(err, &mutex.ErrTimeout):
|
|
log.Info("acquire timed out, current process is blocked by another reconciliation")
|
|
|
|
return ctrl.Result{RequeueAfter: time.Second}, nil
|
|
case errors.As(err, &mutex.ErrCancelled):
|
|
log.Info("acquire cancelled")
|
|
|
|
return ctrl.Result{RequeueAfter: time.Second}, nil
|
|
default:
|
|
log.Error(err, "acquire failed")
|
|
|
|
return ctrl.Result{}, err
|
|
}
|
|
}
|
|
defer releaser.Release()
|
|
|
|
markedToBeDeleted := tenantControlPlane.GetDeletionTimestamp() != nil
|
|
|
|
if markedToBeDeleted && !controllerutil.ContainsFinalizer(tenantControlPlane, finalizers.DatastoreFinalizer) {
|
|
return ctrl.Result{}, nil
|
|
}
|
|
// Retrieving the DataStore to use for the current reconciliation
|
|
ds, err := r.dataStore(ctx, tenantControlPlane)
|
|
if err != nil {
|
|
if errors.Is(err, ErrMissingDataStore) {
|
|
log.Info(err.Error())
|
|
|
|
return ctrl.Result{RequeueAfter: time.Second}, nil
|
|
}
|
|
|
|
log.Error(err, "cannot retrieve the DataStore for the given instance")
|
|
|
|
return ctrl.Result{}, err
|
|
}
|
|
|
|
dsConnection, err := datastore.NewStorageConnection(ctx, r.Client, *ds)
|
|
if err != nil {
|
|
log.Error(err, "cannot generate the DataStore connection for the given instance")
|
|
|
|
return ctrl.Result{}, err
|
|
}
|
|
defer dsConnection.Close()
|
|
|
|
dso, err := r.dataStoreOverride(ctx, tenantControlPlane)
|
|
if err != nil {
|
|
log.Error(err, "cannot retrieve the DataStoreOverrides for the given instance")
|
|
|
|
return ctrl.Result{}, err
|
|
}
|
|
dsoConnections := make(map[string]datastore.Connection, len(dso))
|
|
for _, ds := range dso {
|
|
dsoConnection, err := datastore.NewStorageConnection(ctx, r.Client, ds.DataStore)
|
|
if err != nil {
|
|
log.Error(err, "cannot generate the DataStoreOverride connection for the given instance")
|
|
|
|
return ctrl.Result{}, err
|
|
}
|
|
defer dsoConnection.Close()
|
|
|
|
dsoConnections[ds.Resource] = dsoConnection
|
|
}
|
|
|
|
if markedToBeDeleted && controllerutil.ContainsFinalizer(tenantControlPlane, finalizers.DatastoreFinalizer) {
|
|
log.Info("marked for deletion, performing clean-up")
|
|
|
|
groupDeletableResourceBuilderConfiguration := GroupDeletableResourceBuilderConfiguration{
|
|
client: r.Client,
|
|
log: log,
|
|
tcpReconcilerConfig: r.Config,
|
|
tenantControlPlane: *tenantControlPlane,
|
|
connection: dsConnection,
|
|
dataStore: *ds,
|
|
}
|
|
|
|
for _, resource := range GetDeletableResources(tenantControlPlane, groupDeletableResourceBuilderConfiguration) {
|
|
if err = resources.HandleDeletion(ctx, resource, tenantControlPlane); err != nil {
|
|
log.Error(err, "resource deletion failed", "resource", resource.GetName())
|
|
|
|
return ctrl.Result{}, err
|
|
}
|
|
}
|
|
|
|
log.Info("resource deletions have been completed")
|
|
|
|
return ctrl.Result{}, nil
|
|
}
|
|
|
|
groupResourceBuilderConfiguration := GroupResourceBuilderConfiguration{
|
|
client: r.Client,
|
|
log: log,
|
|
tcpReconcilerConfig: r.Config,
|
|
tenantControlPlane: *tenantControlPlane,
|
|
Connection: dsConnection,
|
|
DataStore: *ds,
|
|
DataStoreOverrides: dso,
|
|
DataStoreOverriedsConnections: dsoConnections,
|
|
KamajiNamespace: r.KamajiNamespace,
|
|
KamajiServiceAccount: r.KamajiServiceAccount,
|
|
KamajiService: r.KamajiService,
|
|
KamajiMigrateImage: r.KamajiMigrateImage,
|
|
DiscoveryClient: r.DiscoveryClient,
|
|
}
|
|
registeredResources := GetResources(ctx, groupResourceBuilderConfiguration)
|
|
|
|
for _, resource := range registeredResources {
|
|
result, err := resources.Handle(ctx, resource, tenantControlPlane)
|
|
if err != nil {
|
|
if kamajierrors.ShouldReconcileErrorBeIgnored(err) {
|
|
log.V(1).Info("sentinel error, enqueuing back request", "error", err.Error())
|
|
|
|
return ctrl.Result{RequeueAfter: time.Second}, nil
|
|
}
|
|
|
|
log.Error(err, "handling of resource failed", "resource", resource.GetName())
|
|
|
|
return ctrl.Result{}, err
|
|
}
|
|
|
|
if result == controllerutil.OperationResultNone {
|
|
continue
|
|
}
|
|
|
|
if err = utils.UpdateStatus(ctx, r.Client, tenantControlPlane, resource); err != nil {
|
|
if kamajierrors.ShouldReconcileErrorBeIgnored(err) {
|
|
log.V(1).Info("sentinel error, enqueuing back request", "error", err.Error())
|
|
|
|
return ctrl.Result{RequeueAfter: time.Second}, nil
|
|
}
|
|
|
|
log.Error(err, "update of the resource failed", "resource", resource.GetName())
|
|
|
|
return ctrl.Result{}, err
|
|
}
|
|
|
|
log.Info(fmt.Sprintf("%s has been configured", resource.GetName()))
|
|
|
|
if result == resources.OperationResultEnqueueBack {
|
|
log.Info("requested enqueuing back", "resources", resource.GetName())
|
|
|
|
return ctrl.Result{RequeueAfter: time.Second}, nil
|
|
}
|
|
}
|
|
|
|
log.Info(fmt.Sprintf("%s has been reconciled", tenantControlPlane.GetName()))
|
|
|
|
return ctrl.Result{}, nil
|
|
}
|
|
|
|
func (r *TenantControlPlaneReconciler) mutexSpec(obj client.Object) mutex.Spec {
|
|
return mutex.Spec{
|
|
Name: strings.ReplaceAll(fmt.Sprintf("kamaji%s", obj.GetUID()), "-", ""),
|
|
Clock: r.clock,
|
|
Delay: 10 * time.Millisecond,
|
|
Timeout: time.Second,
|
|
Cancel: nil,
|
|
}
|
|
}
|
|
|
|
// SetupWithManager sets up the controller with the Manager.
|
|
func (r *TenantControlPlaneReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
|
|
r.clock = clock.RealClock{}
|
|
|
|
controllerBuilder := ctrl.NewControllerManagedBy(mgr).
|
|
WatchesRawSource(source.Channel(r.CertificateChan, handler.Funcs{GenericFunc: func(_ context.Context, genericEvent event.TypedGenericEvent[client.Object], w workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
|
w.AddRateLimited(ctrl.Request{
|
|
NamespacedName: k8stypes.NamespacedName{
|
|
Namespace: genericEvent.Object.GetNamespace(),
|
|
Name: genericEvent.Object.GetName(),
|
|
},
|
|
})
|
|
}})).
|
|
WatchesRawSource(source.Channel(r.TriggerChan, handler.Funcs{GenericFunc: func(_ context.Context, genericEvent event.TypedGenericEvent[client.Object], w workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
|
w.AddRateLimited(ctrl.Request{
|
|
NamespacedName: k8stypes.NamespacedName{
|
|
Namespace: genericEvent.Object.GetNamespace(),
|
|
Name: genericEvent.Object.GetName(),
|
|
},
|
|
})
|
|
}})).
|
|
For(&kamajiv1alpha1.TenantControlPlane{}).
|
|
Owns(&corev1.Secret{}).
|
|
Owns(&corev1.ConfigMap{}).
|
|
Owns(&appsv1.Deployment{}).
|
|
Owns(&corev1.Service{}).
|
|
Owns(&networkingv1.Ingress{}).
|
|
Watches(&batchv1.Job{}, handler.EnqueueRequestsFromMapFunc(func(_ context.Context, object client.Object) []reconcile.Request {
|
|
labels := object.GetLabels()
|
|
|
|
name, namespace := labels["tcp.kamaji.clastix.io/name"], labels["tcp.kamaji.clastix.io/namespace"]
|
|
|
|
return []reconcile.Request{
|
|
{
|
|
NamespacedName: k8stypes.NamespacedName{
|
|
Namespace: namespace,
|
|
Name: name,
|
|
},
|
|
},
|
|
}
|
|
}), builder.WithPredicates(predicate.NewPredicateFuncs(func(object client.Object) bool {
|
|
if object.GetNamespace() != r.KamajiNamespace {
|
|
return false
|
|
}
|
|
|
|
labels := object.GetLabels()
|
|
|
|
if labels == nil {
|
|
return false
|
|
}
|
|
|
|
v, ok := labels["kamaji.clastix.io/component"]
|
|
|
|
return ok && v == "migrate"
|
|
})))
|
|
|
|
// Conditionally add Gateway API ownership if available
|
|
if utilities.AreGatewayResourcesAvailable(ctx, r.Client, r.DiscoveryClient) {
|
|
controllerBuilder = controllerBuilder.
|
|
Owns(&gatewayv1.HTTPRoute{}).
|
|
Owns(&gatewayv1.GRPCRoute{}).
|
|
Owns(&gatewayv1alpha2.TLSRoute{}).
|
|
Watches(&gatewayv1.Gateway{}, handler.EnqueueRequestsFromMapFunc(func(_ context.Context, object client.Object) []reconcile.Request {
|
|
return nil
|
|
}))
|
|
}
|
|
|
|
return controllerBuilder.
|
|
WithOptions(controller.Options{
|
|
MaxConcurrentReconciles: r.MaxConcurrentReconciles,
|
|
}).
|
|
Complete(r)
|
|
}
|
|
|
|
func (r *TenantControlPlaneReconciler) getTenantControlPlane(ctx context.Context, namespacedName k8stypes.NamespacedName) utils.TenantControlPlaneRetrievalFn {
|
|
return func() (*kamajiv1alpha1.TenantControlPlane, error) {
|
|
tcp := &kamajiv1alpha1.TenantControlPlane{}
|
|
if err := r.APIReader.Get(ctx, namespacedName, tcp); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return tcp, nil
|
|
}
|
|
}
|
|
|
|
func (r *TenantControlPlaneReconciler) RemoveFinalizer(ctx context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) error {
|
|
controllerutil.RemoveFinalizer(tenantControlPlane, finalizers.DatastoreFinalizer)
|
|
|
|
return r.Client.Update(ctx, tenantControlPlane)
|
|
}
|
|
|
|
// ErrMissingDataStore is returned when a Tenant Control Plane does not name a DataStore in
// its specification and no default DataStore name is configured. Reconcile matches it with
// errors.Is, logs it, and requeues the request instead of failing.
var ErrMissingDataStore = errors.New("the Tenant Control Plane doesn't have a DataStore assigned, and Kamaji is running with no default DataStore fallback")
|
|
|
|
// dataStore retrieves the override DataStore for the given Tenant Control Plane if specified,
|
|
// otherwise fallback to the default one specified in the Kamaji setup.
|
|
func (r *TenantControlPlaneReconciler) dataStore(ctx context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) (*kamajiv1alpha1.DataStore, error) {
|
|
if tenantControlPlane.Spec.DataStore == "" && r.Config.DefaultDataStoreName == "" {
|
|
return nil, ErrMissingDataStore
|
|
}
|
|
|
|
if tenantControlPlane.Spec.DataStore == "" {
|
|
tenantControlPlane.Spec.DataStore = r.Config.DefaultDataStoreName
|
|
}
|
|
|
|
var ds kamajiv1alpha1.DataStore
|
|
if err := r.Client.Get(ctx, k8stypes.NamespacedName{Name: tenantControlPlane.Spec.DataStore}, &ds); err != nil {
|
|
return nil, errors.Wrap(err, "cannot retrieve *kamajiv1alpha.DataStore object")
|
|
}
|
|
|
|
return &ds, nil
|
|
}
|
|
|
|
func (r *TenantControlPlaneReconciler) dataStoreOverride(ctx context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) ([]controlplanebuilder.DataStoreOverrides, error) {
|
|
datastores := make([]controlplanebuilder.DataStoreOverrides, 0, len(tenantControlPlane.Spec.DataStoreOverrides))
|
|
|
|
for _, dso := range tenantControlPlane.Spec.DataStoreOverrides {
|
|
var ds kamajiv1alpha1.DataStore
|
|
if err := r.Client.Get(ctx, k8stypes.NamespacedName{Name: dso.DataStore}, &ds); err != nil {
|
|
return nil, errors.Wrap(err, "cannot retrieve *kamajiv1alpha.DataStore object")
|
|
}
|
|
if ds.Spec.Driver != kamajiv1alpha1.EtcdDriver {
|
|
return nil, errors.New("DataStoreOverrides can only use ETCD driver")
|
|
}
|
|
|
|
datastores = append(datastores, controlplanebuilder.DataStoreOverrides{Resource: dso.Resource, DataStore: ds})
|
|
}
|
|
|
|
return datastores, nil
|
|
}
|