Files
cozystack/internal/telemetry/collector.go
Andrei Kvapil 3d962685ce Move project from aenix-io to cozystack repository
Signed-off-by: Andrei Kvapil <kvapss@gmail.com>
2025-03-10 21:16:58 +01:00

293 lines
7.5 KiB
Go

package telemetry
import (
"bytes"
"context"
"fmt"
"net/http"
"strings"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/discovery"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
cozyv1alpha1 "github.com/cozystack/cozystack/api/v1alpha1"
)
// Collector handles telemetry data collection and sending
type Collector struct {
client client.Client
discoveryClient discovery.DiscoveryInterface
config *Config
ticker *time.Ticker
stopCh chan struct{}
}
// NewCollector creates a new telemetry collector
func NewCollector(client client.Client, config *Config, kubeConfig *rest.Config) (*Collector, error) {
discoveryClient, err := discovery.NewDiscoveryClientForConfig(kubeConfig)
if err != nil {
return nil, fmt.Errorf("failed to create discovery client: %w", err)
}
return &Collector{
client: client,
discoveryClient: discoveryClient,
config: config,
}, nil
}
// Start implements manager.Runnable
func (c *Collector) Start(ctx context.Context) error {
if c.config.Disabled {
return nil
}
c.ticker = time.NewTicker(c.config.Interval)
c.stopCh = make(chan struct{})
// Initial collection
c.collect(ctx)
for {
select {
case <-ctx.Done():
c.ticker.Stop()
close(c.stopCh)
return nil
case <-c.ticker.C:
c.collect(ctx)
}
}
}
// NeedLeaderElection implements manager.LeaderElectionRunnable
func (c *Collector) NeedLeaderElection() bool {
// Only run telemetry collector on the leader
return true
}
// Stop halts telemetry collection
func (c *Collector) Stop() {
close(c.stopCh)
}
// getSizeGroup returns the exponential size group for PVC
func getSizeGroup(size resource.Quantity) string {
gb := size.Value() / (1024 * 1024 * 1024)
switch {
case gb <= 1:
return "1Gi"
case gb <= 5:
return "5Gi"
case gb <= 10:
return "10Gi"
case gb <= 25:
return "25Gi"
case gb <= 50:
return "50Gi"
case gb <= 100:
return "100Gi"
case gb <= 250:
return "250Gi"
case gb <= 500:
return "500Gi"
case gb <= 1024:
return "1Ti"
case gb <= 2048:
return "2Ti"
case gb <= 5120:
return "5Ti"
default:
return "10Ti"
}
}
// collect gathers and sends telemetry data
func (c *Collector) collect(ctx context.Context) {
logger := log.FromContext(ctx).V(1)
// Get cluster ID from kube-system namespace
var kubeSystemNS corev1.Namespace
if err := c.client.Get(ctx, types.NamespacedName{Name: "kube-system"}, &kubeSystemNS); err != nil {
logger.Info(fmt.Sprintf("Failed to get kube-system namespace: %v", err))
return
}
clusterID := string(kubeSystemNS.UID)
var cozystackCM corev1.ConfigMap
if err := c.client.Get(ctx, types.NamespacedName{Namespace: "cozy-system", Name: "cozystack"}, &cozystackCM); err != nil {
logger.Info(fmt.Sprintf("Failed to get cozystack configmap in cozy-system namespace: %v", err))
return
}
oidcEnabled := cozystackCM.Data["oidc-enabled"]
bundle := cozystackCM.Data["bundle-name"]
bundleEnable := cozystackCM.Data["bundle-enable"]
bundleDisable := cozystackCM.Data["bundle-disable"]
// Get Kubernetes version from nodes
var nodeList corev1.NodeList
if err := c.client.List(ctx, &nodeList); err != nil {
logger.Info(fmt.Sprintf("Failed to list nodes: %v", err))
return
}
// Create metrics buffer
var metrics strings.Builder
// Add Cozystack info metric
if len(nodeList.Items) > 0 {
k8sVersion, _ := c.discoveryClient.ServerVersion()
metrics.WriteString(fmt.Sprintf(
"cozy_cluster_info{cozystack_version=\"%s\",kubernetes_version=\"%s\",oidc_enabled=\"%s\",bundle_name=\"%s\",bunde_enable=\"%s\",bunde_disable=\"%s\"} 1\n",
c.config.CozystackVersion,
k8sVersion,
oidcEnabled,
bundle,
bundleEnable,
bundleDisable,
))
}
// Collect node metrics
nodeOSCount := make(map[string]int)
for _, node := range nodeList.Items {
key := fmt.Sprintf("%s (%s)", node.Status.NodeInfo.OperatingSystem, node.Status.NodeInfo.OSImage)
nodeOSCount[key] = nodeOSCount[key] + 1
}
for osKey, count := range nodeOSCount {
metrics.WriteString(fmt.Sprintf(
"cozy_nodes_count{os=\"%s\",kernel=\"%s\"} %d\n",
osKey,
nodeList.Items[0].Status.NodeInfo.KernelVersion,
count,
))
}
// Collect LoadBalancer services metrics
var serviceList corev1.ServiceList
if err := c.client.List(ctx, &serviceList); err != nil {
logger.Info(fmt.Sprintf("Failed to list Services: %v", err))
} else {
lbCount := 0
for _, svc := range serviceList.Items {
if svc.Spec.Type == corev1.ServiceTypeLoadBalancer {
lbCount++
}
}
metrics.WriteString(fmt.Sprintf("cozy_loadbalancers_count %d\n", lbCount))
}
// Count tenant namespaces
var nsList corev1.NamespaceList
if err := c.client.List(ctx, &nsList); err != nil {
logger.Info(fmt.Sprintf("Failed to list Namespaces: %v", err))
} else {
tenantCount := 0
for _, ns := range nsList.Items {
if strings.HasPrefix(ns.Name, "tenant-") {
tenantCount++
}
}
metrics.WriteString(fmt.Sprintf("cozy_tenants_count %d\n", tenantCount))
}
// Collect PV metrics grouped by driver and size
var pvList corev1.PersistentVolumeList
if err := c.client.List(ctx, &pvList); err != nil {
logger.Info(fmt.Sprintf("Failed to list PVs: %v", err))
} else {
// Map to store counts by size and driver
pvMetrics := make(map[string]map[string]int)
for _, pv := range pvList.Items {
if capacity, ok := pv.Spec.Capacity[corev1.ResourceStorage]; ok {
sizeGroup := getSizeGroup(capacity)
// Get the CSI driver name
driver := "unknown"
if pv.Spec.CSI != nil {
driver = pv.Spec.CSI.Driver
} else if pv.Spec.HostPath != nil {
driver = "hostpath"
} else if pv.Spec.NFS != nil {
driver = "nfs"
}
// Initialize nested map if needed
if _, exists := pvMetrics[sizeGroup]; !exists {
pvMetrics[sizeGroup] = make(map[string]int)
}
// Increment count for this size/driver combination
pvMetrics[sizeGroup][driver]++
}
}
// Write metrics
for size, drivers := range pvMetrics {
for driver, count := range drivers {
metrics.WriteString(fmt.Sprintf(
"cozy_pvs_count{driver=\"%s\",size=\"%s\"} %d\n",
driver,
size,
count,
))
}
}
}
// Collect workload metrics
var monitorList cozyv1alpha1.WorkloadMonitorList
if err := c.client.List(ctx, &monitorList); err != nil {
logger.Info(fmt.Sprintf("Failed to list WorkloadMonitors: %v", err))
return
}
for _, monitor := range monitorList.Items {
metrics.WriteString(fmt.Sprintf(
"cozy_workloads_count{uid=\"%s\",kind=\"%s\",type=\"%s\",version=\"%s\"} %d\n",
monitor.UID,
monitor.Spec.Kind,
monitor.Spec.Type,
monitor.Spec.Version,
monitor.Status.ObservedReplicas,
))
}
// Send metrics
if err := c.sendMetrics(clusterID, metrics.String()); err != nil {
logger.Info(fmt.Sprintf("Failed to send metrics: %v", err))
}
}
// sendMetrics sends collected metrics to the configured endpoint
func (c *Collector) sendMetrics(clusterID, metrics string) error {
req, err := http.NewRequest("POST", c.config.Endpoint, bytes.NewBufferString(metrics))
if err != nil {
return fmt.Errorf("failed to create request: %w", err)
}
req.Header.Set("Content-Type", "text/plain")
req.Header.Set("X-Cluster-ID", clusterID)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("failed to send request: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}
return nil
}