Files
kamaji/cmd/manager/cmd.go
Dario Tranchitella b027e23b99 feat: enhancing concurrent reconciliations (#790)
* feat: buffered channels for generic events

Channels used for GenericEvent feeding for cross controllers triggers
are now buffered according to the --max-concurrent-tcp-reconciles: this
is required to avoid channel full errors when dealing with large
management clusters serving a sizeable amount of Tenant Control Planes.

Increasing this value will put more pressure on memory (mostly for GC)
and CPU (provisioning multiple certificates at the same time).

Signed-off-by: Dario Tranchitella <dario@tranchitella.eu>

* refactor: retrying datastore status update

Signed-off-by: Dario Tranchitella <dario@tranchitella.eu>

* feat(performance): reducing memory consumption for channel triggers

Signed-off-by: Dario Tranchitella <dario@tranchitella.eu>

* feat(datastore): reconcile events only for root object changes

Signed-off-by: Dario Tranchitella <dario@tranchitella.eu>

* feat: waiting soot manager exit before termination

This change introduces a grace period of 10 seconds before abruptly
terminating the Tenant Control Plane deployment, allowing the soot
manager to complete its exit procedure and avoid false positive errors
due to API Server being unresponsive due to user deletion.

Aim of this change is reducing the amount of false positive errors upon
mass deletion of Tenant COntrol Plane objects.

Signed-off-by: Dario Tranchitella <dario@tranchitella.eu>

* refactor: unbuffered channel with timeout

WatchesRawSource is non blocking, no need to check if channel is full.
To prevent deadlocks a WithTimeout check has been introduced.

Signed-off-by: Dario Tranchitella <dario@tranchitella.eu>

---------

Signed-off-by: Dario Tranchitella <dario@tranchitella.eu>
2025-04-23 21:00:29 +02:00

327 lines
13 KiB
Go

// Copyright 2022 Clastix Labs
// SPDX-License-Identifier: Apache-2.0
package manager
import (
"flag"
"fmt"
"io"
"net/http"
"os"
goRuntime "runtime"
"time"
telemetryclient "github.com/clastix/kamaji-telemetry/pkg/client"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
ctrlwebhook "sigs.k8s.io/controller-runtime/pkg/webhook"
kamajiv1alpha1 "github.com/clastix/kamaji/api/v1alpha1"
cmdutils "github.com/clastix/kamaji/cmd/utils"
"github.com/clastix/kamaji/controllers"
"github.com/clastix/kamaji/controllers/soot"
"github.com/clastix/kamaji/internal"
"github.com/clastix/kamaji/internal/builders/controlplane"
datastoreutils "github.com/clastix/kamaji/internal/datastore/utils"
"github.com/clastix/kamaji/internal/webhook"
"github.com/clastix/kamaji/internal/webhook/handlers"
"github.com/clastix/kamaji/internal/webhook/routes"
)
//nolint:maintidx
func NewCmd(scheme *runtime.Scheme) *cobra.Command {
// CLI flags
var (
metricsBindAddress string
healthProbeBindAddress string
leaderElect bool
tmpDirectory string
kineImage string
controllerReconcileTimeout time.Duration
cacheResyncPeriod time.Duration
datastore string
managerNamespace string
managerServiceAccountName string
managerServiceName string
webhookCABundle []byte
migrateJobImage string
maxConcurrentReconciles int
disableTelemetry bool
certificateExpirationDeadline time.Duration
webhookCAPath string
)
ctx := ctrl.SetupSignalHandler()
cmd := &cobra.Command{
Use: "manager",
Short: "Start the Kamaji Kubernetes Operator",
SilenceErrors: false,
SilenceUsage: true,
PreRunE: func(cmd *cobra.Command, _ []string) (err error) {
// Avoid to pollute Kamaji stdout with useless details by the underlying klog implementations
klog.SetOutput(io.Discard)
klog.LogToStderr(false)
if err = cmdutils.CheckFlags(cmd.Flags(), []string{"kine-image", "migrate-image", "tmp-directory", "pod-namespace", "webhook-service-name", "serviceaccount-name", "webhook-ca-path"}...); err != nil {
return err
}
if certificateExpirationDeadline < 24*time.Hour {
return fmt.Errorf("certificate expiration deadline must be at least 24 hours")
}
if webhookCABundle, err = os.ReadFile(webhookCAPath); err != nil {
return fmt.Errorf("unable to read webhook CA: %w", err)
}
if err = datastoreutils.CheckExists(ctx, scheme, datastore); err != nil {
return err
}
if controllerReconcileTimeout.Seconds() == 0 {
return fmt.Errorf("the controller reconcile timeout must be greater than zero")
}
return nil
},
RunE: func(*cobra.Command, []string) error {
setupLog := ctrl.Log.WithName("setup")
setupLog.Info(fmt.Sprintf("Kamaji version %s %s%s", internal.GitTag, internal.GitCommit, internal.GitDirty))
setupLog.Info(fmt.Sprintf("Build from: %s", internal.GitRepo))
setupLog.Info(fmt.Sprintf("Build date: %s", internal.BuildTime))
setupLog.Info(fmt.Sprintf("Go Version: %s", goRuntime.Version()))
setupLog.Info(fmt.Sprintf("Go OS/Arch: %s/%s", goRuntime.GOOS, goRuntime.GOARCH))
setupLog.Info(fmt.Sprintf("Telemetry enabled: %t", !disableTelemetry))
telemetryClient := telemetryclient.New(http.Client{Timeout: 5 * time.Second}, "https://telemetry.clastix.io")
if disableTelemetry {
telemetryClient = telemetryclient.NewNewOp()
}
ctrlOpts := ctrl.Options{
Scheme: scheme,
Metrics: metricsserver.Options{
BindAddress: metricsBindAddress,
},
WebhookServer: ctrlwebhook.NewServer(ctrlwebhook.Options{
Port: 9443,
}),
HealthProbeBindAddress: healthProbeBindAddress,
LeaderElection: leaderElect,
LeaderElectionNamespace: managerNamespace,
LeaderElectionID: "kamaji.clastix.io",
NewCache: func(config *rest.Config, opts cache.Options) (cache.Cache, error) {
opts.SyncPeriod = &cacheResyncPeriod
return cache.New(config, opts)
},
}
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrlOpts)
if err != nil {
setupLog.Error(err, "unable to start manager")
return err
}
tcpChannel, certChannel := make(chan event.GenericEvent), make(chan event.GenericEvent)
if err = (&controllers.DataStore{Client: mgr.GetClient(), TenantControlPlaneTrigger: tcpChannel}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "DataStore")
return err
}
reconciler := &controllers.TenantControlPlaneReconciler{
Client: mgr.GetClient(),
APIReader: mgr.GetAPIReader(),
Config: controllers.TenantControlPlaneReconcilerConfig{
ReconcileTimeout: controllerReconcileTimeout,
DefaultDataStoreName: datastore,
KineContainerImage: kineImage,
TmpBaseDirectory: tmpDirectory,
},
CertificateChan: certChannel,
TriggerChan: tcpChannel,
KamajiNamespace: managerNamespace,
KamajiServiceAccount: managerServiceAccountName,
KamajiService: managerServiceName,
KamajiMigrateImage: migrateJobImage,
MaxConcurrentReconciles: maxConcurrentReconciles,
}
if err = reconciler.SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Namespace")
return err
}
k8sVersion, versionErr := cmdutils.KubernetesVersion(mgr.GetConfig())
if versionErr != nil {
setupLog.Error(err, "unable to get kubernetes version")
k8sVersion = "Unknown"
}
if !disableTelemetry {
err = mgr.Add(&controllers.TelemetryController{
Client: mgr.GetClient(),
KubernetesVersion: k8sVersion,
KamajiVersion: internal.GitTag,
TelemetryClient: telemetryClient,
LeaderElectionNamespace: ctrlOpts.LeaderElectionNamespace,
LeaderElectionID: ctrlOpts.LeaderElectionID,
})
if err != nil {
setupLog.Error(err, "unable to create controller", "controller", "TelemetryController")
return err
}
}
if err = (&controllers.CertificateLifecycle{Channel: certChannel, Deadline: certificateExpirationDeadline}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "CertificateLifecycle")
return err
}
if err = (&kamajiv1alpha1.DatastoreUsedSecret{}).SetupWithManager(ctx, mgr); err != nil {
setupLog.Error(err, "unable to create indexer", "indexer", "DatastoreUsedSecret")
return err
}
if err = (&kamajiv1alpha1.TenantControlPlaneStatusDataStore{}).SetupWithManager(ctx, mgr); err != nil {
setupLog.Error(err, "unable to create indexer", "indexer", "TenantControlPlaneStatusDataStore")
return err
}
err = webhook.Register(mgr, map[routes.Route][]handlers.Handler{
routes.TenantControlPlaneMigrate{}: {
handlers.Freeze{},
},
routes.TenantControlPlaneDefaults{}: {
handlers.TenantControlPlaneDefaults{
DefaultDatastore: datastore,
},
},
routes.TenantControlPlaneValidate{}: {
handlers.TenantControlPlaneCertSANs{},
handlers.TenantControlPlaneName{},
handlers.TenantControlPlaneVersion{},
handlers.TenantControlPlaneKubeletAddresses{},
handlers.TenantControlPlaneDataStore{Client: mgr.GetClient()},
handlers.TenantControlPlaneDeployment{
Client: mgr.GetClient(),
DeploymentBuilder: controlplane.Deployment{
Client: mgr.GetClient(),
KineContainerImage: kineImage,
},
KonnectivityBuilder: controlplane.Konnectivity{
Scheme: *mgr.GetScheme(),
},
},
handlers.TenantControlPlaneServiceCIDR{},
handlers.TenantControlPlaneLoadBalancerSourceRanges{},
},
routes.TenantControlPlaneTelemetry{}: {
handlers.TenantControlPlaneTelemetry{
Enabled: !disableTelemetry,
TelemetryClient: telemetryClient,
KamajiVersion: internal.GitTag,
KubernetesVersion: k8sVersion,
},
},
routes.DataStoreValidate{}: {
handlers.DataStoreValidation{Client: mgr.GetClient()},
},
routes.DataStoreSecrets{}: {
handlers.DataStoreSecretValidation{Client: mgr.GetClient()},
},
})
if err != nil {
setupLog.Error(err, "unable to create webhook")
return err
}
if err = (&soot.Manager{
MigrateCABundle: webhookCABundle,
MigrateServiceName: managerServiceName,
MigrateServiceNamespace: managerNamespace,
AdminClient: mgr.GetClient(),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to set up soot manager")
return err
}
if err = mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up health check")
return err
}
if err = mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up ready check")
return err
}
setupLog.Info("starting manager")
if err = mgr.Start(ctx); err != nil {
setupLog.Error(err, "problem running manager")
return err
}
return nil
},
}
// Setting zap logger
zapfs := flag.NewFlagSet("zap", flag.ExitOnError)
opts := zap.Options{
Development: true,
}
opts.BindFlags(zapfs)
cmd.Flags().AddGoFlagSet(zapfs)
ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))
// Setting CLI flags
cmd.Flags().StringVar(&metricsBindAddress, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
cmd.Flags().StringVar(&healthProbeBindAddress, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
cmd.Flags().BoolVar(&leaderElect, "leader-elect", true, "Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager.")
cmd.Flags().StringVar(&tmpDirectory, "tmp-directory", "/tmp/kamaji", "Directory which will be used to work with temporary files.")
cmd.Flags().StringVar(&kineImage, "kine-image", "rancher/kine:v0.11.10-amd64", "Container image along with tag to use for the Kine sidecar container (used only if etcd-storage-type is set to one of kine strategies).")
cmd.Flags().StringVar(&datastore, "datastore", "", "Optional, the default DataStore that should be used by Kamaji to setup the required storage of Tenant Control Planes with undeclared DataStore.")
cmd.Flags().StringVar(&migrateJobImage, "migrate-image", fmt.Sprintf("clastix/kamaji:%s", internal.GitTag), "Specify the container image to launch when a TenantControlPlane is migrated to a new datastore.")
cmd.Flags().IntVar(&maxConcurrentReconciles, "max-concurrent-tcp-reconciles", 1, "Specify the number of workers for the Tenant Control Plane controller (beware of CPU consumption)")
cmd.Flags().StringVar(&managerNamespace, "pod-namespace", os.Getenv("POD_NAMESPACE"), "The Kubernetes Namespace on which the Operator is running in, required for the TenantControlPlane migration jobs.")
cmd.Flags().StringVar(&managerServiceName, "webhook-service-name", "kamaji-webhook-service", "The Kamaji webhook server Service name which is used to get validation webhooks, required for the TenantControlPlane migration jobs.")
cmd.Flags().StringVar(&managerServiceAccountName, "serviceaccount-name", os.Getenv("SERVICE_ACCOUNT"), "The Kubernetes Namespace on which the Operator is running in, required for the TenantControlPlane migration jobs.")
cmd.Flags().StringVar(&webhookCAPath, "webhook-ca-path", "/tmp/k8s-webhook-server/serving-certs/ca.crt", "Path to the Manager webhook server CA, required for the TenantControlPlane migration jobs.")
cmd.Flags().DurationVar(&controllerReconcileTimeout, "controller-reconcile-timeout", 30*time.Second, "The reconciliation request timeout before the controller withdraw the external resource calls, such as dealing with the Datastore, or the Tenant Control Plane API endpoint.")
cmd.Flags().DurationVar(&cacheResyncPeriod, "cache-resync-period", 10*time.Hour, "The controller-runtime.Manager cache resync period.")
cmd.Flags().BoolVar(&disableTelemetry, "disable-telemetry", false, "Disable the analytics traces collection.")
cmd.Flags().DurationVar(&certificateExpirationDeadline, "certificate-expiration-deadline", 24*time.Hour, "Define the deadline upon certificate expiration to start the renewal process, cannot be less than a 24 hours.")
cobra.OnInitialize(func() {
viper.AutomaticEnv()
})
return cmd
}