Files
kamaji/controllers/telemetry_controller.go
Dario Tranchitella b68010e072 feat!: introducing sleeping status (#773)
* feat(api): introducing sleeping status

Signed-off-by: Dario Tranchitella <dario@tranchitella.eu>

* chore(helm)!: introducing sleeping status

Marking this commit as breaking since a CustomResourceDefinition update
is required for users dealing with scale to zero since the introduction
of the new enum for the status field.

Signed-off-by: Dario Tranchitella <dario@tranchitella.eu>

* docs: introducing sleeping status

Signed-off-by: Dario Tranchitella <dario@tranchitella.eu>

---------

Signed-off-by: Dario Tranchitella <dario@tranchitella.eu>
2025-04-07 16:44:13 +02:00

122 lines
3.5 KiB
Go

// Copyright 2022 Clastix Labs
// SPDX-License-Identifier: Apache-2.0
package controllers
import (
"context"
"time"
"github.com/clastix/kamaji-telemetry/api"
telemetry "github.com/clastix/kamaji-telemetry/pkg/client"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
kamajiv1alpha1 "github.com/clastix/kamaji/api/v1alpha1"
)
type TelemetryController struct {
Client client.Client
TelemetryClient telemetry.Client
LeaderElectionNamespace string
LeaderElectionID string
KamajiVersion string
KubernetesVersion string
}
func (m *TelemetryController) retrieveControllerUID(ctx context.Context) (string, error) {
var defaultSvc corev1.Service
if err := m.Client.Get(ctx, types.NamespacedName{Name: "kubernetes", Namespace: "default"}, &defaultSvc); err != nil {
return "", errors.Wrap(err, "cannot start the telemetry controller")
}
return string(defaultSvc.UID), nil
}
func (m *TelemetryController) Start(ctx context.Context) error {
logger := log.FromContext(ctx)
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
uid, err := m.retrieveControllerUID(ctx)
if err != nil {
logger.Error(err, "cannot retrieve controller UID")
return err
}
// First run to avoid waiting 5 minutes
go m.collectStats(ctx, uid)
for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
go m.collectStats(ctx, uid)
}
}
}
func (m *TelemetryController) collectStats(ctx context.Context, uid string) {
logger := log.FromContext(ctx)
stats := api.Stats{
UUID: uid,
KamajiVersion: m.KamajiVersion,
KubernetesVersion: m.KubernetesVersion,
TenantControlPlanes: api.TenantControlPlane{},
Datastores: api.Datastores{},
}
var tcpList kamajiv1alpha1.TenantControlPlaneList
if err := m.Client.List(ctx, &tcpList); err != nil {
logger.Error(err, "cannot list TenantControlPlane")
return
}
for _, tcp := range tcpList.Items {
switch {
case ptr.Deref(tcp.Status.Kubernetes.Version.Status, kamajiv1alpha1.VersionProvisioning) == kamajiv1alpha1.VersionSleeping:
stats.TenantControlPlanes.Sleeping++
case tcp.Status.Kubernetes.Version.Status != nil && *tcp.Status.Kubernetes.Version.Status == kamajiv1alpha1.VersionNotReady:
stats.TenantControlPlanes.NotReady++
case tcp.Status.Kubernetes.Version.Status != nil && *tcp.Status.Kubernetes.Version.Status == kamajiv1alpha1.VersionUpgrading:
stats.TenantControlPlanes.Upgrading++
default:
stats.TenantControlPlanes.Running++
}
}
var datastoreList kamajiv1alpha1.DataStoreList
if err := m.Client.List(ctx, &datastoreList); err != nil {
logger.Error(err, "cannot list DataStores")
return
}
for _, ds := range datastoreList.Items {
switch ds.Spec.Driver {
case kamajiv1alpha1.EtcdDriver:
stats.Datastores.Etcd.ShardCount++
stats.Datastores.Etcd.UsedByCount += len(ds.Status.UsedBy)
case kamajiv1alpha1.KinePostgreSQLDriver:
stats.Datastores.PostgreSQL.ShardCount++
stats.Datastores.PostgreSQL.UsedByCount += len(ds.Status.UsedBy)
case kamajiv1alpha1.KineMySQLDriver:
stats.Datastores.MySQL.ShardCount++
stats.Datastores.MySQL.UsedByCount += len(ds.Status.UsedBy)
case kamajiv1alpha1.KineNatsDriver:
stats.Datastores.NATS.ShardCount++
stats.Datastores.NATS.UsedByCount += len(ds.Status.UsedBy)
}
}
m.TelemetryClient.PushStats(ctx, stats)
}