mirror of
https://github.com/outbackdingo/cozystack.git
synced 2026-01-27 10:18:39 +00:00
293 lines
7.5 KiB
Go
293 lines
7.5 KiB
Go
package telemetry
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"net/http"
|
|
"strings"
|
|
"time"
|
|
|
|
corev1 "k8s.io/api/core/v1"
|
|
"k8s.io/apimachinery/pkg/api/resource"
|
|
"k8s.io/apimachinery/pkg/types"
|
|
"k8s.io/client-go/discovery"
|
|
"k8s.io/client-go/rest"
|
|
"sigs.k8s.io/controller-runtime/pkg/client"
|
|
"sigs.k8s.io/controller-runtime/pkg/log"
|
|
|
|
cozyv1alpha1 "github.com/cozystack/cozystack/api/v1alpha1"
|
|
)
|
|
|
|
// Collector handles telemetry data collection and sending
|
|
type Collector struct {
|
|
client client.Client
|
|
discoveryClient discovery.DiscoveryInterface
|
|
config *Config
|
|
ticker *time.Ticker
|
|
stopCh chan struct{}
|
|
}
|
|
|
|
// NewCollector creates a new telemetry collector
|
|
func NewCollector(client client.Client, config *Config, kubeConfig *rest.Config) (*Collector, error) {
|
|
discoveryClient, err := discovery.NewDiscoveryClientForConfig(kubeConfig)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create discovery client: %w", err)
|
|
}
|
|
return &Collector{
|
|
client: client,
|
|
discoveryClient: discoveryClient,
|
|
config: config,
|
|
}, nil
|
|
}
|
|
|
|
// Start implements manager.Runnable
|
|
func (c *Collector) Start(ctx context.Context) error {
|
|
if c.config.Disabled {
|
|
return nil
|
|
}
|
|
|
|
c.ticker = time.NewTicker(c.config.Interval)
|
|
c.stopCh = make(chan struct{})
|
|
|
|
// Initial collection
|
|
c.collect(ctx)
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
c.ticker.Stop()
|
|
close(c.stopCh)
|
|
return nil
|
|
case <-c.ticker.C:
|
|
c.collect(ctx)
|
|
}
|
|
}
|
|
}
|
|
|
|
// NeedLeaderElection implements manager.LeaderElectionRunnable
|
|
func (c *Collector) NeedLeaderElection() bool {
|
|
// Only run telemetry collector on the leader
|
|
return true
|
|
}
|
|
|
|
// Stop halts telemetry collection
|
|
func (c *Collector) Stop() {
|
|
close(c.stopCh)
|
|
}
|
|
|
|
// getSizeGroup returns the exponential size group for PVC
|
|
func getSizeGroup(size resource.Quantity) string {
|
|
gb := size.Value() / (1024 * 1024 * 1024)
|
|
switch {
|
|
case gb <= 1:
|
|
return "1Gi"
|
|
case gb <= 5:
|
|
return "5Gi"
|
|
case gb <= 10:
|
|
return "10Gi"
|
|
case gb <= 25:
|
|
return "25Gi"
|
|
case gb <= 50:
|
|
return "50Gi"
|
|
case gb <= 100:
|
|
return "100Gi"
|
|
case gb <= 250:
|
|
return "250Gi"
|
|
case gb <= 500:
|
|
return "500Gi"
|
|
case gb <= 1024:
|
|
return "1Ti"
|
|
case gb <= 2048:
|
|
return "2Ti"
|
|
case gb <= 5120:
|
|
return "5Ti"
|
|
default:
|
|
return "10Ti"
|
|
}
|
|
}
|
|
|
|
// collect gathers and sends telemetry data
|
|
func (c *Collector) collect(ctx context.Context) {
|
|
logger := log.FromContext(ctx).V(1)
|
|
|
|
// Get cluster ID from kube-system namespace
|
|
var kubeSystemNS corev1.Namespace
|
|
if err := c.client.Get(ctx, types.NamespacedName{Name: "kube-system"}, &kubeSystemNS); err != nil {
|
|
logger.Info(fmt.Sprintf("Failed to get kube-system namespace: %v", err))
|
|
return
|
|
}
|
|
|
|
clusterID := string(kubeSystemNS.UID)
|
|
|
|
var cozystackCM corev1.ConfigMap
|
|
if err := c.client.Get(ctx, types.NamespacedName{Namespace: "cozy-system", Name: "cozystack"}, &cozystackCM); err != nil {
|
|
logger.Info(fmt.Sprintf("Failed to get cozystack configmap in cozy-system namespace: %v", err))
|
|
return
|
|
}
|
|
|
|
oidcEnabled := cozystackCM.Data["oidc-enabled"]
|
|
bundle := cozystackCM.Data["bundle-name"]
|
|
bundleEnable := cozystackCM.Data["bundle-enable"]
|
|
bundleDisable := cozystackCM.Data["bundle-disable"]
|
|
|
|
// Get Kubernetes version from nodes
|
|
var nodeList corev1.NodeList
|
|
if err := c.client.List(ctx, &nodeList); err != nil {
|
|
logger.Info(fmt.Sprintf("Failed to list nodes: %v", err))
|
|
return
|
|
}
|
|
|
|
// Create metrics buffer
|
|
var metrics strings.Builder
|
|
|
|
// Add Cozystack info metric
|
|
if len(nodeList.Items) > 0 {
|
|
k8sVersion, _ := c.discoveryClient.ServerVersion()
|
|
metrics.WriteString(fmt.Sprintf(
|
|
"cozy_cluster_info{cozystack_version=\"%s\",kubernetes_version=\"%s\",oidc_enabled=\"%s\",bundle_name=\"%s\",bunde_enable=\"%s\",bunde_disable=\"%s\"} 1\n",
|
|
c.config.CozystackVersion,
|
|
k8sVersion,
|
|
oidcEnabled,
|
|
bundle,
|
|
bundleEnable,
|
|
bundleDisable,
|
|
))
|
|
}
|
|
|
|
// Collect node metrics
|
|
nodeOSCount := make(map[string]int)
|
|
for _, node := range nodeList.Items {
|
|
key := fmt.Sprintf("%s (%s)", node.Status.NodeInfo.OperatingSystem, node.Status.NodeInfo.OSImage)
|
|
nodeOSCount[key] = nodeOSCount[key] + 1
|
|
}
|
|
|
|
for osKey, count := range nodeOSCount {
|
|
metrics.WriteString(fmt.Sprintf(
|
|
"cozy_nodes_count{os=\"%s\",kernel=\"%s\"} %d\n",
|
|
osKey,
|
|
nodeList.Items[0].Status.NodeInfo.KernelVersion,
|
|
count,
|
|
))
|
|
}
|
|
|
|
// Collect LoadBalancer services metrics
|
|
var serviceList corev1.ServiceList
|
|
if err := c.client.List(ctx, &serviceList); err != nil {
|
|
logger.Info(fmt.Sprintf("Failed to list Services: %v", err))
|
|
} else {
|
|
lbCount := 0
|
|
for _, svc := range serviceList.Items {
|
|
if svc.Spec.Type == corev1.ServiceTypeLoadBalancer {
|
|
lbCount++
|
|
}
|
|
}
|
|
metrics.WriteString(fmt.Sprintf("cozy_loadbalancers_count %d\n", lbCount))
|
|
}
|
|
|
|
// Count tenant namespaces
|
|
var nsList corev1.NamespaceList
|
|
if err := c.client.List(ctx, &nsList); err != nil {
|
|
logger.Info(fmt.Sprintf("Failed to list Namespaces: %v", err))
|
|
} else {
|
|
tenantCount := 0
|
|
for _, ns := range nsList.Items {
|
|
if strings.HasPrefix(ns.Name, "tenant-") {
|
|
tenantCount++
|
|
}
|
|
}
|
|
metrics.WriteString(fmt.Sprintf("cozy_tenants_count %d\n", tenantCount))
|
|
}
|
|
|
|
// Collect PV metrics grouped by driver and size
|
|
var pvList corev1.PersistentVolumeList
|
|
if err := c.client.List(ctx, &pvList); err != nil {
|
|
logger.Info(fmt.Sprintf("Failed to list PVs: %v", err))
|
|
} else {
|
|
// Map to store counts by size and driver
|
|
pvMetrics := make(map[string]map[string]int)
|
|
|
|
for _, pv := range pvList.Items {
|
|
if capacity, ok := pv.Spec.Capacity[corev1.ResourceStorage]; ok {
|
|
sizeGroup := getSizeGroup(capacity)
|
|
|
|
// Get the CSI driver name
|
|
driver := "unknown"
|
|
if pv.Spec.CSI != nil {
|
|
driver = pv.Spec.CSI.Driver
|
|
} else if pv.Spec.HostPath != nil {
|
|
driver = "hostpath"
|
|
} else if pv.Spec.NFS != nil {
|
|
driver = "nfs"
|
|
}
|
|
|
|
// Initialize nested map if needed
|
|
if _, exists := pvMetrics[sizeGroup]; !exists {
|
|
pvMetrics[sizeGroup] = make(map[string]int)
|
|
}
|
|
|
|
// Increment count for this size/driver combination
|
|
pvMetrics[sizeGroup][driver]++
|
|
}
|
|
}
|
|
|
|
// Write metrics
|
|
for size, drivers := range pvMetrics {
|
|
for driver, count := range drivers {
|
|
metrics.WriteString(fmt.Sprintf(
|
|
"cozy_pvs_count{driver=\"%s\",size=\"%s\"} %d\n",
|
|
driver,
|
|
size,
|
|
count,
|
|
))
|
|
}
|
|
}
|
|
}
|
|
|
|
// Collect workload metrics
|
|
var monitorList cozyv1alpha1.WorkloadMonitorList
|
|
if err := c.client.List(ctx, &monitorList); err != nil {
|
|
logger.Info(fmt.Sprintf("Failed to list WorkloadMonitors: %v", err))
|
|
return
|
|
}
|
|
|
|
for _, monitor := range monitorList.Items {
|
|
metrics.WriteString(fmt.Sprintf(
|
|
"cozy_workloads_count{uid=\"%s\",kind=\"%s\",type=\"%s\",version=\"%s\"} %d\n",
|
|
monitor.UID,
|
|
monitor.Spec.Kind,
|
|
monitor.Spec.Type,
|
|
monitor.Spec.Version,
|
|
monitor.Status.ObservedReplicas,
|
|
))
|
|
}
|
|
|
|
// Send metrics
|
|
if err := c.sendMetrics(clusterID, metrics.String()); err != nil {
|
|
logger.Info(fmt.Sprintf("Failed to send metrics: %v", err))
|
|
}
|
|
}
|
|
|
|
// sendMetrics sends collected metrics to the configured endpoint
|
|
func (c *Collector) sendMetrics(clusterID, metrics string) error {
|
|
req, err := http.NewRequest("POST", c.config.Endpoint, bytes.NewBufferString(metrics))
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create request: %w", err)
|
|
}
|
|
|
|
req.Header.Set("Content-Type", "text/plain")
|
|
req.Header.Set("X-Cluster-ID", clusterID)
|
|
|
|
resp, err := http.DefaultClient.Do(req)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to send request: %w", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
|
|
}
|
|
|
|
return nil
|
|
}
|