From b027e23b9944ff96ed46f983a2698522acc4c05c Mon Sep 17 00:00:00 2001 From: Dario Tranchitella Date: Wed, 23 Apr 2025 21:00:29 +0200 Subject: [PATCH] feat: enhancing concurrent reconciliations (#790) * feat: buffered channels for generic events Channels used for GenericEvent feeding for cross-controller triggers are now buffered according to the --max-concurrent-tcp-reconciles: this is required to avoid channel full errors when dealing with large management clusters serving a sizeable number of Tenant Control Planes. Increasing this value will put more pressure on memory (mostly for GC) and CPU (provisioning multiple certificates at the same time). Signed-off-by: Dario Tranchitella * refactor: retrying datastore status update Signed-off-by: Dario Tranchitella * feat(performance): reducing memory consumption for channel triggers Signed-off-by: Dario Tranchitella * feat(datastore): reconcile events only for root object changes Signed-off-by: Dario Tranchitella * feat: waiting soot manager exit before termination This change introduces a grace period of 10 seconds before abruptly terminating the Tenant Control Plane deployment, allowing the soot manager to complete its exit procedure and avoid false positive errors caused by the API Server being unresponsive after user deletion. The aim of this change is to reduce the number of false positive errors upon mass deletion of Tenant Control Plane objects. Signed-off-by: Dario Tranchitella * refactor: unbuffered channel with timeout WatchesRawSource is non-blocking, so there is no need to check if the channel is full. To prevent deadlocks, a WithTimeout check has been introduced. 
Signed-off-by: Dario Tranchitella --------- Signed-off-by: Dario Tranchitella --- cmd/manager/cmd.go | 3 +- controllers/cert_channel.go | 10 --- .../certificate_lifecycle_controller.go | 2 +- controllers/datastore_controller.go | 75 ++++++++++--------- controllers/soot/manager.go | 68 ++++++++++++----- controllers/tcp_channel.go | 8 -- controllers/tenantcontrolplane_controller.go | 4 +- controllers/utils/trigger_channel.go | 26 +++++++ 8 files changed, 120 insertions(+), 76 deletions(-) delete mode 100644 controllers/cert_channel.go delete mode 100644 controllers/tcp_channel.go create mode 100644 controllers/utils/trigger_channel.go diff --git a/cmd/manager/cmd.go b/cmd/manager/cmd.go index 34abc3b..7f0c2d2 100644 --- a/cmd/manager/cmd.go +++ b/cmd/manager/cmd.go @@ -20,6 +20,7 @@ import ( "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/log/zap" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" @@ -136,7 +137,7 @@ func NewCmd(scheme *runtime.Scheme) *cobra.Command { return err } - tcpChannel, certChannel := make(controllers.TenantControlPlaneChannel), make(controllers.CertificateChannel) + tcpChannel, certChannel := make(chan event.GenericEvent), make(chan event.GenericEvent) if err = (&controllers.DataStore{Client: mgr.GetClient(), TenantControlPlaneTrigger: tcpChannel}).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "DataStore") diff --git a/controllers/cert_channel.go b/controllers/cert_channel.go deleted file mode 100644 index 8ca83e0..0000000 --- a/controllers/cert_channel.go +++ /dev/null @@ -1,10 +0,0 @@ -// Copyright 2022 Clastix Labs -// SPDX-License-Identifier: Apache-2.0 - -package controllers - -import ( - "sigs.k8s.io/controller-runtime/pkg/event" -) - -type CertificateChannel chan event.GenericEvent diff --git 
a/controllers/certificate_lifecycle_controller.go b/controllers/certificate_lifecycle_controller.go index e9a3a68..ec3051a 100644 --- a/controllers/certificate_lifecycle_controller.go +++ b/controllers/certificate_lifecycle_controller.go @@ -30,7 +30,7 @@ import ( ) type CertificateLifecycle struct { - Channel CertificateChannel + Channel chan event.GenericEvent Deadline time.Duration client client.Client diff --git a/controllers/datastore_controller.go b/controllers/datastore_controller.go index 0748cee..8b046ee 100644 --- a/controllers/datastore_controller.go +++ b/controllers/datastore_controller.go @@ -5,13 +5,13 @@ package controllers import ( "context" - "fmt" "github.com/pkg/errors" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/fields" k8stypes "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/util/retry" "k8s.io/client-go/util/workqueue" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" @@ -23,64 +23,71 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" kamajiv1alpha1 "github.com/clastix/kamaji/api/v1alpha1" + "github.com/clastix/kamaji/controllers/utils" ) type DataStore struct { Client client.Client // TenantControlPlaneTrigger is the channel used to communicate across the controllers: - // if a Data Source is updated we have to be sure that the reconciliation of the certificates content + // if a Data Source is updated, we have to be sure that the reconciliation of the certificates content // for each Tenant Control Plane is put in place properly. 
- TenantControlPlaneTrigger TenantControlPlaneChannel + TenantControlPlaneTrigger chan event.GenericEvent } //+kubebuilder:rbac:groups=kamaji.clastix.io,resources=datastores,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=kamaji.clastix.io,resources=datastores/status,verbs=get;update;patch func (r *DataStore) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { - log := log.FromContext(ctx) + logger := log.FromContext(ctx) - ds := &kamajiv1alpha1.DataStore{} - err := r.Client.Get(ctx, request.NamespacedName, ds) - if k8serrors.IsNotFound(err) { - log.Info("resource have been deleted, skipping") + var ds kamajiv1alpha1.DataStore + if err := r.Client.Get(ctx, request.NamespacedName, &ds); err != nil { + if k8serrors.IsNotFound(err) { + logger.Info("resource have been deleted, skipping") - return reconcile.Result{}, nil - } - if err != nil { - log.Error(err, "cannot retrieve the required resource") + return reconcile.Result{}, nil + } + + logger.Error(err, "cannot retrieve the required resource") return reconcile.Result{}, err } - tcpList := kamajiv1alpha1.TenantControlPlaneList{} + var tcpList kamajiv1alpha1.TenantControlPlaneList - if err := r.Client.List(ctx, &tcpList, client.MatchingFieldsSelector{ - Selector: fields.OneTermEqualSelector(kamajiv1alpha1.TenantControlPlaneUsedDataStoreKey, ds.GetName()), - }); err != nil { - log.Error(err, "cannot retrieve list of the Tenant Control Plane using the following instance") + updateErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { + if lErr := r.Client.List(ctx, &tcpList, client.MatchingFieldsSelector{ + Selector: fields.OneTermEqualSelector(kamajiv1alpha1.TenantControlPlaneUsedDataStoreKey, ds.GetName()), + }); lErr != nil { + return errors.Wrap(lErr, "cannot retrieve list of the Tenant Control Plane using the following instance") + } + // Updating the status with the list of Tenant Control Plane using the following Data Source + tcpSets := 
sets.NewString() + for _, tcp := range tcpList.Items { + tcpSets.Insert(getNamespacedName(tcp.GetNamespace(), tcp.GetName()).String()) + } - return reconcile.Result{}, err - } - // Updating the status with the list of Tenant Control Plane using the following Data Source - tcpSets := sets.NewString() - for _, tcp := range tcpList.Items { - tcpSets.Insert(getNamespacedName(tcp.GetNamespace(), tcp.GetName()).String()) - } + ds.Status.UsedBy = tcpSets.List() - ds.Status.UsedBy = tcpSets.List() + if sErr := r.Client.Status().Update(ctx, &ds); sErr != nil { + return errors.Wrap(sErr, "cannot update the status for the given instance") + } - if err := r.Client.Status().Update(ctx, ds); err != nil { - log.Error(err, "cannot update the status for the given instance") + return nil + }) + if updateErr != nil { + logger.Error(updateErr, "cannot update DataStore status") - return reconcile.Result{}, err + return reconcile.Result{}, updateErr } // Triggering the reconciliation of the Tenant Control Plane upon a Secret change for _, tcp := range tcpList.Items { - select { - case r.TenantControlPlaneTrigger <- event.GenericEvent{Object: &tcp}: - default: - log.Error(errors.New("channel is full"), fmt.Sprintf("can't push DataStore reconciliation for object %s/%s", tcp.Namespace, tcp.Name)) - } + var shrunkTCP kamajiv1alpha1.TenantControlPlane + + shrunkTCP.Name = tcp.Name + shrunkTCP.Namespace = tcp.Namespace + + go utils.TriggerChannel(ctx, r.TenantControlPlaneTrigger, shrunkTCP) } return reconcile.Result{}, nil @@ -99,7 +106,7 @@ func (r *DataStore) SetupWithManager(mgr controllerruntime.Manager) error { //nolint:forcetypeassert return controllerruntime.NewControllerManagedBy(mgr). For(&kamajiv1alpha1.DataStore{}, builder.WithPredicates( - predicate.ResourceVersionChangedPredicate{}, + predicate.GenerationChangedPredicate{}, )). 
Watches(&kamajiv1alpha1.TenantControlPlane{}, handler.Funcs{ CreateFunc: func(_ context.Context, createEvent event.TypedCreateEvent[client.Object], w workqueue.TypedRateLimitingInterface[reconcile.Request]) { diff --git a/controllers/soot/manager.go b/controllers/soot/manager.go index 6ee5f0c..5392aff 100644 --- a/controllers/soot/manager.go +++ b/controllers/soot/manager.go @@ -5,8 +5,8 @@ package soot import ( "context" - "errors" "fmt" + "time" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/client-go/rest" @@ -35,8 +35,9 @@ import ( ) type sootItem struct { - triggers map[string]chan event.GenericEvent - cancelFn context.CancelFunc + triggers []chan event.GenericEvent + cancelFn context.CancelFunc + completedCh chan struct{} } type sootMap map[string]sootItem @@ -98,6 +99,24 @@ func (m *Manager) cleanup(ctx context.Context, req reconcile.Request, tenantCont } v.cancelFn() + // TODO(prometherion): the 10 seconds is a hard-coded value, + // it's widely used across the code base as a timeout with the API Server. + // Evaluate if we would need to make this configurable globally. + deadlineCtx, deadlineFn := context.WithTimeout(ctx, 10*time.Second) + defer deadlineFn() + + select { + case _, open := <-v.completedCh: + if !open { + log.FromContext(ctx).Info("soot manager completed its process") + + break + } + case <-deadlineCtx.Done(): + log.FromContext(ctx).Error(deadlineCtx.Err(), "soot manager didn't exit to timeout") + + break + } delete(m.sootMap, tcpName) @@ -166,12 +185,13 @@ func (m *Manager) Reconcile(ctx context.Context, request reconcile.Request) (res // Once the TCP will be ready again, the event will be intercepted and the manager started back. 
return reconcile.Result{}, m.cleanup(ctx, request, tcp) default: - for name, trigger := range v.triggers { - select { - case trigger <- event.GenericEvent{Object: tcp}: - default: - log.FromContext(ctx).Error(errors.New("channel is full"), fmt.Sprintf("can't push trigger %s reconciliation for object %s/%s", name, tcp.Namespace, tcp.Name)) - } + for _, trigger := range v.triggers { + var shrunkTCP kamajiv1alpha1.TenantControlPlane + + shrunkTCP.Name = tcp.Name + shrunkTCP.Namespace = tcp.Namespace + + go utils.TriggerChannel(ctx, trigger, shrunkTCP) } } @@ -317,11 +337,12 @@ func (m *Manager) Reconcile(ctx context.Context, request reconcile.Request) (res if err = kubeadmRbac.SetupWithManager(mgr); err != nil { return reconcile.Result{}, err } + completedCh := make(chan struct{}) // Starting the manager go func() { if err = mgr.Start(tcpCtx); err != nil { log.FromContext(ctx).Error(err, "unable to start soot manager") - // The sootMAnagerAnnotation is used to propagate the error between reconciliations with its state: + // The sootManagerAnnotation is used to propagate the error between reconciliations with its state: // this is required to avoid mutex and prevent concurrent read/write on the soot map annotationErr := m.retryTenantControlPlaneAnnotations(ctx, request, func(annotations map[string]string) { annotations[sootManagerAnnotation] = sootManagerFailedAnnotation @@ -332,21 +353,28 @@ func (m *Manager) Reconcile(ctx context.Context, request reconcile.Request) (res // When the manager cannot start we're enqueuing back the request to take advantage of the backoff factor // of the queue: this is a goroutine and cannot return an error since the manager is running on its own, // using the sootManagerErrChan channel we can trigger a reconciliation although the TCP hadn't any change. 
- m.sootManagerErrChan <- event.GenericEvent{Object: tcp} + var shrunkTCP kamajiv1alpha1.TenantControlPlane + + shrunkTCP.Name = tcp.Name + shrunkTCP.Namespace = tcp.Namespace + + m.sootManagerErrChan <- event.GenericEvent{Object: &shrunkTCP} } + close(completedCh) }() m.sootMap[request.NamespacedName.String()] = sootItem{ - triggers: map[string]chan event.GenericEvent{ - "migrate": migrate.TriggerChannel, - "konnectivityAgent": konnectivityAgent.TriggerChannel, - "kubeProxy": kubeProxy.TriggerChannel, - "coreDNS": coreDNS.TriggerChannel, - "uploadKubeadmConfig": uploadKubeadmConfig.TriggerChannel, - "uploadKubeletConfig": uploadKubeletConfig.TriggerChannel, - "bootstrapToken": bootstrapToken.TriggerChannel, + triggers: []chan event.GenericEvent{ + migrate.TriggerChannel, + konnectivityAgent.TriggerChannel, + kubeProxy.TriggerChannel, + coreDNS.TriggerChannel, + uploadKubeadmConfig.TriggerChannel, + uploadKubeletConfig.TriggerChannel, + bootstrapToken.TriggerChannel, }, - cancelFn: tcpCancelFn, + cancelFn: tcpCancelFn, + completedCh: completedCh, } return reconcile.Result{Requeue: true}, nil diff --git a/controllers/tcp_channel.go b/controllers/tcp_channel.go deleted file mode 100644 index a6c4494..0000000 --- a/controllers/tcp_channel.go +++ /dev/null @@ -1,8 +0,0 @@ -// Copyright 2022 Clastix Labs -// SPDX-License-Identifier: Apache-2.0 - -package controllers - -import "sigs.k8s.io/controller-runtime/pkg/event" - -type TenantControlPlaneChannel chan event.GenericEvent diff --git a/controllers/tenantcontrolplane_controller.go b/controllers/tenantcontrolplane_controller.go index 92ddf0f..52916a7 100644 --- a/controllers/tenantcontrolplane_controller.go +++ b/controllers/tenantcontrolplane_controller.go @@ -44,7 +44,7 @@ type TenantControlPlaneReconciler struct { Client client.Client APIReader client.Reader Config TenantControlPlaneReconcilerConfig - TriggerChan TenantControlPlaneChannel + TriggerChan chan event.GenericEvent KamajiNamespace string 
KamajiServiceAccount string KamajiService string @@ -53,7 +53,7 @@ type TenantControlPlaneReconciler struct { // CertificateChan is the channel used by the CertificateLifecycleController that is checking for // certificates and kubeconfig user certs validity: a generic event for the given TCP will be triggered // once the validity threshold for the given certificate is reached. - CertificateChan CertificateChannel + CertificateChan chan event.GenericEvent clock mutex.Clock } diff --git a/controllers/utils/trigger_channel.go b/controllers/utils/trigger_channel.go new file mode 100644 index 0000000..232b785 --- /dev/null +++ b/controllers/utils/trigger_channel.go @@ -0,0 +1,26 @@ +// Copyright 2022 Clastix Labs +// SPDX-License-Identifier: Apache-2.0 + +package utils + +import ( + "context" + "time" + + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/log" + + kamajiv1alpha1 "github.com/clastix/kamaji/api/v1alpha1" +) + +func TriggerChannel(ctx context.Context, receiver chan event.GenericEvent, tcp kamajiv1alpha1.TenantControlPlane) { + deadlineCtx, cancelFn := context.WithTimeout(ctx, 10*time.Second) + defer cancelFn() + + select { + case receiver <- event.GenericEvent{Object: &tcp}: + return + case <-deadlineCtx.Done(): + log.FromContext(ctx).Error(deadlineCtx.Err(), "cannot send due to timeout") + } +}