Files
cozystack/internal/controller/workloadmonitor_controller.go
Timofei Larkin e947805c15 Track PVCs with WorkloadMonitor
Signed-off-by: Timofei Larkin <lllamnyp@gmail.com>
2025-04-08 11:44:36 +03:00

351 lines
11 KiB
Go

package controller
import (
"context"
"encoding/json"
"fmt"
"sort"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
cozyv1alpha1 "github.com/cozystack/cozystack/api/v1alpha1"
)
// WorkloadMonitorReconciler reconciles a WorkloadMonitor object
type WorkloadMonitorReconciler struct {
client.Client
Scheme *runtime.Scheme
}
// +kubebuilder:rbac:groups=cozystack.io,resources=workloadmonitors,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=cozystack.io,resources=workloadmonitors/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=cozystack.io,resources=workloads,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=cozystack.io,resources=workloads/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch
// +kubebuilder:rbac:groups=core,resources=persistentvolumeclaims,verbs=get;list;watch
// isPVCReady checks if the PVC is bound
func (r *WorkloadMonitorReconciler) isPVCReady(pvc *corev1.PersistentVolumeClaim) bool {
return pvc.Status.Phase == corev1.ClaimBound
}
// isPodReady checks if the Pod is in the Ready condition.
func (r *WorkloadMonitorReconciler) isPodReady(pod *corev1.Pod) bool {
for _, c := range pod.Status.Conditions {
if c.Type == corev1.PodReady && c.Status == corev1.ConditionTrue {
return true
}
}
return false
}
// updateOwnerReferences adds the given monitor as a new owner reference to the object if not already present.
// It then sorts the owner references to enforce a consistent order.
func updateOwnerReferences(obj metav1.Object, monitor client.Object) {
// Retrieve current owner references
owners := obj.GetOwnerReferences()
// Check if current monitor is already in owner references
var alreadyOwned bool
for _, ownerRef := range owners {
if ownerRef.UID == monitor.GetUID() {
alreadyOwned = true
break
}
}
runtimeObj, ok := monitor.(runtime.Object)
if !ok {
return
}
gvk := runtimeObj.GetObjectKind().GroupVersionKind()
// If not already present, add new owner reference without controller flag
if !alreadyOwned {
newOwnerRef := metav1.OwnerReference{
APIVersion: gvk.GroupVersion().String(),
Kind: gvk.Kind,
Name: monitor.GetName(),
UID: monitor.GetUID(),
// Set Controller to false to avoid conflict as multiple controllers are not allowed
Controller: pointer.BoolPtr(false),
BlockOwnerDeletion: pointer.BoolPtr(true),
}
owners = append(owners, newOwnerRef)
}
// Sort owner references to enforce a consistent order by UID
sort.SliceStable(owners, func(i, j int) bool {
return owners[i].UID < owners[j].UID
})
// Update the owner references of the object
obj.SetOwnerReferences(owners)
}
// reconcilePVCForMonitor creates or updates a Workload object for the given PVC and WorkloadMonitor.
func (r *WorkloadMonitorReconciler) reconcilePVCForMonitor(
ctx context.Context,
monitor *cozyv1alpha1.WorkloadMonitor,
pvc corev1.PersistentVolumeClaim,
) error {
logger := log.FromContext(ctx)
workload := &cozyv1alpha1.Workload{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("pvc-%s", pvc.Name),
Namespace: pvc.Namespace,
},
}
resources := make(map[string]resource.Quantity)
for resourceName, resourceQuantity := range pvc.Status.Capacity {
resources[resourceName.String()] = resourceQuantity
}
_, err := ctrl.CreateOrUpdate(ctx, r.Client, workload, func() error {
// Update owner references with the new monitor
updateOwnerReferences(workload.GetObjectMeta(), monitor)
workload.Labels = pvc.Labels
// Fill Workload status fields:
workload.Status.Kind = monitor.Spec.Kind
workload.Status.Type = monitor.Spec.Type
workload.Status.Resources = resources
workload.Status.Operational = r.isPVCReady(&pvc)
return nil
})
if err != nil {
logger.Error(err, "Failed to CreateOrUpdate Workload", "workload", workload.Name)
return err
}
return nil
}
// reconcilePodForMonitor creates or updates a Workload object for the given Pod and WorkloadMonitor.
func (r *WorkloadMonitorReconciler) reconcilePodForMonitor(
ctx context.Context,
monitor *cozyv1alpha1.WorkloadMonitor,
pod corev1.Pod,
) error {
logger := log.FromContext(ctx)
// Combine both init containers and normal containers to sum resources properly
combinedContainers := append(pod.Spec.InitContainers, pod.Spec.Containers...)
// totalResources will store the sum of all container resource limits
totalResources := make(map[string]resource.Quantity)
// Iterate over all containers to aggregate their Limits
for _, container := range combinedContainers {
for name, qty := range container.Resources.Limits {
if existing, exists := totalResources[name.String()]; exists {
existing.Add(qty)
totalResources[name.String()] = existing
} else {
totalResources[name.String()] = qty.DeepCopy()
}
}
}
// If annotation "workload.cozystack.io/resources" is present, parse and merge
if resourcesStr, ok := pod.Annotations["workload.cozystack.io/resources"]; ok {
annRes := map[string]string{}
if err := json.Unmarshal([]byte(resourcesStr), &annRes); err != nil {
logger.Error(err, "Failed to parse resources annotation", "pod", pod.Name)
} else {
for k, v := range annRes {
parsed, err := resource.ParseQuantity(v)
if err != nil {
logger.Error(err, "Failed to parse resource quantity from annotation", "key", k, "value", v)
continue
}
totalResources[k] = parsed
}
}
}
workload := &cozyv1alpha1.Workload{
ObjectMeta: metav1.ObjectMeta{
Name: pod.Name,
Namespace: pod.Namespace,
},
}
_, err := ctrl.CreateOrUpdate(ctx, r.Client, workload, func() error {
// Update owner references with the new monitor
updateOwnerReferences(workload.GetObjectMeta(), monitor)
// Copy labels from the Pod if needed
workload.Labels = pod.Labels
// Fill Workload status fields:
workload.Status.Kind = monitor.Spec.Kind
workload.Status.Type = monitor.Spec.Type
workload.Status.Resources = totalResources
workload.Status.Operational = r.isPodReady(&pod)
return nil
})
if err != nil {
logger.Error(err, "Failed to CreateOrUpdate Workload", "workload", workload.Name)
return err
}
return nil
}
// Reconcile is the main reconcile loop.
// 1. It reconciles WorkloadMonitor objects themselves (create/update/delete).
// 2. It also reconciles Pod events mapped to WorkloadMonitor via label selector.
func (r *WorkloadMonitorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)
// Fetch the WorkloadMonitor object if it exists
monitor := &cozyv1alpha1.WorkloadMonitor{}
err := r.Get(ctx, req.NamespacedName, monitor)
if err != nil {
// If the resource is not found, it may be a Pod event (mapFunc).
if apierrors.IsNotFound(err) {
return ctrl.Result{}, nil
}
logger.Error(err, "Unable to fetch WorkloadMonitor")
return ctrl.Result{}, err
}
// List Pods that match the WorkloadMonitor's selector
podList := &corev1.PodList{}
if err := r.List(
ctx,
podList,
client.InNamespace(monitor.Namespace),
client.MatchingLabels(monitor.Spec.Selector),
); err != nil {
logger.Error(err, "Unable to list Pods for WorkloadMonitor", "monitor", monitor.Name)
return ctrl.Result{}, err
}
var observedReplicas, availableReplicas int32
// For each matching Pod, reconcile the corresponding Workload
for _, pod := range podList.Items {
observedReplicas++
if err := r.reconcilePodForMonitor(ctx, monitor, pod); err != nil {
logger.Error(err, "Failed to reconcile Workload for Pod", "pod", pod.Name)
continue
}
if r.isPodReady(&pod) {
availableReplicas++
}
}
pvcList := &corev1.PersistentVolumeClaimList{}
if err := r.List(
ctx,
pvcList,
client.InNamespace(monitor.Namespace),
client.MatchingLabels(monitor.Spec.Selector),
); err != nil {
logger.Error(err, "Unable to list PVCs for WorkloadMonitor", "monitor", monitor.Name)
return ctrl.Result{}, err
}
for _, pvc := range pvcList.Items {
if err := r.reconcilePVCForMonitor(ctx, monitor, pvc); err != nil {
logger.Error(err, "Failed to reconcile Workload for PVC", "PVC", pvc.Name)
continue
}
}
// Update WorkloadMonitor status based on observed pods
monitor.Status.ObservedReplicas = observedReplicas
monitor.Status.AvailableReplicas = availableReplicas
// Default to operational = true, but check MinReplicas if set
monitor.Status.Operational = pointer.Bool(true)
if monitor.Spec.MinReplicas != nil && availableReplicas < *monitor.Spec.MinReplicas {
monitor.Status.Operational = pointer.Bool(false)
}
// Update the WorkloadMonitor status in the cluster
if err := r.Status().Update(ctx, monitor); err != nil {
logger.Error(err, "Unable to update WorkloadMonitor status", "monitor", monitor.Name)
return ctrl.Result{}, err
}
// Return without requeue if we want purely event-driven reconciliations
return ctrl.Result{}, nil
}
// SetupWithManager registers our controller with the Manager and sets up watches.
func (r *WorkloadMonitorReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
// Watch WorkloadMonitor objects
For(&cozyv1alpha1.WorkloadMonitor{}).
// Also watch Pod objects and map them back to WorkloadMonitor if labels match
Watches(
&corev1.Pod{},
handler.EnqueueRequestsFromMapFunc(mapObjectToMonitor(&corev1.Pod{}, r.Client)),
).
// Watch PVCs as well
Watches(
&corev1.PersistentVolumeClaim{},
handler.EnqueueRequestsFromMapFunc(mapObjectToMonitor(&corev1.PersistentVolumeClaim{}, r.Client)),
).
// Watch for changes to Workload objects we create (owned by WorkloadMonitor)
Owns(&cozyv1alpha1.Workload{}).
Complete(r)
}
func mapObjectToMonitor[T client.Object](_ T, c client.Client) func(ctx context.Context, obj client.Object) []reconcile.Request {
return func(ctx context.Context, obj client.Object) []reconcile.Request {
concrete, ok := obj.(T)
if !ok {
return nil
}
var monitorList cozyv1alpha1.WorkloadMonitorList
// List all WorkloadMonitors in the same namespace
if err := c.List(ctx, &monitorList, client.InNamespace(concrete.GetNamespace())); err != nil {
return nil
}
labels := concrete.GetLabels()
// Match each monitor's selector with the Pod's labels
var requests []reconcile.Request
for _, m := range monitorList.Items {
matches := true
for k, v := range m.Spec.Selector {
if labelVal, exists := labels[k]; !exists || labelVal != v {
matches = false
break
}
}
if matches {
requests = append(requests, reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: m.Namespace,
Name: m.Name,
},
})
}
}
return requests
}
}