Files
kamaji/cmd/manager/cmd.go
Dario Tranchitella cb2152d5a7 feat: kubeconfig generator (#933)
* feat(api): kubeconfig generator

Signed-off-by: Dario Tranchitella <dario@tranchitella.eu>

* refactor: abstracting enqueue to channel function

Signed-off-by: Dario Tranchitella <dario@tranchitella.eu>

* fix: avoiding multiple context registration

Signed-off-by: Dario Tranchitella <dario@tranchitella.eu>

* feat: kubeconfig generator

Signed-off-by: Dario Tranchitella <dario@tranchitella.eu>

* docs: kubeconfig generator

Signed-off-by: Dario Tranchitella <dario@tranchitella.eu>

* feat(helm): deployment for kubeconfig generator

Signed-off-by: Dario Tranchitella <dario@tranchitella.eu>

---------

Signed-off-by: Dario Tranchitella <dario@tranchitella.eu>
2025-09-22 15:32:50 +02:00

331 lines
13 KiB
Go

// Copyright 2022 Clastix Labs
// SPDX-License-Identifier: Apache-2.0
package manager
import (
"context"
"flag"
"fmt"
"io"
"net/http"
"os"
goRuntime "runtime"
"time"
telemetryclient "github.com/clastix/kamaji-telemetry/pkg/client"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
ctrlwebhook "sigs.k8s.io/controller-runtime/pkg/webhook"
kamajiv1alpha1 "github.com/clastix/kamaji/api/v1alpha1"
cmdutils "github.com/clastix/kamaji/cmd/utils"
"github.com/clastix/kamaji/controllers"
"github.com/clastix/kamaji/controllers/soot"
"github.com/clastix/kamaji/internal"
"github.com/clastix/kamaji/internal/builders/controlplane"
datastoreutils "github.com/clastix/kamaji/internal/datastore/utils"
"github.com/clastix/kamaji/internal/webhook"
"github.com/clastix/kamaji/internal/webhook/handlers"
"github.com/clastix/kamaji/internal/webhook/routes"
)
//nolint:maintidx
func NewCmd(scheme *runtime.Scheme) *cobra.Command {
// CLI flags
var (
metricsBindAddress string
healthProbeBindAddress string
leaderElect bool
tmpDirectory string
kineImage string
controllerReconcileTimeout time.Duration
cacheResyncPeriod time.Duration
datastore string
managerNamespace string
managerServiceAccountName string
managerServiceName string
webhookCABundle []byte
migrateJobImage string
maxConcurrentReconciles int
disableTelemetry bool
certificateExpirationDeadline time.Duration
webhookCAPath string
)
cmd := &cobra.Command{
Use: "manager",
Short: "Start the Kamaji Kubernetes Operator",
SilenceErrors: false,
SilenceUsage: true,
PreRunE: func(cmd *cobra.Command, _ []string) (err error) {
// Avoid to pollute Kamaji stdout with useless details by the underlying klog implementations
klog.SetOutput(io.Discard)
klog.LogToStderr(false)
if err = cmdutils.CheckFlags(cmd.Flags(), []string{"kine-image", "migrate-image", "tmp-directory", "pod-namespace", "webhook-service-name", "serviceaccount-name", "webhook-ca-path"}...); err != nil {
return err
}
if certificateExpirationDeadline < 24*time.Hour {
return fmt.Errorf("certificate expiration deadline must be at least 24 hours")
}
if webhookCABundle, err = os.ReadFile(webhookCAPath); err != nil {
return fmt.Errorf("unable to read webhook CA: %w", err)
}
if err = datastoreutils.CheckExists(context.Background(), scheme, datastore); err != nil {
return err
}
if controllerReconcileTimeout.Seconds() == 0 {
return fmt.Errorf("the controller reconcile timeout must be greater than zero")
}
return nil
},
RunE: func(*cobra.Command, []string) error {
ctx := ctrl.SetupSignalHandler()
setupLog := ctrl.Log.WithName("setup")
setupLog.Info(fmt.Sprintf("Kamaji version %s %s%s", internal.GitTag, internal.GitCommit, internal.GitDirty))
setupLog.Info(fmt.Sprintf("Build from: %s", internal.GitRepo))
setupLog.Info(fmt.Sprintf("Build date: %s", internal.BuildTime))
setupLog.Info(fmt.Sprintf("Go Version: %s", goRuntime.Version()))
setupLog.Info(fmt.Sprintf("Go OS/Arch: %s/%s", goRuntime.GOOS, goRuntime.GOARCH))
setupLog.Info(fmt.Sprintf("Telemetry enabled: %t", !disableTelemetry))
telemetryClient := telemetryclient.New(http.Client{Timeout: 5 * time.Second}, "https://telemetry.clastix.io")
if disableTelemetry {
telemetryClient = telemetryclient.NewNewOp()
}
ctrlOpts := ctrl.Options{
Scheme: scheme,
Metrics: metricsserver.Options{
BindAddress: metricsBindAddress,
},
WebhookServer: ctrlwebhook.NewServer(ctrlwebhook.Options{
Port: 9443,
}),
HealthProbeBindAddress: healthProbeBindAddress,
LeaderElection: leaderElect,
LeaderElectionNamespace: managerNamespace,
LeaderElectionID: "kamaji.clastix.io",
NewCache: func(config *rest.Config, opts cache.Options) (cache.Cache, error) {
opts.SyncPeriod = &cacheResyncPeriod
return cache.New(config, opts)
},
}
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrlOpts)
if err != nil {
setupLog.Error(err, "unable to start manager")
return err
}
tcpChannel, certChannel := make(chan event.GenericEvent), make(chan event.GenericEvent)
if err = (&controllers.DataStore{Client: mgr.GetClient(), TenantControlPlaneTrigger: tcpChannel}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "DataStore")
return err
}
reconciler := &controllers.TenantControlPlaneReconciler{
Client: mgr.GetClient(),
APIReader: mgr.GetAPIReader(),
Config: controllers.TenantControlPlaneReconcilerConfig{
DefaultDataStoreName: datastore,
KineContainerImage: kineImage,
TmpBaseDirectory: tmpDirectory,
CertExpirationThreshold: certificateExpirationDeadline,
},
ReconcileTimeout: controllerReconcileTimeout,
CertificateChan: certChannel,
TriggerChan: tcpChannel,
KamajiNamespace: managerNamespace,
KamajiServiceAccount: managerServiceAccountName,
KamajiService: managerServiceName,
KamajiMigrateImage: migrateJobImage,
MaxConcurrentReconciles: maxConcurrentReconciles,
}
if err = reconciler.SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Namespace")
return err
}
k8sVersion, versionErr := cmdutils.KubernetesVersion(mgr.GetConfig())
if versionErr != nil {
setupLog.Error(err, "unable to get kubernetes version")
k8sVersion = "Unknown"
}
if !disableTelemetry {
err = mgr.Add(&controllers.TelemetryController{
Client: mgr.GetClient(),
KubernetesVersion: k8sVersion,
KamajiVersion: internal.GitTag,
TelemetryClient: telemetryClient,
LeaderElectionNamespace: ctrlOpts.LeaderElectionNamespace,
LeaderElectionID: ctrlOpts.LeaderElectionID,
})
if err != nil {
setupLog.Error(err, "unable to create controller", "controller", "TelemetryController")
return err
}
}
certController := &controllers.CertificateLifecycle{Channel: certChannel, Deadline: certificateExpirationDeadline}
certController.EnqueueFn = certController.EnqueueForTenantControlPlane
if err = certController.SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "CertificateLifecycle")
return err
}
if err = (&kamajiv1alpha1.DatastoreUsedSecret{}).SetupWithManager(ctx, mgr); err != nil {
setupLog.Error(err, "unable to create indexer", "indexer", "DatastoreUsedSecret")
return err
}
if err = (&kamajiv1alpha1.TenantControlPlaneStatusDataStore{}).SetupWithManager(ctx, mgr); err != nil {
setupLog.Error(err, "unable to create indexer", "indexer", "TenantControlPlaneStatusDataStore")
return err
}
err = webhook.Register(mgr, map[routes.Route][]handlers.Handler{
routes.TenantControlPlaneMigrate{}: {
handlers.Freeze{},
},
routes.TenantControlPlaneDefaults{}: {
handlers.TenantControlPlaneDefaults{
DefaultDatastore: datastore,
},
},
routes.TenantControlPlaneValidate{}: {
handlers.TenantControlPlaneCertSANs{},
handlers.TenantControlPlaneName{},
handlers.TenantControlPlaneVersion{},
handlers.TenantControlPlaneDataStore{Client: mgr.GetClient()},
handlers.TenantControlPlaneDeployment{
Client: mgr.GetClient(),
DeploymentBuilder: controlplane.Deployment{
Client: mgr.GetClient(),
KineContainerImage: kineImage,
},
KonnectivityBuilder: controlplane.Konnectivity{
Scheme: *mgr.GetScheme(),
},
},
handlers.TenantControlPlaneServiceCIDR{},
handlers.TenantControlPlaneLoadBalancerSourceRanges{},
},
routes.TenantControlPlaneTelemetry{}: {
handlers.TenantControlPlaneTelemetry{
Enabled: !disableTelemetry,
TelemetryClient: telemetryClient,
KamajiVersion: internal.GitTag,
KubernetesVersion: k8sVersion,
},
},
routes.DataStoreValidate{}: {
handlers.DataStoreValidation{Client: mgr.GetClient()},
},
routes.DataStoreSecrets{}: {
handlers.DataStoreSecretValidation{Client: mgr.GetClient()},
},
})
if err != nil {
setupLog.Error(err, "unable to create webhook")
return err
}
if err = (&soot.Manager{
MigrateCABundle: webhookCABundle,
MigrateServiceName: managerServiceName,
MigrateServiceNamespace: managerNamespace,
AdminClient: mgr.GetClient(),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to set up soot manager")
return err
}
if err = mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up health check")
return err
}
if err = mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up ready check")
return err
}
setupLog.Info("starting manager")
if err = mgr.Start(ctx); err != nil {
setupLog.Error(err, "problem running manager")
return err
}
return nil
},
}
// Setting zap logger
zapfs := flag.NewFlagSet("zap", flag.ExitOnError)
opts := zap.Options{
Development: true,
}
opts.BindFlags(zapfs)
cmd.Flags().AddGoFlagSet(zapfs)
ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))
// Setting CLI flags
cmd.Flags().StringVar(&metricsBindAddress, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
cmd.Flags().StringVar(&healthProbeBindAddress, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
cmd.Flags().BoolVar(&leaderElect, "leader-elect", true, "Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager.")
cmd.Flags().StringVar(&tmpDirectory, "tmp-directory", "/tmp/kamaji", "Directory which will be used to work with temporary files.")
cmd.Flags().StringVar(&kineImage, "kine-image", "rancher/kine:v0.11.10-amd64", "Container image along with tag to use for the Kine sidecar container (used only if etcd-storage-type is set to one of kine strategies).")
cmd.Flags().StringVar(&datastore, "datastore", "", "Optional, the default DataStore that should be used by Kamaji to setup the required storage of Tenant Control Planes with undeclared DataStore.")
cmd.Flags().StringVar(&migrateJobImage, "migrate-image", fmt.Sprintf("%s/clastix/kamaji:%s", internal.ContainerRepository, internal.GitTag), "Specify the container image to launch when a TenantControlPlane is migrated to a new datastore.")
cmd.Flags().IntVar(&maxConcurrentReconciles, "max-concurrent-tcp-reconciles", 1, "Specify the number of workers for the Tenant Control Plane controller (beware of CPU consumption)")
cmd.Flags().StringVar(&managerNamespace, "pod-namespace", os.Getenv("POD_NAMESPACE"), "The Kubernetes Namespace on which the Operator is running in, required for the TenantControlPlane migration jobs.")
cmd.Flags().StringVar(&managerServiceName, "webhook-service-name", "kamaji-webhook-service", "The Kamaji webhook server Service name which is used to get validation webhooks, required for the TenantControlPlane migration jobs.")
cmd.Flags().StringVar(&managerServiceAccountName, "serviceaccount-name", os.Getenv("SERVICE_ACCOUNT"), "The Kubernetes Namespace on which the Operator is running in, required for the TenantControlPlane migration jobs.")
cmd.Flags().StringVar(&webhookCAPath, "webhook-ca-path", "/tmp/k8s-webhook-server/serving-certs/ca.crt", "Path to the Manager webhook server CA, required for the TenantControlPlane migration jobs.")
cmd.Flags().DurationVar(&controllerReconcileTimeout, "controller-reconcile-timeout", 30*time.Second, "The reconciliation request timeout before the controller withdraw the external resource calls, such as dealing with the Datastore, or the Tenant Control Plane API endpoint.")
cmd.Flags().DurationVar(&cacheResyncPeriod, "cache-resync-period", 10*time.Hour, "The controller-runtime.Manager cache resync period.")
cmd.Flags().BoolVar(&disableTelemetry, "disable-telemetry", false, "Disable the analytics traces collection.")
cmd.Flags().DurationVar(&certificateExpirationDeadline, "certificate-expiration-deadline", 24*time.Hour, "Define the deadline upon certificate expiration to start the renewal process, cannot be less than a 24 hours.")
cobra.OnInitialize(func() {
viper.AutomaticEnv()
})
return cmd
}