package controller

import (
	"context"
	"encoding/json"
	"fmt"
	"sort"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/utils/pointer"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	cozyv1alpha1 "github.com/cozystack/cozystack/api/v1alpha1"
)

// WorkloadMonitorReconciler reconciles a WorkloadMonitor object
type WorkloadMonitorReconciler struct {
	client.Client
	Scheme *runtime.Scheme
}

// +kubebuilder:rbac:groups=cozystack.io,resources=workloadmonitors,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=cozystack.io,resources=workloadmonitors/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=cozystack.io,resources=workloads,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=cozystack.io,resources=workloads/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch
// +kubebuilder:rbac:groups=core,resources=persistentvolumeclaims,verbs=get;list;watch
// +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch

// isServiceReady checks if the Service has an external (LoadBalancer ingress) address bound
func (r *WorkloadMonitorReconciler) isServiceReady(svc *corev1.Service) bool {
	return len(svc.Status.LoadBalancer.Ingress) > 0
}

// isPVCReady checks if the PVC is bound
func (r *WorkloadMonitorReconciler) isPVCReady(pvc *corev1.PersistentVolumeClaim) bool {
	return pvc.Status.Phase == corev1.ClaimBound
}

// isPodReady checks if the Pod is in the Ready condition.
func (r *WorkloadMonitorReconciler) isPodReady(pod *corev1.Pod) bool {
	for _, c := range pod.Status.Conditions {
		if c.Type == corev1.PodReady && c.Status == corev1.ConditionTrue {
			return true
		}
	}
	return false
}

// updateOwnerReferences adds the given monitor as a new owner reference to the object if it is not already present.
// It then sorts the owner references to enforce a consistent order.
func updateOwnerReferences(obj metav1.Object, monitor client.Object) {
	// Retrieve current owner references
	owners := obj.GetOwnerReferences()

	// Check if the current monitor is already among the owner references
	var alreadyOwned bool
	for _, ownerRef := range owners {
		if ownerRef.UID == monitor.GetUID() {
			alreadyOwned = true
			break
		}
	}

	// Without GroupVersionKind information we cannot build a valid owner reference
	runtimeObj, ok := monitor.(runtime.Object)
	if !ok {
		return
	}
	gvk := runtimeObj.GetObjectKind().GroupVersionKind()

	// If not already present, add a new owner reference without the controller flag
	if !alreadyOwned {
		newOwnerRef := metav1.OwnerReference{
			APIVersion: gvk.GroupVersion().String(),
			Kind:       gvk.Kind,
			Name:       monitor.GetName(),
			UID:        monitor.GetUID(),
			// Set Controller to false to avoid conflicts, as multiple controller owners are not allowed
			Controller:         pointer.Bool(false),
			BlockOwnerDeletion: pointer.Bool(true),
		}
		owners = append(owners, newOwnerRef)
	}

	// Sort owner references by UID to enforce a consistent order
	sort.SliceStable(owners, func(i, j int) bool {
		return owners[i].UID < owners[j].UID
	})

	// Update the owner references on the object
	obj.SetOwnerReferences(owners)
}

// reconcileServiceForMonitor creates or updates a Workload object for the given Service and WorkloadMonitor.
func (r *WorkloadMonitorReconciler) reconcileServiceForMonitor(
	ctx context.Context,
	monitor *cozyv1alpha1.WorkloadMonitor,
	svc corev1.Service,
) error {
	logger := log.FromContext(ctx)
	workload := &cozyv1alpha1.Workload{
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("svc-%s", svc.Name),
			Namespace: svc.Namespace,
		},
	}

	resources := make(map[string]resource.Quantity)

	q := resource.MustParse("0")

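	// Count one unit per LoadBalancer ingress IP; the total is reported on the
	// Workload as the "public-ips" resource.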
	for _, ing := range svc.Status.LoadBalancer.Ingress {
		if ing.IP != "" {
			q.Add(resource.MustParse("1"))
		}
	}

	resources["public-ips"] = q

	_, err := ctrl.CreateOrUpdate(ctx, r.Client, workload, func() error {
		// Update owner references with the new monitor
		updateOwnerReferences(workload.GetObjectMeta(), monitor)

		workload.Labels = svc.Labels

		// Fill Workload status fields:
		workload.Status.Kind = monitor.Spec.Kind
		workload.Status.Type = monitor.Spec.Type
		workload.Status.Resources = resources
		workload.Status.Operational = r.isServiceReady(&svc)

		return nil
	})
	if err != nil {
		logger.Error(err, "Failed to CreateOrUpdate Workload", "workload", workload.Name)
		return err
	}

	return nil
}

// reconcilePVCForMonitor creates or updates a Workload object for the given PVC and WorkloadMonitor.
func (r *WorkloadMonitorReconciler) reconcilePVCForMonitor(
	ctx context.Context,
	monitor *cozyv1alpha1.WorkloadMonitor,
	pvc corev1.PersistentVolumeClaim,
) error {
	logger := log.FromContext(ctx)
	workload := &cozyv1alpha1.Workload{
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("pvc-%s", pvc.Name),
			Namespace: pvc.Namespace,
		},
	}

	resources := make(map[string]resource.Quantity)

	// Report the PVC's measured capacity (typically the "storage" resource) on the Workload
	for resourceName, resourceQuantity := range pvc.Status.Capacity {
		resources[resourceName.String()] = resourceQuantity
	}

	_, err := ctrl.CreateOrUpdate(ctx, r.Client, workload, func() error {
		// Update owner references with the new monitor
		updateOwnerReferences(workload.GetObjectMeta(), monitor)

		workload.Labels = pvc.Labels

		// Fill Workload status fields:
		workload.Status.Kind = monitor.Spec.Kind
		workload.Status.Type = monitor.Spec.Type
		workload.Status.Resources = resources
		workload.Status.Operational = r.isPVCReady(&pvc)

		return nil
	})
	if err != nil {
		logger.Error(err, "Failed to CreateOrUpdate Workload", "workload", workload.Name)
		return err
	}

	return nil
}

// reconcilePodForMonitor creates or updates a Workload object for the given Pod and WorkloadMonitor.
func (r *WorkloadMonitorReconciler) reconcilePodForMonitor(
	ctx context.Context,
	monitor *cozyv1alpha1.WorkloadMonitor,
	pod corev1.Pod,
) error {
	logger := log.FromContext(ctx)

	// Combine both init containers and normal containers to sum resources properly
	combinedContainers := append(pod.Spec.InitContainers, pod.Spec.Containers...)

	// totalResources will store the sum of all container resource limits
	totalResources := make(map[string]resource.Quantity)

	// Iterate over all containers to aggregate their Limits
	for _, container := range combinedContainers {
		for name, qty := range container.Resources.Limits {
			if existing, exists := totalResources[name.String()]; exists {
				existing.Add(qty)
				totalResources[name.String()] = existing
			} else {
				totalResources[name.String()] = qty.DeepCopy()
			}
		}
	}

	// If annotation "workload.cozystack.io/resources" is present, parse and merge
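	// The annotation value is a JSON object mapping resource names to quantities,
	// e.g. {"cpu":"2","memory":"4Gi"} (illustrative values). Quantities parsed from
	// the annotation override the summed container limits for the same resource name.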
	if resourcesStr, ok := pod.Annotations["workload.cozystack.io/resources"]; ok {
		annRes := map[string]string{}
		if err := json.Unmarshal([]byte(resourcesStr), &annRes); err != nil {
			logger.Error(err, "Failed to parse resources annotation", "pod", pod.Name)
		} else {
			for k, v := range annRes {
				parsed, err := resource.ParseQuantity(v)
				if err != nil {
					logger.Error(err, "Failed to parse resource quantity from annotation", "key", k, "value", v)
					continue
				}
				totalResources[k] = parsed
			}
		}
	}

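	// Unlike the Service and PVC Workloads (prefixed "svc-"/"pvc-"), the Pod's
	// Workload reuses the Pod name directly.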
	workload := &cozyv1alpha1.Workload{
		ObjectMeta: metav1.ObjectMeta{
			Name:      pod.Name,
			Namespace: pod.Namespace,
		},
	}

	_, err := ctrl.CreateOrUpdate(ctx, r.Client, workload, func() error {
		// Update owner references with the new monitor
		updateOwnerReferences(workload.GetObjectMeta(), monitor)

		// Copy labels from the Pod if needed
		workload.Labels = pod.Labels

		// Fill Workload status fields:
		workload.Status.Kind = monitor.Spec.Kind
		workload.Status.Type = monitor.Spec.Type
		workload.Status.Resources = totalResources
		workload.Status.Operational = r.isPodReady(&pod)

		return nil
	})
	if err != nil {
		logger.Error(err, "Failed to CreateOrUpdate Workload", "workload", workload.Name)
		return err
	}

	return nil
}

// Reconcile is the main reconcile loop.
// 1. It reconciles WorkloadMonitor objects themselves (create/update/delete).
// 2. It also reconciles Pod and PVC events mapped back to a WorkloadMonitor via its label selector.
func (r *WorkloadMonitorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := log.FromContext(ctx)

	// Fetch the WorkloadMonitor object if it exists
	monitor := &cozyv1alpha1.WorkloadMonitor{}
	err := r.Get(ctx, req.NamespacedName, monitor)
	if err != nil {
		// If the resource is not found it has likely been deleted; mapped events
		// pointing at a vanished monitor also end up here.
		if apierrors.IsNotFound(err) {
			return ctrl.Result{}, nil
		}
		logger.Error(err, "Unable to fetch WorkloadMonitor")
		return ctrl.Result{}, err
	}

	// List Pods that match the WorkloadMonitor's selector
	podList := &corev1.PodList{}
	if err := r.List(
		ctx,
		podList,
		client.InNamespace(monitor.Namespace),
		client.MatchingLabels(monitor.Spec.Selector),
	); err != nil {
		logger.Error(err, "Unable to list Pods for WorkloadMonitor", "monitor", monitor.Name)
		return ctrl.Result{}, err
	}

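	// observedReplicas counts every Pod matched by the selector;
	// availableReplicas counts only the Pods that are Ready.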
	var observedReplicas, availableReplicas int32

	// For each matching Pod, reconcile the corresponding Workload
	for _, pod := range podList.Items {
		observedReplicas++
		if err := r.reconcilePodForMonitor(ctx, monitor, pod); err != nil {
			logger.Error(err, "Failed to reconcile Workload for Pod", "pod", pod.Name)
			continue
		}
		if r.isPodReady(&pod) {
			availableReplicas++
		}
	}

	// List PVCs matching the selector and reconcile a Workload for each
	pvcList := &corev1.PersistentVolumeClaimList{}
	if err := r.List(
		ctx,
		pvcList,
		client.InNamespace(monitor.Namespace),
		client.MatchingLabels(monitor.Spec.Selector),
	); err != nil {
		logger.Error(err, "Unable to list PVCs for WorkloadMonitor", "monitor", monitor.Name)
		return ctrl.Result{}, err
	}

	for _, pvc := range pvcList.Items {
		if err := r.reconcilePVCForMonitor(ctx, monitor, pvc); err != nil {
			logger.Error(err, "Failed to reconcile Workload for PVC", "PVC", pvc.Name)
			continue
		}
	}

	// List matching Services and reconcile a Workload for each LoadBalancer Service
	svcList := &corev1.ServiceList{}
	if err := r.List(
		ctx,
		svcList,
		client.InNamespace(monitor.Namespace),
		client.MatchingLabels(monitor.Spec.Selector),
	); err != nil {
		logger.Error(err, "Unable to list Services for WorkloadMonitor", "monitor", monitor.Name)
		return ctrl.Result{}, err
	}

	for _, svc := range svcList.Items {
		if svc.Spec.Type != corev1.ServiceTypeLoadBalancer {
			continue
		}
		if err := r.reconcileServiceForMonitor(ctx, monitor, svc); err != nil {
			logger.Error(err, "Failed to reconcile Workload for Service", "Service", svc.Name)
			continue
		}
	}

	// Update WorkloadMonitor status based on observed Pods
	monitor.Status.ObservedReplicas = observedReplicas
	monitor.Status.AvailableReplicas = availableReplicas

	// Default to operational = true, but honor MinReplicas if it is set
	monitor.Status.Operational = pointer.Bool(true)
	if monitor.Spec.MinReplicas != nil && availableReplicas < *monitor.Spec.MinReplicas {
		monitor.Status.Operational = pointer.Bool(false)
	}

	// Update the WorkloadMonitor status in the cluster
	if err := r.Status().Update(ctx, monitor); err != nil {
		logger.Error(err, "Unable to update WorkloadMonitor status", "monitor", monitor.Name)
		return ctrl.Result{}, err
	}

	// Return without requeue; reconciliation is purely event-driven
	return ctrl.Result{}, nil
}

// SetupWithManager registers our controller with the Manager and sets up watches.
func (r *WorkloadMonitorReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		// Watch WorkloadMonitor objects
		For(&cozyv1alpha1.WorkloadMonitor{}).
		// Also watch Pod objects and map them back to WorkloadMonitor if labels match
		Watches(
			&corev1.Pod{},
			handler.EnqueueRequestsFromMapFunc(mapObjectToMonitor(&corev1.Pod{}, r.Client)),
		).
		// Watch PVCs as well
		Watches(
			&corev1.PersistentVolumeClaim{},
			handler.EnqueueRequestsFromMapFunc(mapObjectToMonitor(&corev1.PersistentVolumeClaim{}, r.Client)),
		).
		// Watch for changes to Workload objects we create (owned by WorkloadMonitor)
		Owns(&cozyv1alpha1.Workload{}).
		Complete(r)
}

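// mapObjectToMonitor returns a map function that, for a watched object (a Pod or
// PVC here), enqueues a reconcile request for every WorkloadMonitor in the same
// namespace whose selector matches the object's labels.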
func mapObjectToMonitor[T client.Object](_ T, c client.Client) func(ctx context.Context, obj client.Object) []reconcile.Request {
	return func(ctx context.Context, obj client.Object) []reconcile.Request {
		concrete, ok := obj.(T)
		if !ok {
			return nil
		}

		var monitorList cozyv1alpha1.WorkloadMonitorList
		// List all WorkloadMonitors in the same namespace
		if err := c.List(ctx, &monitorList, client.InNamespace(concrete.GetNamespace())); err != nil {
			return nil
		}

		labels := concrete.GetLabels()
		// Match each monitor's selector against the object's labels
		var requests []reconcile.Request
		for _, m := range monitorList.Items {
			matches := true
			for k, v := range m.Spec.Selector {
				if labelVal, exists := labels[k]; !exists || labelVal != v {
					matches = false
					break
				}
			}
			if matches {
				requests = append(requests, reconcile.Request{
					NamespacedName: types.NamespacedName{
						Namespace: m.Namespace,
						Name:      m.Name,
					},
				})
			}
		}
		return requests
	}
}