diff --git a/controllers/datastore_controller.go b/controllers/datastore_controller.go
index 95bd66a..e0d0181 100644
--- a/controllers/datastore_controller.go
+++ b/controllers/datastore_controller.go
@@ -5,7 +5,9 @@ package controllers
 
 import (
 	"context"
+	"fmt"
 
+	"github.com/pkg/errors"
 	k8serrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/fields"
 	k8stypes "k8s.io/apimachinery/pkg/types"
@@ -76,7 +78,11 @@ func (r *DataStore) Reconcile(ctx context.Context, request reconcile.Request) (
 	for _, i := range tcpList.Items {
 		tcp := i
 
-		r.TenantControlPlaneTrigger <- event.GenericEvent{Object: &tcp}
+		select {
+		case r.TenantControlPlaneTrigger <- event.GenericEvent{Object: &tcp}:
+		default:
+			log.Error(errors.New("channel is full"), fmt.Sprintf("can't push DataStore reconciliation for object %s/%s", tcp.Namespace, tcp.Name))
+		}
 	}
 
 	return reconcile.Result{}, nil
diff --git a/controllers/soot/manager.go b/controllers/soot/manager.go
index e457697..c43a139 100644
--- a/controllers/soot/manager.go
+++ b/controllers/soot/manager.go
@@ -5,9 +5,10 @@ package soot
 
 import (
 	"context"
+	"errors"
 	"fmt"
 
-	"k8s.io/apimachinery/pkg/api/errors"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/util/retry"
 	"k8s.io/utils/ptr"
@@ -34,12 +35,17 @@ import (
 )
 
 type sootItem struct {
-	triggers []chan event.GenericEvent
+	triggers map[string]chan event.GenericEvent
 	cancelFn context.CancelFunc
 }
 
 type sootMap map[string]sootItem
 
+const (
+	sootManagerAnnotation       = "kamaji.clastix.io/soot"
+	sootManagerFailedAnnotation = "failed"
+)
+
 type Manager struct {
 	client  client.Client
 	sootMap sootMap
@@ -99,12 +105,32 @@ func (m *Manager) cleanup(ctx context.Context, req reconcile.Request, tenantCont
 	return nil
 }
 
+func (m *Manager) retryTenantControlPlaneAnnotations(ctx context.Context, request reconcile.Request, modifierFn func(annotations map[string]string)) error {
+	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
+		tcp, err := m.retrieveTenantControlPlane(ctx, request)()
+		if err != nil {
+			return err
+		}
+
+		if tcp.Annotations == nil {
+			tcp.Annotations = map[string]string{}
+		}
+
+		modifierFn(tcp.Annotations)
+
+		tcp.SetAnnotations(tcp.Annotations)
+
+		return m.AdminClient.Update(ctx, tcp)
+	})
+}
+
+//nolint:maintidx
 func (m *Manager) Reconcile(ctx context.Context, request reconcile.Request) (res reconcile.Result, err error) {
 	// Retrieving the TenantControlPlane:
 	// in case of deletion, we must be sure to properly remove from the memory the soot manager.
 	tcp := &kamajiv1alpha1.TenantControlPlane{}
 	if err = m.client.Get(ctx, request.NamespacedName, tcp); err != nil {
-		if errors.IsNotFound(err) {
+		if apierrors.IsNotFound(err) {
 			return reconcile.Result{}, m.cleanup(ctx, request, nil)
 		}
 
@@ -126,6 +152,12 @@ func (m *Manager) Reconcile(ctx context.Context, request reconcile.Request) (res
 	v, ok := m.sootMap[request.String()]
 	if ok {
 		switch {
+		case tcp.Annotations != nil && tcp.Annotations[sootManagerAnnotation] == sootManagerFailedAnnotation:
+			delete(m.sootMap, request.String())
+
+			return reconcile.Result{}, m.retryTenantControlPlaneAnnotations(ctx, request, func(annotations map[string]string) {
+				delete(annotations, sootManagerAnnotation)
+			})
 		case tcpStatus == kamajiv1alpha1.VersionCARotating:
 			// The TenantControlPlane CA has been rotated, it means the running manager
 			// must be restarted to avoid certificate signed by unknown authority errors.
@@ -136,8 +168,12 @@ func (m *Manager) Reconcile(ctx context.Context, request reconcile.Request) (res
 			// Once the TCP will be ready again, the event will be intercepted and the manager started back.
 			return reconcile.Result{}, m.cleanup(ctx, request, tcp)
 		default:
-			for _, trigger := range v.triggers {
-				trigger <- event.GenericEvent{Object: tcp}
+			for name, trigger := range v.triggers {
+				select {
+				case trigger <- event.GenericEvent{Object: tcp}:
+				default:
+					log.FromContext(ctx).Error(errors.New("channel is full"), fmt.Sprintf("can't push trigger %s reconciliation for object %s/%s", name, tcp.Namespace, tcp.Name))
+				}
 			}
 		}
 
@@ -275,6 +311,14 @@ func (m *Manager) Reconcile(ctx context.Context, request reconcile.Request) (res
 	go func() {
 		if err = mgr.Start(tcpCtx); err != nil {
 			log.FromContext(ctx).Error(err, "unable to start soot manager")
+			// The sootManagerAnnotation is used to propagate the error state between reconciliations:
+			// this is required to avoid a mutex and prevent concurrent reads/writes on the soot map.
+			annotationErr := m.retryTenantControlPlaneAnnotations(ctx, request, func(annotations map[string]string) {
+				annotations[sootManagerAnnotation] = sootManagerFailedAnnotation
+			})
+			if annotationErr != nil {
+				log.FromContext(ctx).Error(annotationErr, "unable to update TenantControlPlane for soot failed annotation")
+			}
 			// When the manager cannot start we're enqueuing back the request to take advantage of the backoff factor
 			// of the queue: this is a goroutine and cannot return an error since the manager is running on its own,
 			// using the sootManagerErrChan channel we can trigger a reconciliation although the TCP hadn't any change.
@@ -283,14 +327,14 @@ func (m *Manager) Reconcile(ctx context.Context, request reconcile.Request) (res
 	}()
 
 	m.sootMap[request.NamespacedName.String()] = sootItem{
-		triggers: []chan event.GenericEvent{
-			migrate.TriggerChannel,
-			konnectivityAgent.TriggerChannel,
-			kubeProxy.TriggerChannel,
-			coreDNS.TriggerChannel,
-			uploadKubeadmConfig.TriggerChannel,
-			uploadKubeletConfig.TriggerChannel,
-			bootstrapToken.TriggerChannel,
+		triggers: map[string]chan event.GenericEvent{
+			"migrate":             migrate.TriggerChannel,
+			"konnectivityAgent":   konnectivityAgent.TriggerChannel,
+			"kubeProxy":           kubeProxy.TriggerChannel,
+			"coreDNS":             coreDNS.TriggerChannel,
+			"uploadKubeadmConfig": uploadKubeadmConfig.TriggerChannel,
+			"uploadKubeletConfig": uploadKubeletConfig.TriggerChannel,
+			"bootstrapToken":      bootstrapToken.TriggerChannel,
 		},
 		cancelFn: tcpCancelFn,
 	}
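For context, both reconcilers above guard their trigger channels with the same Go idiom: a `select` with a `default` branch performs a non-blocking send, so a full buffered channel causes the event to be dropped and logged instead of blocking the reconcile loop. A minimal standalone sketch of the idiom follows; the `pushEvent` helper and the string-typed channel are illustrative only, not part of the patch.

```go
package main

import "fmt"

// pushEvent performs a non-blocking send: if the buffered channel has
// room the event is enqueued; otherwise it is dropped and false is
// returned, mirroring the select/default guard added in the diff.
func pushEvent(trigger chan string, ev string) bool {
	select {
	case trigger <- ev:
		return true
	default:
		// Channel full: dropping beats blocking the caller.
		return false
	}
}

func main() {
	trigger := make(chan string, 1) // illustrative buffer size

	fmt.Println(pushEvent(trigger, "first"))  // true: buffer has room
	fmt.Println(pushEvent(trigger, "second")) // false: buffer full, event dropped
}
```

The trade-off, presumably acceptable here, is that a dropped event relies on a later reconciliation to cover the missed trigger.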
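The new `retryTenantControlPlaneAnnotations` helper is built on client-go's `retry.RetryOnConflict`, which re-runs its closure whenever the update fails with an optimistic-concurrency conflict; re-reading the object inside the closure picks up a fresh `resourceVersion` on each attempt. Below is a hedged, generic sketch of the same shape against any controller-runtime object; the `patchAnnotation` name and the `c` client are assumptions for illustration, not Kamaji's API.

```go
package example

import (
	"context"

	"k8s.io/client-go/util/retry"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// patchAnnotation is an illustrative helper (not part of the PR): it sets
// key=value on obj's annotations, retrying on optimistic-lock conflicts.
func patchAnnotation(ctx context.Context, c client.Client, obj client.Object, key, value string) error {
	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		// Re-read the object on every attempt so the update carries a
		// fresh resourceVersion after a conflict.
		if err := c.Get(ctx, client.ObjectKeyFromObject(obj), obj); err != nil {
			return err
		}

		annotations := obj.GetAnnotations()
		if annotations == nil {
			annotations = map[string]string{}
		}
		annotations[key] = value
		obj.SetAnnotations(annotations)

		return c.Update(ctx, obj)
	})
}
```

Persisting the failure marker as an annotation lets the goroutine running `mgr.Start` communicate with later reconciliations through the API server rather than through shared memory, which is why, per the diff's own comment, no mutex is needed around the soot map.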