diff --git a/cmd/manager/cmd.go b/cmd/manager/cmd.go
index 34abc3b..7f0c2d2 100644
--- a/cmd/manager/cmd.go
+++ b/cmd/manager/cmd.go
@@ -20,6 +20,7 @@ import (
     "k8s.io/klog/v2"
     ctrl "sigs.k8s.io/controller-runtime"
     "sigs.k8s.io/controller-runtime/pkg/cache"
+    "sigs.k8s.io/controller-runtime/pkg/event"
     "sigs.k8s.io/controller-runtime/pkg/healthz"
     "sigs.k8s.io/controller-runtime/pkg/log/zap"
     metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
@@ -136,7 +137,7 @@ func NewCmd(scheme *runtime.Scheme) *cobra.Command {
                 return err
             }

-            tcpChannel, certChannel := make(controllers.TenantControlPlaneChannel), make(controllers.CertificateChannel)
+            tcpChannel, certChannel := make(chan event.GenericEvent), make(chan event.GenericEvent)

             if err = (&controllers.DataStore{Client: mgr.GetClient(), TenantControlPlaneTrigger: tcpChannel}).SetupWithManager(mgr); err != nil {
                 setupLog.Error(err, "unable to create controller", "controller", "DataStore")
diff --git a/controllers/cert_channel.go b/controllers/cert_channel.go
deleted file mode 100644
index 8ca83e0..0000000
--- a/controllers/cert_channel.go
+++ /dev/null
@@ -1,10 +0,0 @@
-// Copyright 2022 Clastix Labs
-// SPDX-License-Identifier: Apache-2.0
-
-package controllers
-
-import (
-    "sigs.k8s.io/controller-runtime/pkg/event"
-)
-
-type CertificateChannel chan event.GenericEvent
diff --git a/controllers/certificate_lifecycle_controller.go b/controllers/certificate_lifecycle_controller.go
index e9a3a68..ec3051a 100644
--- a/controllers/certificate_lifecycle_controller.go
+++ b/controllers/certificate_lifecycle_controller.go
@@ -30,7 +30,7 @@ import (
 )

 type CertificateLifecycle struct {
-    Channel  CertificateChannel
+    Channel  chan event.GenericEvent
     Deadline time.Duration

     client client.Client
diff --git a/controllers/datastore_controller.go b/controllers/datastore_controller.go
index 0748cee..8b046ee 100644
--- a/controllers/datastore_controller.go
+++ b/controllers/datastore_controller.go
@@ -5,13 +5,13 @@ package controllers

 import (
     "context"
-    "fmt"

     "github.com/pkg/errors"
     k8serrors "k8s.io/apimachinery/pkg/api/errors"
     "k8s.io/apimachinery/pkg/fields"
     k8stypes "k8s.io/apimachinery/pkg/types"
     "k8s.io/apimachinery/pkg/util/sets"
+    "k8s.io/client-go/util/retry"
     "k8s.io/client-go/util/workqueue"
     controllerruntime "sigs.k8s.io/controller-runtime"
     "sigs.k8s.io/controller-runtime/pkg/builder"
@@ -23,64 +23,71 @@ import (
     "sigs.k8s.io/controller-runtime/pkg/reconcile"

     kamajiv1alpha1 "github.com/clastix/kamaji/api/v1alpha1"
+    "github.com/clastix/kamaji/controllers/utils"
 )

 type DataStore struct {
     Client client.Client
     // TenantControlPlaneTrigger is the channel used to communicate across the controllers:
-    // if a Data Source is updated we have to be sure that the reconciliation of the certificates content
+    // if a Data Source is updated, we have to be sure that the reconciliation of the certificates content
     // for each Tenant Control Plane is put in place properly.
-    TenantControlPlaneTrigger TenantControlPlaneChannel
+    TenantControlPlaneTrigger chan event.GenericEvent
 }

 //+kubebuilder:rbac:groups=kamaji.clastix.io,resources=datastores,verbs=get;list;watch;create;update;patch;delete
 //+kubebuilder:rbac:groups=kamaji.clastix.io,resources=datastores/status,verbs=get;update;patch

 func (r *DataStore) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
-    log := log.FromContext(ctx)
+    logger := log.FromContext(ctx)

-    ds := &kamajiv1alpha1.DataStore{}
-    err := r.Client.Get(ctx, request.NamespacedName, ds)
-    if k8serrors.IsNotFound(err) {
-        log.Info("resource have been deleted, skipping")
+    var ds kamajiv1alpha1.DataStore
+    if err := r.Client.Get(ctx, request.NamespacedName, &ds); err != nil {
+        if k8serrors.IsNotFound(err) {
+            logger.Info("resource has been deleted, skipping")

-        return reconcile.Result{}, nil
-    }
-    if err != nil {
-        log.Error(err, "cannot retrieve the required resource")
+            return reconcile.Result{}, nil
+        }
+
+        logger.Error(err, "cannot retrieve the required resource")

         return reconcile.Result{}, err
     }

-    tcpList := kamajiv1alpha1.TenantControlPlaneList{}
+    var tcpList kamajiv1alpha1.TenantControlPlaneList

-    if err := r.Client.List(ctx, &tcpList, client.MatchingFieldsSelector{
-        Selector: fields.OneTermEqualSelector(kamajiv1alpha1.TenantControlPlaneUsedDataStoreKey, ds.GetName()),
-    }); err != nil {
-        log.Error(err, "cannot retrieve list of the Tenant Control Plane using the following instance")
+    updateErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
+        if lErr := r.Client.List(ctx, &tcpList, client.MatchingFieldsSelector{
+            Selector: fields.OneTermEqualSelector(kamajiv1alpha1.TenantControlPlaneUsedDataStoreKey, ds.GetName()),
+        }); lErr != nil {
+            return errors.Wrap(lErr, "cannot retrieve the list of Tenant Control Planes using this DataStore")
+        }
+        // Update the status with the list of Tenant Control Planes using this DataStore
+        tcpSets := sets.NewString()
+        for _, tcp := range tcpList.Items {
+            tcpSets.Insert(getNamespacedName(tcp.GetNamespace(), tcp.GetName()).String())
+        }

-        return reconcile.Result{}, err
-    }
-    // Updating the status with the list of Tenant Control Plane using the following Data Source
-    tcpSets := sets.NewString()
-    for _, tcp := range tcpList.Items {
-        tcpSets.Insert(getNamespacedName(tcp.GetNamespace(), tcp.GetName()).String())
-    }
+        ds.Status.UsedBy = tcpSets.List()

-    ds.Status.UsedBy = tcpSets.List()
+        if sErr := r.Client.Status().Update(ctx, &ds); sErr != nil {
+            return errors.Wrap(sErr, "cannot update the status for the given instance")
+        }

-    if err := r.Client.Status().Update(ctx, ds); err != nil {
-        log.Error(err, "cannot update the status for the given instance")
+        return nil
+    })
+    if updateErr != nil {
+        logger.Error(updateErr, "cannot update DataStore status")

-        return reconcile.Result{}, err
+        return reconcile.Result{}, updateErr
     }

     // Triggering the reconciliation of the Tenant Control Plane upon a Secret change
     for _, tcp := range tcpList.Items {
-        select {
-        case r.TenantControlPlaneTrigger <- event.GenericEvent{Object: &tcp}:
-        default:
-            log.Error(errors.New("channel is full"), fmt.Sprintf("can't push DataStore reconciliation for object %s/%s", tcp.Namespace, tcp.Name))
-        }
+        var shrunkTCP kamajiv1alpha1.TenantControlPlane
+
+        shrunkTCP.Name = tcp.Name
+        shrunkTCP.Namespace = tcp.Namespace
+
+        go utils.TriggerChannel(ctx, r.TenantControlPlaneTrigger, shrunkTCP)
     }

     return reconcile.Result{}, nil
@@ -99,7 +106,7 @@ func (r *DataStore) SetupWithManager(mgr controllerruntime.Manager) error { //nolint:forcetypeassert
     return controllerruntime.NewControllerManagedBy(mgr).
         For(&kamajiv1alpha1.DataStore{}, builder.WithPredicates(
-            predicate.ResourceVersionChangedPredicate{},
+            predicate.GenerationChangedPredicate{},
         )).
         Watches(&kamajiv1alpha1.TenantControlPlane{}, handler.Funcs{
             CreateFunc: func(_ context.Context, createEvent event.TypedCreateEvent[client.Object], w workqueue.TypedRateLimitingInterface[reconcile.Request]) {
diff --git a/controllers/soot/manager.go b/controllers/soot/manager.go
index 6ee5f0c..5392aff 100644
--- a/controllers/soot/manager.go
+++ b/controllers/soot/manager.go
@@ -5,8 +5,8 @@ package soot

 import (
     "context"
-    "errors"
     "fmt"
+    "time"

     apierrors "k8s.io/apimachinery/pkg/api/errors"
     "k8s.io/client-go/rest"
@@ -35,8 +35,9 @@ import (
 )

 type sootItem struct {
-    triggers map[string]chan event.GenericEvent
-    cancelFn context.CancelFunc
+    triggers    []chan event.GenericEvent
+    cancelFn    context.CancelFunc
+    completedCh chan struct{}
 }

 type sootMap map[string]sootItem
@@ -98,6 +99,24 @@ func (m *Manager) cleanup(ctx context.Context, req reconcile.Request, tenantCont
     }

     v.cancelFn()
+    // TODO(prometherion): the 10 seconds is a hardcoded number,
+    // it's widely used across the code base as a timeout with the API Server.
+    // Evaluate whether we need to make this configurable globally.
+    deadlineCtx, deadlineFn := context.WithTimeout(ctx, 10*time.Second)
+    defer deadlineFn()
+
+    select {
+    case _, open := <-v.completedCh:
+        if !open {
+            log.FromContext(ctx).Info("soot manager completed its process")
+
+            break
+        }
+    case <-deadlineCtx.Done():
+        log.FromContext(ctx).Error(deadlineCtx.Err(), "soot manager didn't exit before the timeout")
+
+        break
+    }

     delete(m.sootMap, tcpName)

@@ -166,12 +185,13 @@ func (m *Manager) Reconcile(ctx context.Context, request reconcile.Request) (res
             // Once the TCP will be ready again, the event will be intercepted and the manager started back.
             return reconcile.Result{}, m.cleanup(ctx, request, tcp)
         default:
-            for name, trigger := range v.triggers {
-                select {
-                case trigger <- event.GenericEvent{Object: tcp}:
-                default:
-                    log.FromContext(ctx).Error(errors.New("channel is full"), fmt.Sprintf("can't push trigger %s reconciliation for object %s/%s", name, tcp.Namespace, tcp.Name))
-                }
+            for _, trigger := range v.triggers {
+                var shrunkTCP kamajiv1alpha1.TenantControlPlane
+
+                shrunkTCP.Name = tcp.Name
+                shrunkTCP.Namespace = tcp.Namespace
+
+                go utils.TriggerChannel(ctx, trigger, shrunkTCP)
             }
         }

@@ -317,11 +337,12 @@ func (m *Manager) Reconcile(ctx context.Context, request reconcile.Request) (res
     if err = kubeadmRbac.SetupWithManager(mgr); err != nil {
         return reconcile.Result{}, err
     }
+    completedCh := make(chan struct{})
     // Starting the manager
     go func() {
         if err = mgr.Start(tcpCtx); err != nil {
             log.FromContext(ctx).Error(err, "unable to start soot manager")
-            // The sootMAnagerAnnotation is used to propagate the error between reconciliations with its state:
+            // The sootManagerAnnotation is used to propagate the error between reconciliations with its state:
             // this is required to avoid mutex and prevent concurrent read/write on the soot map
             annotationErr := m.retryTenantControlPlaneAnnotations(ctx, request, func(annotations map[string]string) {
                 annotations[sootManagerAnnotation] = sootManagerFailedAnnotation
@@ -332,21 +353,28 @@ func (m *Manager) Reconcile(ctx context.Context, request reconcile.Request) (res
             // When the manager cannot start we're enqueuing back the request to take advantage of the backoff factor
             // of the queue: this is a goroutine and cannot return an error since the manager is running on its own,
             // using the sootManagerErrChan channel we can trigger a reconciliation although the TCP hadn't any change.
-            m.sootManagerErrChan <- event.GenericEvent{Object: tcp}
+            var shrunkTCP kamajiv1alpha1.TenantControlPlane
+
+            shrunkTCP.Name = tcp.Name
+            shrunkTCP.Namespace = tcp.Namespace
+
+            m.sootManagerErrChan <- event.GenericEvent{Object: &shrunkTCP}
         }
+        close(completedCh)
     }()

     m.sootMap[request.NamespacedName.String()] = sootItem{
-        triggers: map[string]chan event.GenericEvent{
-            "migrate":             migrate.TriggerChannel,
-            "konnectivityAgent":   konnectivityAgent.TriggerChannel,
-            "kubeProxy":           kubeProxy.TriggerChannel,
-            "coreDNS":             coreDNS.TriggerChannel,
-            "uploadKubeadmConfig": uploadKubeadmConfig.TriggerChannel,
-            "uploadKubeletConfig": uploadKubeletConfig.TriggerChannel,
-            "bootstrapToken":      bootstrapToken.TriggerChannel,
+        triggers: []chan event.GenericEvent{
+            migrate.TriggerChannel,
+            konnectivityAgent.TriggerChannel,
+            kubeProxy.TriggerChannel,
+            coreDNS.TriggerChannel,
+            uploadKubeadmConfig.TriggerChannel,
+            uploadKubeletConfig.TriggerChannel,
+            bootstrapToken.TriggerChannel,
         },
-        cancelFn: tcpCancelFn,
+        cancelFn:    tcpCancelFn,
+        completedCh: completedCh,
     }

     return reconcile.Result{Requeue: true}, nil
diff --git a/controllers/tcp_channel.go b/controllers/tcp_channel.go
deleted file mode 100644
index a6c4494..0000000
--- a/controllers/tcp_channel.go
+++ /dev/null
@@ -1,8 +0,0 @@
-// Copyright 2022 Clastix Labs
-// SPDX-License-Identifier: Apache-2.0
-
-package controllers
-
-import "sigs.k8s.io/controller-runtime/pkg/event"
-
-type TenantControlPlaneChannel chan event.GenericEvent
diff --git a/controllers/tenantcontrolplane_controller.go b/controllers/tenantcontrolplane_controller.go
index 92ddf0f..52916a7 100644
--- a/controllers/tenantcontrolplane_controller.go
+++ b/controllers/tenantcontrolplane_controller.go
@@ -44,7 +44,7 @@ type TenantControlPlaneReconciler struct {
     Client               client.Client
     APIReader            client.Reader
     Config               TenantControlPlaneReconcilerConfig
-    TriggerChan          TenantControlPlaneChannel
+    TriggerChan          chan event.GenericEvent
     KamajiNamespace      string
     KamajiServiceAccount string
     KamajiService        string
@@ -53,7 +53,7 @@ type TenantControlPlaneReconciler struct {
     // CertificateChan is the channel used by the CertificateLifecycleController that is checking for
     // certificates and kubeconfig user certs validity: a generic event for the given TCP will be triggered
     // once the validity threshold for the given certificate is reached.
-    CertificateChan CertificateChannel
+    CertificateChan chan event.GenericEvent

     clock mutex.Clock
 }
diff --git a/controllers/utils/trigger_channel.go b/controllers/utils/trigger_channel.go
new file mode 100644
index 0000000..232b785
--- /dev/null
+++ b/controllers/utils/trigger_channel.go
@@ -0,0 +1,27 @@
+// Copyright 2022 Clastix Labs
+// SPDX-License-Identifier: Apache-2.0
+
+package utils
+
+import (
+    "context"
+    "time"
+
+    "sigs.k8s.io/controller-runtime/pkg/event"
+    "sigs.k8s.io/controller-runtime/pkg/log"
+
+    kamajiv1alpha1 "github.com/clastix/kamaji/api/v1alpha1"
+)
+
+// TriggerChannel pushes a GenericEvent for the given TenantControlPlane to the receiver channel, giving up once the 10-second deadline expires.
+func TriggerChannel(ctx context.Context, receiver chan event.GenericEvent, tcp kamajiv1alpha1.TenantControlPlane) {
+    deadlineCtx, cancelFn := context.WithTimeout(ctx, 10*time.Second)
+    defer cancelFn()
+
+    select {
+    case receiver <- event.GenericEvent{Object: &tcp}:
+        return
+    case <-deadlineCtx.Done():
+        log.FromContext(ctx).Error(deadlineCtx.Err(), "cannot send due to timeout")
+    }
+}
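Note on how these trigger channels are consumed (illustrative only, not part of the diff above): controller-runtime can watch a chan event.GenericEvent through source.Channel, so anything pushed by utils.TriggerChannel surfaces as an ordinary reconcile request. The sketch below assumes a recent controller-runtime (v0.18+ source.Channel function and single-argument WatchesRawSource); setupTriggeredController is a hypothetical name, not an existing Kamaji function.

// Illustrative sketch: wiring a trigger channel into a controller watch.
package sketch

import (
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/event"
    "sigs.k8s.io/controller-runtime/pkg/handler"
    "sigs.k8s.io/controller-runtime/pkg/reconcile"
    "sigs.k8s.io/controller-runtime/pkg/source"

    kamajiv1alpha1 "github.com/clastix/kamaji/api/v1alpha1"
)

func setupTriggeredController(mgr ctrl.Manager, r reconcile.Reconciler, trigger chan event.GenericEvent) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&kamajiv1alpha1.TenantControlPlane{}).
        // Every event.GenericEvent pushed onto the channel (for instance by
        // utils.TriggerChannel) is enqueued as a reconcile.Request keyed by the
        // object's namespace/name, which is exactly what the "shrunk"
        // TenantControlPlane built by the callers carries.
        WatchesRawSource(source.Channel(trigger, &handler.EnqueueRequestForObject{})).
        Complete(r)
}

Compared with the previous non-blocking send that dropped events once a channel was full, utils.TriggerChannel blocks in its own goroutine for up to 10 seconds before giving up, trading a possible short delay for not silently losing trigger events.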