* Count Workload resources for pods by requests, not limits
* Do not count init container requests
* Prefix Workloads for pods with `pod-`, just like the other types, to prevent possible name collisions (closes #787)

The previous version of the WorkloadMonitor controller incorrectly summed resource limits on pods rather than requests. This prevented it from tracking resource allocation for pods that only have requests specified, which is particularly the case for kubevirt's virtual machine pods. Additionally, it counted all containers, including init containers, which are short-lived and contribute little to the total resource usage.
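A minimal sketch (not part of this repository, using only the upstream `k8s.io/api` and `k8s.io/apimachinery` packages) of the accounting the controller now performs: requests of normal containers are summed, init containers are skipped, and a pod that declares only requests, as kubevirt's virt-launcher pods do, still yields non-zero totals.

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	pod := corev1.Pod{
		Spec: corev1.PodSpec{
			// Init containers are short-lived, so their requests are not counted.
			InitContainers: []corev1.Container{{
				Name: "setup",
				Resources: corev1.ResourceRequirements{
					Requests: corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("100m")},
				},
			}},
			// A kubevirt-style pod typically sets requests but no limits,
			// so a limits-based sum would report nothing at all.
			Containers: []corev1.Container{{
				Name: "compute",
				Resources: corev1.ResourceRequirements{
					Requests: corev1.ResourceList{
						corev1.ResourceCPU:    resource.MustParse("2"),
						corev1.ResourceMemory: resource.MustParse("4Gi"),
					},
				},
			}},
		},
	}

	total := map[string]resource.Quantity{}
	for _, c := range pod.Spec.Containers { // normal containers only
		for name, qty := range c.Resources.Requests {
			sum := total[name.String()] // zero Quantity if absent
			sum.Add(qty)
			total[name.String()] = sum
		}
	}
	for name, qty := range total {
		fmt.Printf("%s: %s\n", name, qty.String()) // cpu: 2, memory: 4Gi
	}
}
```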
<!-- This is an auto-generated comment: release notes by coderabbit.ai -->
## Summary by CodeRabbit
- **Bug Fixes**
  - Improved handling of workloads with unrecognized prefixes by ensuring they are properly deleted and not processed further.
  - Corrected resource aggregation for Pods to sum container resource requests instead of limits, and now only includes normal containers.
- **New Features**
  - Added support for monitoring workloads with names prefixed by "pod-".
- **Tests**
  - Introduced unit tests to verify correct handling of workload name prefixes and monitored object creation (a hypothetical sketch of such a test appears after the controller source below).
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
(cherry picked from commit 1e59e5fbb6)
Signed-off-by: Timofei Larkin <lllamnyp@gmail.com>
package controller

import (
	"context"
	"encoding/json"
	"fmt"
	"sort"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/utils/pointer"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	cozyv1alpha1 "github.com/cozystack/cozystack/api/v1alpha1"
)

// WorkloadMonitorReconciler reconciles a WorkloadMonitor object
type WorkloadMonitorReconciler struct {
	client.Client
	Scheme *runtime.Scheme
}

// +kubebuilder:rbac:groups=cozystack.io,resources=workloadmonitors,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=cozystack.io,resources=workloadmonitors/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=cozystack.io,resources=workloads,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=cozystack.io,resources=workloads/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch
// +kubebuilder:rbac:groups=core,resources=persistentvolumeclaims,verbs=get;list;watch

// isServiceReady checks if the service has an external IP bound
func (r *WorkloadMonitorReconciler) isServiceReady(svc *corev1.Service) bool {
	return len(svc.Status.LoadBalancer.Ingress) > 0
}

// isPVCReady checks if the PVC is bound
func (r *WorkloadMonitorReconciler) isPVCReady(pvc *corev1.PersistentVolumeClaim) bool {
	return pvc.Status.Phase == corev1.ClaimBound
}

// isPodReady checks if the Pod is in the Ready condition.
func (r *WorkloadMonitorReconciler) isPodReady(pod *corev1.Pod) bool {
	for _, c := range pod.Status.Conditions {
		if c.Type == corev1.PodReady && c.Status == corev1.ConditionTrue {
			return true
		}
	}
	return false
}

// updateOwnerReferences adds the given monitor as a new owner reference to the object if not already present.
// It then sorts the owner references to enforce a consistent order.
func updateOwnerReferences(obj metav1.Object, monitor client.Object) {
	// Retrieve current owner references
	owners := obj.GetOwnerReferences()

	// Check if current monitor is already in owner references
	var alreadyOwned bool
	for _, ownerRef := range owners {
		if ownerRef.UID == monitor.GetUID() {
			alreadyOwned = true
			break
		}
	}

	runtimeObj, ok := monitor.(runtime.Object)
	if !ok {
		return
	}
	gvk := runtimeObj.GetObjectKind().GroupVersionKind()

	// If not already present, add new owner reference without controller flag
	if !alreadyOwned {
		newOwnerRef := metav1.OwnerReference{
			APIVersion: gvk.GroupVersion().String(),
			Kind:       gvk.Kind,
			Name:       monitor.GetName(),
			UID:        monitor.GetUID(),
			// Set Controller to false to avoid conflict as multiple controllers are not allowed
			Controller:         pointer.BoolPtr(false),
			BlockOwnerDeletion: pointer.BoolPtr(true),
		}
		owners = append(owners, newOwnerRef)
	}

	// Sort owner references to enforce a consistent order by UID
	sort.SliceStable(owners, func(i, j int) bool {
		return owners[i].UID < owners[j].UID
	})

	// Update the owner references of the object
	obj.SetOwnerReferences(owners)
}

// reconcileServiceForMonitor creates or updates a Workload object for the given Service and WorkloadMonitor.
func (r *WorkloadMonitorReconciler) reconcileServiceForMonitor(
	ctx context.Context,
	monitor *cozyv1alpha1.WorkloadMonitor,
	svc corev1.Service,
) error {
	logger := log.FromContext(ctx)
	workload := &cozyv1alpha1.Workload{
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("svc-%s", svc.Name),
			Namespace: svc.Namespace,
		},
	}

	resources := make(map[string]resource.Quantity)

	q := resource.MustParse("0")

	for _, ing := range svc.Status.LoadBalancer.Ingress {
		if ing.IP != "" {
			q.Add(resource.MustParse("1"))
		}
	}

	resources["public-ips"] = q

	_, err := ctrl.CreateOrUpdate(ctx, r.Client, workload, func() error {
		// Update owner references with the new monitor
		updateOwnerReferences(workload.GetObjectMeta(), monitor)

		workload.Labels = svc.Labels

		// Fill Workload status fields:
		workload.Status.Kind = monitor.Spec.Kind
		workload.Status.Type = monitor.Spec.Type
		workload.Status.Resources = resources
		workload.Status.Operational = r.isServiceReady(&svc)

		return nil
	})
	if err != nil {
		logger.Error(err, "Failed to CreateOrUpdate Workload", "workload", workload.Name)
		return err
	}

	return nil
}

// reconcilePVCForMonitor creates or updates a Workload object for the given PVC and WorkloadMonitor.
func (r *WorkloadMonitorReconciler) reconcilePVCForMonitor(
	ctx context.Context,
	monitor *cozyv1alpha1.WorkloadMonitor,
	pvc corev1.PersistentVolumeClaim,
) error {
	logger := log.FromContext(ctx)
	workload := &cozyv1alpha1.Workload{
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("pvc-%s", pvc.Name),
			Namespace: pvc.Namespace,
		},
	}

	resources := make(map[string]resource.Quantity)

	for resourceName, resourceQuantity := range pvc.Status.Capacity {
		resources[resourceName.String()] = resourceQuantity
	}

	_, err := ctrl.CreateOrUpdate(ctx, r.Client, workload, func() error {
		// Update owner references with the new monitor
		updateOwnerReferences(workload.GetObjectMeta(), monitor)

		workload.Labels = pvc.Labels

		// Fill Workload status fields:
		workload.Status.Kind = monitor.Spec.Kind
		workload.Status.Type = monitor.Spec.Type
		workload.Status.Resources = resources
		workload.Status.Operational = r.isPVCReady(&pvc)

		return nil
	})
	if err != nil {
		logger.Error(err, "Failed to CreateOrUpdate Workload", "workload", workload.Name)
		return err
	}

	return nil
}

// reconcilePodForMonitor creates or updates a Workload object for the given Pod and WorkloadMonitor.
func (r *WorkloadMonitorReconciler) reconcilePodForMonitor(
	ctx context.Context,
	monitor *cozyv1alpha1.WorkloadMonitor,
	pod corev1.Pod,
) error {
	logger := log.FromContext(ctx)

	// totalResources will store the sum of all container resource requests
	totalResources := make(map[string]resource.Quantity)

	// Iterate over all normal containers to aggregate their requests (init containers are skipped)
	for _, container := range pod.Spec.Containers {
		for name, qty := range container.Resources.Requests {
			if existing, exists := totalResources[name.String()]; exists {
				existing.Add(qty)
				totalResources[name.String()] = existing
			} else {
				totalResources[name.String()] = qty.DeepCopy()
			}
		}
	}

	// If annotation "workload.cozystack.io/resources" is present, parse and merge
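	// The annotation value is a JSON object mapping resource names to quantities,
	// e.g. (illustrative values, not taken from the repository):
	//   workload.cozystack.io/resources: '{"cpu":"2","memory":"4Gi"}'
	// A parsed entry replaces the per-container sum for that resource name.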
	if resourcesStr, ok := pod.Annotations["workload.cozystack.io/resources"]; ok {
		annRes := map[string]string{}
		if err := json.Unmarshal([]byte(resourcesStr), &annRes); err != nil {
			logger.Error(err, "Failed to parse resources annotation", "pod", pod.Name)
		} else {
			for k, v := range annRes {
				parsed, err := resource.ParseQuantity(v)
				if err != nil {
					logger.Error(err, "Failed to parse resource quantity from annotation", "key", k, "value", v)
					continue
				}
				totalResources[k] = parsed
			}
		}
	}

	workload := &cozyv1alpha1.Workload{
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("pod-%s", pod.Name),
			Namespace: pod.Namespace,
		},
	}

	_, err := ctrl.CreateOrUpdate(ctx, r.Client, workload, func() error {
		// Update owner references with the new monitor
		updateOwnerReferences(workload.GetObjectMeta(), monitor)

		// Copy labels from the Pod if needed
		workload.Labels = pod.Labels

		// Fill Workload status fields:
		workload.Status.Kind = monitor.Spec.Kind
		workload.Status.Type = monitor.Spec.Type
		workload.Status.Resources = totalResources
		workload.Status.Operational = r.isPodReady(&pod)

		return nil
	})
	if err != nil {
		logger.Error(err, "Failed to CreateOrUpdate Workload", "workload", workload.Name)
		return err
	}

	return nil
}

// Reconcile is the main reconcile loop.
// 1. It reconciles WorkloadMonitor objects themselves (create/update/delete).
// 2. It also reconciles Pod events mapped to WorkloadMonitor via label selector.
func (r *WorkloadMonitorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := log.FromContext(ctx)

	// Fetch the WorkloadMonitor object if it exists
	monitor := &cozyv1alpha1.WorkloadMonitor{}
	err := r.Get(ctx, req.NamespacedName, monitor)
	if err != nil {
		// If the resource is not found, it may be a Pod event (mapFunc).
		if apierrors.IsNotFound(err) {
			return ctrl.Result{}, nil
		}
		logger.Error(err, "Unable to fetch WorkloadMonitor")
		return ctrl.Result{}, err
	}

	// List Pods that match the WorkloadMonitor's selector
	podList := &corev1.PodList{}
	if err := r.List(
		ctx,
		podList,
		client.InNamespace(monitor.Namespace),
		client.MatchingLabels(monitor.Spec.Selector),
	); err != nil {
		logger.Error(err, "Unable to list Pods for WorkloadMonitor", "monitor", monitor.Name)
		return ctrl.Result{}, err
	}

	var observedReplicas, availableReplicas int32

	// For each matching Pod, reconcile the corresponding Workload
	for _, pod := range podList.Items {
		observedReplicas++
		if err := r.reconcilePodForMonitor(ctx, monitor, pod); err != nil {
			logger.Error(err, "Failed to reconcile Workload for Pod", "pod", pod.Name)
			continue
		}
		if r.isPodReady(&pod) {
			availableReplicas++
		}
	}

	pvcList := &corev1.PersistentVolumeClaimList{}
	if err := r.List(
		ctx,
		pvcList,
		client.InNamespace(monitor.Namespace),
		client.MatchingLabels(monitor.Spec.Selector),
	); err != nil {
		logger.Error(err, "Unable to list PVCs for WorkloadMonitor", "monitor", monitor.Name)
		return ctrl.Result{}, err
	}

	for _, pvc := range pvcList.Items {
		if err := r.reconcilePVCForMonitor(ctx, monitor, pvc); err != nil {
			logger.Error(err, "Failed to reconcile Workload for PVC", "PVC", pvc.Name)
			continue
		}
	}

	svcList := &corev1.ServiceList{}
	if err := r.List(
		ctx,
		svcList,
		client.InNamespace(monitor.Namespace),
		client.MatchingLabels(monitor.Spec.Selector),
	); err != nil {
		logger.Error(err, "Unable to list Services for WorkloadMonitor", "monitor", monitor.Name)
		return ctrl.Result{}, err
	}

	for _, svc := range svcList.Items {
		if svc.Spec.Type != corev1.ServiceTypeLoadBalancer {
			continue
		}
		if err := r.reconcileServiceForMonitor(ctx, monitor, svc); err != nil {
			logger.Error(err, "Failed to reconcile Workload for Service", "Service", svc.Name)
			continue
		}
	}

	// Update WorkloadMonitor status based on observed pods
	monitor.Status.ObservedReplicas = observedReplicas
	monitor.Status.AvailableReplicas = availableReplicas

	// Default to operational = true, but check MinReplicas if set
	monitor.Status.Operational = pointer.Bool(true)
	if monitor.Spec.MinReplicas != nil && availableReplicas < *monitor.Spec.MinReplicas {
		monitor.Status.Operational = pointer.Bool(false)
	}

	// Update the WorkloadMonitor status in the cluster
	if err := r.Status().Update(ctx, monitor); err != nil {
		logger.Error(err, "Unable to update WorkloadMonitor status", "monitor", monitor.Name)
		return ctrl.Result{}, err
	}

	// Return without requeue if we want purely event-driven reconciliations
	return ctrl.Result{}, nil
}

// SetupWithManager registers our controller with the Manager and sets up watches.
func (r *WorkloadMonitorReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		// Watch WorkloadMonitor objects
		For(&cozyv1alpha1.WorkloadMonitor{}).
		// Also watch Pod objects and map them back to WorkloadMonitor if labels match
		Watches(
			&corev1.Pod{},
			handler.EnqueueRequestsFromMapFunc(mapObjectToMonitor(&corev1.Pod{}, r.Client)),
		).
		// Watch PVCs as well
		Watches(
			&corev1.PersistentVolumeClaim{},
			handler.EnqueueRequestsFromMapFunc(mapObjectToMonitor(&corev1.PersistentVolumeClaim{}, r.Client)),
		).
		// Watch for changes to Workload objects we create (owned by WorkloadMonitor)
		Owns(&cozyv1alpha1.Workload{}).
		Complete(r)
}

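// mapObjectToMonitor returns a map function for handler.EnqueueRequestsFromMapFunc:
// it maps a watched object (Pod or PVC) to reconcile requests for every
// WorkloadMonitor in the same namespace whose selector matches the object's labels.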
func mapObjectToMonitor[T client.Object](_ T, c client.Client) func(ctx context.Context, obj client.Object) []reconcile.Request {
	return func(ctx context.Context, obj client.Object) []reconcile.Request {
		concrete, ok := obj.(T)
		if !ok {
			return nil
		}

		var monitorList cozyv1alpha1.WorkloadMonitorList
		// List all WorkloadMonitors in the same namespace
		if err := c.List(ctx, &monitorList, client.InNamespace(concrete.GetNamespace())); err != nil {
			return nil
		}

		labels := concrete.GetLabels()
		// Match each monitor's selector with the object's labels
		var requests []reconcile.Request
		for _, m := range monitorList.Items {
			matches := true
			for k, v := range m.Spec.Selector {
				if labelVal, exists := labels[k]; !exists || labelVal != v {
					matches = false
					break
				}
			}
			if matches {
				requests = append(requests, reconcile.Request{
					NamespacedName: types.NamespacedName{
						Namespace: m.Namespace,
						Name:      m.Name,
					},
				})
			}
		}
		return requests
	}
}
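The release notes above mention unit tests for workload name prefixes and monitored object creation; those tests are not shown on this page. The following is a hypothetical sketch of how `reconcilePodForMonitor` could be exercised with controller-runtime's fake client, checking the `pod-` prefix and the requests-only, normal-containers-only sum. Test name, namespace, UID, and values are assumptions, and it presumes the API package exposes the usual kubebuilder-generated `AddToScheme`.

```go
package controller

import (
	"context"
	"testing"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"

	cozyv1alpha1 "github.com/cozystack/cozystack/api/v1alpha1"
)

func TestReconcilePodForMonitorPodPrefixAndRequests(t *testing.T) {
	scheme := runtime.NewScheme()
	_ = corev1.AddToScheme(scheme)
	_ = cozyv1alpha1.AddToScheme(scheme)

	c := fake.NewClientBuilder().WithScheme(scheme).Build()
	r := &WorkloadMonitorReconciler{Client: c, Scheme: scheme}

	monitor := &cozyv1alpha1.WorkloadMonitor{
		TypeMeta:   metav1.TypeMeta{APIVersion: "cozystack.io/v1alpha1", Kind: "WorkloadMonitor"},
		ObjectMeta: metav1.ObjectMeta{Name: "demo", Namespace: "tenant-demo", UID: "uid-1"},
	}
	pod := corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "app-0", Namespace: "tenant-demo"},
		Spec: corev1.PodSpec{
			// Init container requests must not be counted.
			InitContainers: []corev1.Container{{Name: "init", Resources: corev1.ResourceRequirements{
				Requests: corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("1")},
			}}},
			// Only this container's requests should end up in the Workload.
			Containers: []corev1.Container{{Name: "main", Resources: corev1.ResourceRequirements{
				Requests: corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("500m")},
			}}},
		},
	}

	if err := r.reconcilePodForMonitor(context.Background(), monitor, pod); err != nil {
		t.Fatalf("reconcilePodForMonitor: %v", err)
	}

	// The Workload must carry the "pod-" prefix and the requests-based sum.
	var wl cozyv1alpha1.Workload
	if err := c.Get(context.Background(), types.NamespacedName{Name: "pod-app-0", Namespace: "tenant-demo"}, &wl); err != nil {
		t.Fatalf("expected Workload pod-app-0: %v", err)
	}
	if cpu := wl.Status.Resources["cpu"]; cpu.Cmp(resource.MustParse("500m")) != 0 {
		t.Errorf("expected cpu=500m, got %s", cpu.String())
	}
}
```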