Compare commits

...

1 Commits

Author SHA1 Message Date
Andrei Kvapil
ca61c71084 Replace Ancestor tracking webhook with controller
Signed-off-by: Andrei Kvapil <kvapss@gmail.com>
2025-09-17 12:48:17 +02:00
8 changed files with 603 additions and 343 deletions

View File

@@ -38,7 +38,7 @@ import (
cozystackiov1alpha1 "github.com/cozystack/cozystack/api/v1alpha1"
"github.com/cozystack/cozystack/internal/controller"
lcw "github.com/cozystack/cozystack/internal/lineagecontrollerwebhook"
"github.com/cozystack/cozystack/internal/controller/lineagelabeler"
"github.com/cozystack/cozystack/internal/telemetry"
helmv2 "github.com/fluxcd/helm-controller/api/v2"
@@ -68,6 +68,7 @@ func main() {
var telemetryEndpoint string
var telemetryInterval string
var cozystackVersion string
var watchResources string
var tlsOpts []func(*tls.Config)
flag.StringVar(&metricsAddr, "metrics-bind-address", "0", "The address the metrics endpoint binds to. "+
"Use :8443 for HTTPS or :8080 for HTTP, or leave as 0 to disable the metrics service.")
@@ -87,6 +88,9 @@ func main() {
"Interval between telemetry data collection (e.g. 15m, 1h)")
flag.StringVar(&cozystackVersion, "cozystack-version", "unknown",
"Version of Cozystack")
flag.StringVar(&watchResources, "watch-resources",
"v1/Pod,v1/Service,v1/Secret,v1/PersistentVolumeClaim",
"Comma-separated list of resources to watch in the form 'group/version/Kind'.")
opts := zap.Options{
Development: false,
}
@@ -215,17 +219,12 @@ func main() {
os.Exit(1)
}
// special one that's both a webhook and a reconciler
lineageControllerWebhook := &lcw.LineageControllerWebhook{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}
if err := lineageControllerWebhook.SetupWithManagerAsController(mgr); err != nil {
setupLog.Error(err, "unable to setup controller", "controller", "LineageController")
os.Exit(1)
}
if err := lineageControllerWebhook.SetupWithManagerAsWebhook(mgr); err != nil {
setupLog.Error(err, "unable to setup webhook", "webhook", "LineageWebhook")
if err := (&lineagelabeler.LineageLabelerReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
WatchResourceCSV: watchResources,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "LineageLabeler")
os.Exit(1)
}

View File

@@ -2,14 +2,25 @@ package controller
import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"sort"
"sync"
"time"
"github.com/cozystack/cozystack/internal/shared/crdmem"
cozyv1alpha1 "github.com/cozystack/cozystack/api/v1alpha1"
"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
@@ -20,85 +31,55 @@ type CozystackResourceDefinitionReconciler struct {
client.Client
Scheme *runtime.Scheme
// Configurable debounce duration
Debounce time.Duration
// Internal state for debouncing
mu sync.Mutex
lastEvent time.Time // Time of last CRUD event on CozystackResourceDefinition
lastHandled time.Time // Last time the Deployment was actually restarted
lastEvent time.Time
lastHandled time.Time
mem *crdmem.Memory
}
// Reconcile handles the logic to restart the target Deployment only once,
// even if multiple events occur close together
func (r *CozystackResourceDefinitionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := log.FromContext(ctx)
logger := log.FromContext(ctx)
// Only respond to our target deployment
if req.Namespace != "cozy-system" || req.Name != "cozystack-api" {
crd := &cozyv1alpha1.CozystackResourceDefinition{}
err := r.Get(ctx, types.NamespacedName{Name: req.Name}, crd)
if err == nil {
if r.mem != nil {
r.mem.Upsert(crd)
}
r.mu.Lock()
r.lastEvent = time.Now()
r.mu.Unlock()
return ctrl.Result{}, nil
}
r.mu.Lock()
le := r.lastEvent
lh := r.lastHandled
debounce := r.Debounce
r.mu.Unlock()
if debounce <= 0 {
debounce = 5 * time.Second
}
// No events received yet — nothing to do
if le.IsZero() {
return ctrl.Result{}, nil
}
// Wait until the debounce duration has passed since the last event
if d := time.Since(le); d < debounce {
return ctrl.Result{RequeueAfter: debounce - d}, nil
}
// Already handled this event — skip restart
if !lh.Before(le) {
return ctrl.Result{}, nil
}
// Perform the restart by patching the deployment annotation
deploy := &appsv1.Deployment{}
if err := r.Get(ctx, types.NamespacedName{Namespace: "cozy-system", Name: "cozystack-api"}, deploy); err != nil {
log.Error(err, "Failed to get Deployment cozy-system/cozystack-api")
return ctrl.Result{}, client.IgnoreNotFound(err)
}
patch := client.MergeFrom(deploy.DeepCopy())
if deploy.Spec.Template.Annotations == nil {
deploy.Spec.Template.Annotations = make(map[string]string)
}
deploy.Spec.Template.Annotations["kubectl.kubernetes.io/restartedAt"] = time.Now().Format(time.RFC3339)
if err := r.Patch(ctx, deploy, patch); err != nil {
log.Error(err, "Failed to patch Deployment annotation")
if err != nil && !apierrors.IsNotFound(err) {
return ctrl.Result{}, err
}
// Mark this event as handled
r.mu.Lock()
r.lastHandled = le
r.mu.Unlock()
log.Info("Deployment cozy-system/cozystack-api successfully restarted")
if apierrors.IsNotFound(err) && r.mem != nil {
r.mem.Delete(req.Name)
}
if req.Namespace == "cozy-system" && req.Name == "cozystack-api" {
return r.debouncedRestart(ctx, logger)
}
return ctrl.Result{}, nil
}
// SetupWithManager configures how the controller listens to events
func (r *CozystackResourceDefinitionReconciler) SetupWithManager(mgr ctrl.Manager) error {
if r.Debounce == 0 {
r.Debounce = 5 * time.Second
}
if r.mem == nil {
r.mem = crdmem.Global()
}
if err := r.mem.EnsurePrimingWithManager(mgr); err != nil {
return err
}
return ctrl.NewControllerManagedBy(mgr).
Named("cozystack-restart-controller").
Named("cozystackresource-controller").
For(&cozyv1alpha1.CozystackResourceDefinition{}, builder.WithPredicates()).
Watches(
&cozyv1alpha1.CozystackResourceDefinition{},
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, obj client.Object) []reconcile.Request {
@@ -115,3 +96,88 @@ func (r *CozystackResourceDefinitionReconciler) SetupWithManager(mgr ctrl.Manage
).
Complete(r)
}
type crdHashView struct {
Name string `json:"name"`
Spec cozyv1alpha1.CozystackResourceDefinitionSpec `json:"spec"`
}
func (r *CozystackResourceDefinitionReconciler) computeConfigHash() (string, error) {
if r.mem == nil {
return "", nil
}
snapshot := r.mem.Snapshot()
sort.Slice(snapshot, func(i, j int) bool { return snapshot[i].Name < snapshot[j].Name })
views := make([]crdHashView, 0, len(snapshot))
for i := range snapshot {
views = append(views, crdHashView{
Name: snapshot[i].Name,
Spec: snapshot[i].Spec,
})
}
b, err := json.Marshal(views)
if err != nil {
return "", err
}
sum := sha256.Sum256(b)
return hex.EncodeToString(sum[:]), nil
}
func (r *CozystackResourceDefinitionReconciler) debouncedRestart(ctx context.Context, logger logr.Logger) (ctrl.Result, error) {
r.mu.Lock()
le := r.lastEvent
lh := r.lastHandled
debounce := r.Debounce
r.mu.Unlock()
if debounce <= 0 {
debounce = 5 * time.Second
}
if le.IsZero() {
return ctrl.Result{}, nil
}
if d := time.Since(le); d < debounce {
return ctrl.Result{RequeueAfter: debounce - d}, nil
}
if !lh.Before(le) {
return ctrl.Result{}, nil
}
newHash, err := r.computeConfigHash()
if err != nil {
return ctrl.Result{}, err
}
deploy := &appsv1.Deployment{}
if err := r.Get(ctx, types.NamespacedName{Namespace: "cozy-system", Name: "cozystack-api"}, deploy); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
if deploy.Spec.Template.Annotations == nil {
deploy.Spec.Template.Annotations = map[string]string{}
}
oldHash := deploy.Spec.Template.Annotations["cozystack.io/config-hash"]
if oldHash == newHash && oldHash != "" {
r.mu.Lock()
r.lastHandled = le
r.mu.Unlock()
logger.Info("No changes in CRD config; skipping restart", "hash", newHash)
return ctrl.Result{}, nil
}
patch := client.MergeFrom(deploy.DeepCopy())
deploy.Spec.Template.Annotations["cozystack.io/config-hash"] = newHash
if err := r.Patch(ctx, deploy, patch); err != nil {
return ctrl.Result{}, err
}
r.mu.Lock()
r.lastHandled = le
r.mu.Unlock()
logger.Info("Updated cozystack-api podTemplate config-hash; rollout triggered",
"old", oldHash, "new", newHash)
return ctrl.Result{}, nil
}

View File

@@ -0,0 +1,367 @@
package lineagelabeler
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
cozyv1alpha1 "github.com/cozystack/cozystack/api/v1alpha1"
"github.com/cozystack/cozystack/internal/shared/crdmem"
"github.com/cozystack/cozystack/pkg/lineage"
helmv2 "github.com/fluxcd/helm-controller/api/v2"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
"k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
var ErrNoAncestors = errors.New("no ancestors")
type LineageLabelerReconciler struct {
client.Client
Scheme *runtime.Scheme
WatchResourceCSV string
dynClient dynamic.Interface
mapper meta.RESTMapper
appMap atomic.Value
once sync.Once
mem *crdmem.Memory
}
type chartRef struct{ repo, chart string }
type appRef struct{ groupVersion, kind, prefix string }
func (r *LineageLabelerReconciler) initMapping() {
r.once.Do(func() {
r.appMap.Store(make(map[chartRef]appRef))
})
}
func (r *LineageLabelerReconciler) currentMap() map[chartRef]appRef {
val := r.appMap.Load()
if val == nil {
return map[chartRef]appRef{}
}
return val.(map[chartRef]appRef)
}
func (r *LineageLabelerReconciler) Map(hr *helmv2.HelmRelease) (string, string, string, error) {
cfg := r.currentMap()
s := hr.Spec.Chart.Spec
key := chartRef{s.SourceRef.Name, s.Chart}
if v, ok := cfg[key]; ok {
return v.groupVersion, v.kind, v.prefix, nil
}
return "", "", "", fmt.Errorf("cannot map helm release %s/%s to dynamic app", hr.Namespace, hr.Name)
}
func parseGVKList(csv string) ([]schema.GroupVersionKind, error) {
csv = strings.TrimSpace(csv)
if csv == "" {
return nil, fmt.Errorf("watch resource list is empty")
}
parts := strings.Split(csv, ",")
out := make([]schema.GroupVersionKind, 0, len(parts))
for _, p := range parts {
p = strings.TrimSpace(p)
s := strings.Split(p, "/")
if len(s) == 2 {
out = append(out, schema.GroupVersionKind{Group: "", Version: s[0], Kind: s[1]})
continue
}
if len(s) == 3 {
out = append(out, schema.GroupVersionKind{Group: s[0], Version: s[1], Kind: s[2]})
continue
}
return nil, fmt.Errorf("invalid resource token %q, expected 'group/version/Kind' or 'v1/Kind'", p)
}
return out, nil
}
func (r *LineageLabelerReconciler) SetupWithManager(mgr ctrl.Manager) error {
r.initMapping()
cfg := rest.CopyConfig(mgr.GetConfig())
dc, err := dynamic.NewForConfig(cfg)
if err != nil {
return err
}
disco, err := discovery.NewDiscoveryClientForConfig(cfg)
if err != nil {
return err
}
cached := memory.NewMemCacheClient(disco)
r.dynClient = dc
r.mapper = restmapper.NewDeferredDiscoveryRESTMapper(cached)
if r.mem == nil {
r.mem = crdmem.Global()
}
if err := r.mem.EnsurePrimingWithManager(mgr); err != nil {
return err
}
gvks, err := parseGVKList(r.WatchResourceCSV)
if err != nil {
return err
}
if len(gvks) == 0 {
return fmt.Errorf("no resources to watch")
}
b := ctrl.NewControllerManagedBy(mgr).Named("lineage-labeler")
nsPred := predicate.NewPredicateFuncs(func(obj client.Object) bool {
ns := obj.GetNamespace()
return ns != "" && strings.HasPrefix(ns, "tenant-")
})
primary := gvks[0]
primaryObj := &unstructured.Unstructured{}
primaryObj.SetGroupVersionKind(primary)
b = b.For(primaryObj,
builder.WithPredicates(
predicate.And(
nsPred,
predicate.Or(
predicate.GenerationChangedPredicate{},
predicate.ResourceVersionChangedPredicate{},
),
),
),
)
for _, gvk := range gvks[1:] {
u := &unstructured.Unstructured{}
u.SetGroupVersionKind(gvk)
b = b.Watches(u,
&handler.EnqueueRequestForObject{},
builder.WithPredicates(
predicate.And(
nsPred,
predicate.Or(
predicate.GenerationChangedPredicate{},
predicate.ResourceVersionChangedPredicate{},
),
),
),
)
}
b = b.Watches(
&cozyv1alpha1.CozystackResourceDefinition{},
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, obj client.Object) []reconcile.Request {
_ = r.refreshAppMap(ctx)
return nil
}),
)
_ = r.refreshAppMap(context.Background())
return b.Complete(r)
}
func (r *LineageLabelerReconciler) refreshAppMap(ctx context.Context) error {
var items []cozyv1alpha1.CozystackResourceDefinition
var err error
if r.mem != nil {
items, err = r.mem.ListFromCacheOrAPI(ctx, r.Client)
} else {
var list cozyv1alpha1.CozystackResourceDefinitionList
err = r.Client.List(ctx, &list)
items = list.Items
}
if err != nil {
return err
}
newMap := make(map[chartRef]appRef, len(items))
for _, crd := range items {
k := chartRef{
repo: crd.Spec.Release.Chart.SourceRef.Name,
chart: crd.Spec.Release.Chart.Name,
}
v := appRef{
groupVersion: "apps.cozystack.io/v1alpha1",
kind: crd.Spec.Application.Kind,
prefix: crd.Spec.Release.Prefix,
}
if _, exists := newMap[k]; exists {
continue
}
newMap[k] = v
}
r.appMap.Store(newMap)
return nil
}
func (r *LineageLabelerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
l := log.FromContext(ctx)
if req.Namespace == "" || !strings.HasPrefix(req.Namespace, "tenant-") {
return ctrl.Result{}, nil
}
if len(r.currentMap()) == 0 {
_ = r.refreshAppMap(ctx)
if len(r.currentMap()) == 0 {
return ctrl.Result{RequeueAfter: 2 * time.Second}, nil
}
}
gvks, err := parseGVKList(r.WatchResourceCSV)
if err != nil {
return ctrl.Result{}, err
}
var obj *unstructured.Unstructured
found := false
for _, gvk := range gvks {
mapping, mErr := r.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if mErr != nil {
continue
}
ns := req.Namespace
if mapping.Scope.Name() != meta.RESTScopeNameNamespace {
ns = ""
}
res, gErr := r.dynClient.Resource(mapping.Resource).Namespace(ns).Get(ctx, req.Name, metav1.GetOptions{})
if gErr != nil {
if apierrors.IsNotFound(gErr) {
continue
}
continue
}
obj = res
found = true
break
}
if !found || obj == nil {
return ctrl.Result{}, nil
}
existing := obj.GetLabels()
if existing == nil {
existing = map[string]string{}
}
keys := []string{
"apps.cozystack.io/application.group",
"apps.cozystack.io/application.kind",
"apps.cozystack.io/application.name",
}
allPresent := true
for _, k := range keys {
if _, ok := existing[k]; !ok {
allPresent = false
break
}
}
if allPresent {
return ctrl.Result{}, nil
}
labels, warn, err := r.computeLabels(ctx, obj)
if err != nil {
if errors.Is(err, ErrNoAncestors) {
return ctrl.Result{}, nil
}
return ctrl.Result{}, client.IgnoreNotFound(err)
}
if warn != "" {
l.V(1).Info("lineage ambiguous; using first ancestor", "name", req.NamespacedName)
}
for k, v := range labels {
existing[k] = v
}
obj.SetLabels(existing)
// Server-Side Apply: claim ownership of our label keys
gvk := obj.GroupVersionKind()
patch := &unstructured.Unstructured{}
patch.SetGroupVersionKind(gvk)
patch.SetNamespace(obj.GetNamespace())
patch.SetName(obj.GetName())
patch.SetLabels(map[string]string{
"apps.cozystack.io/application.group": existing["apps.cozystack.io/application.group"],
"apps.cozystack.io/application.kind": existing["apps.cozystack.io/application.kind"],
"apps.cozystack.io/application.name": existing["apps.cozystack.io/application.name"],
})
// Use controller-runtime client with Apply patch type and field owner
if err := r.Patch(ctx, patch,
client.Apply,
client.FieldOwner("cozystack/lineage"),
client.ForceOwnership(false),
); err != nil {
if apierrors.IsConflict(err) {
return ctrl.Result{RequeueAfter: 500 * time.Millisecond}, nil
}
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
func (r *LineageLabelerReconciler) computeLabels(ctx context.Context, o *unstructured.Unstructured) (map[string]string, string, error) {
owners := lineage.WalkOwnershipGraph(ctx, r.dynClient, r.mapper, r, o)
if len(owners) == 0 {
return nil, "", ErrNoAncestors
}
obj, err := owners[0].GetUnstructured(ctx, r.dynClient, r.mapper)
if err != nil {
return nil, "", err
}
gv, err := schema.ParseGroupVersion(obj.GetAPIVersion())
if err != nil {
return nil, "", fmt.Errorf("invalid APIVersion %s: %w", obj.GetAPIVersion(), err)
}
var warn string
if len(owners) > 1 {
warn = "ambiguous"
}
group := gv.Group
if len(group) > 63 {
group = trimDNSLabel(group[:63])
}
return map[string]string{
"apps.cozystack.io/application.group": group,
"apps.cozystack.io/application.kind": obj.GetKind(),
"apps.cozystack.io/application.name": obj.GetName(),
}, warn, nil
}
func trimDNSLabel(s string) string {
for len(s) > 0 {
b := s[len(s)-1]
if (b >= 'a' && b <= 'z') || (b >= 'A' && b <= 'Z') || (b >= '0' && b <= '9') {
return s
}
s = s[:len(s)-1]
}
return s
}

View File

@@ -1,40 +0,0 @@
package lineagecontrollerwebhook
import (
"fmt"
helmv2 "github.com/fluxcd/helm-controller/api/v2"
)
type chartRef struct {
repo string
chart string
}
type appRef struct {
groupVersion string
kind string
prefix string
}
type runtimeConfig struct {
chartAppMap map[chartRef]appRef
}
func (l *LineageControllerWebhook) initConfig() {
l.initOnce.Do(func() {
if l.config.Load() == nil {
l.config.Store(&runtimeConfig{chartAppMap: make(map[chartRef]appRef)})
}
})
}
func (l *LineageControllerWebhook) Map(hr *helmv2.HelmRelease) (string, string, string, error) {
cfg := l.config.Load().(*runtimeConfig).chartAppMap
s := &hr.Spec.Chart.Spec
val, ok := cfg[chartRef{s.SourceRef.Name, s.Chart}]
if !ok {
return "", "", "", fmt.Errorf("cannot map helm release %s/%s to dynamic app", hr.Namespace, hr.Name)
}
return val.groupVersion, val.kind, val.prefix, nil
}

View File

@@ -1,42 +0,0 @@
package lineagecontrollerwebhook
import (
"context"
cozyv1alpha1 "github.com/cozystack/cozystack/api/v1alpha1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
)
// +kubebuilder:rbac:groups=cozystack.io,resources=cozystackresourcedefinitions,verbs=list;watch
func (c *LineageControllerWebhook) SetupWithManagerAsController(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&cozyv1alpha1.CozystackResourceDefinition{}).
Complete(c)
}
func (c *LineageControllerWebhook) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
l := log.FromContext(ctx)
crds := &cozyv1alpha1.CozystackResourceDefinitionList{}
if err := c.List(ctx, crds, &client.ListOptions{Namespace: "cozy-system"}); err != nil {
l.Error(err, "failed reading CozystackResourceDefinitions")
return ctrl.Result{}, err
}
newConfig := make(map[chartRef]appRef)
for _, crd := range crds.Items {
k := chartRef{
crd.Spec.Release.Chart.SourceRef.Name,
crd.Spec.Release.Chart.Name,
}
newRef := appRef{"apps.cozystack.io/v1alpha1", crd.Spec.Application.Kind, crd.Spec.Release.Prefix}
if oldRef, exists := newConfig[k]; exists {
l.Info("duplicate chart mapping detected; ignoring subsequent entry", "key", k, "retained value", oldRef, "ignored value", newRef)
continue
}
newConfig[k] = newRef
}
c.config.Store(&runtimeConfig{newConfig})
return ctrl.Result{}, nil
}

View File

@@ -1,23 +0,0 @@
package lineagecontrollerwebhook
import (
"sync"
"sync/atomic"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/dynamic"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)
// +kubebuilder:webhook:path=/mutate-lineage,mutating=true,failurePolicy=Fail,sideEffects=None,groups="",resources=pods,secrets,services,persistentvolumeclaims,verbs=create;update,versions=v1,name=mlineage.cozystack.io,admissionReviewVersions={v1}
type LineageControllerWebhook struct {
client.Client
Scheme *runtime.Scheme
decoder admission.Decoder
dynClient dynamic.Interface
mapper meta.RESTMapper
config atomic.Value
initOnce sync.Once
}

View File

@@ -1,166 +0,0 @@
package lineagecontrollerwebhook
import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/cozystack/cozystack/pkg/lineage"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
"k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)
var (
NoAncestors = fmt.Errorf("no managed apps found in lineage")
AncestryAmbiguous = fmt.Errorf("object ancestry is ambiguous")
)
// SetupWithManager registers the handler with the webhook server.
func (h *LineageControllerWebhook) SetupWithManagerAsWebhook(mgr ctrl.Manager) error {
cfg := rest.CopyConfig(mgr.GetConfig())
var err error
h.dynClient, err = dynamic.NewForConfig(cfg)
if err != nil {
return err
}
discoClient, err := discovery.NewDiscoveryClientForConfig(cfg)
if err != nil {
return err
}
cachedDisco := memory.NewMemCacheClient(discoClient)
h.mapper = restmapper.NewDeferredDiscoveryRESTMapper(cachedDisco)
h.initConfig()
// Register HTTP path -> handler.
mgr.GetWebhookServer().Register("/mutate-lineage", &admission.Webhook{Handler: h})
return nil
}
// InjectDecoder lets controller-runtime give us a decoder for AdmissionReview requests.
func (h *LineageControllerWebhook) InjectDecoder(d admission.Decoder) error {
h.decoder = d
return nil
}
// Handle is called for each AdmissionReview that matches the webhook config.
func (h *LineageControllerWebhook) Handle(ctx context.Context, req admission.Request) admission.Response {
logger := log.FromContext(ctx).WithValues(
"gvk", req.Kind.String(),
"namespace", req.Namespace,
"name", req.Name,
"operation", req.Operation,
)
warn := make(admission.Warnings, 0)
obj := &unstructured.Unstructured{}
if err := h.decodeUnstructured(req, obj); err != nil {
return admission.Errored(400, fmt.Errorf("decode object: %w", err))
}
labels, err := h.computeLabels(ctx, obj)
for {
if err != nil && errors.Is(err, NoAncestors) {
return admission.Allowed("object not managed by app")
}
if err != nil && errors.Is(err, AncestryAmbiguous) {
warn = append(warn, "object ancestry ambiguous, using first ancestor found")
break
}
if err != nil {
return admission.Errored(500, fmt.Errorf("error computing lineage labels: %w", err))
}
if err == nil {
break
}
}
h.applyLabels(obj, labels)
mutated, err := json.Marshal(obj)
if err != nil {
return admission.Errored(500, fmt.Errorf("marshal mutated pod: %w", err))
}
logger.V(1).Info("mutated pod", "namespace", obj.GetNamespace(), "name", obj.GetName())
return admission.PatchResponseFromRaw(req.Object.Raw, mutated).WithWarnings(warn...)
}
func (h *LineageControllerWebhook) computeLabels(ctx context.Context, o *unstructured.Unstructured) (map[string]string, error) {
owners := lineage.WalkOwnershipGraph(ctx, h.dynClient, h.mapper, h, o)
if len(owners) == 0 {
return nil, NoAncestors
}
obj, err := owners[0].GetUnstructured(ctx, h.dynClient, h.mapper)
if err != nil {
return nil, err
}
gv, err := schema.ParseGroupVersion(obj.GetAPIVersion())
if err != nil {
// should never happen, we got an APIVersion right from the API
return nil, fmt.Errorf("could not parse APIVersion %s to a group and version: %w", obj.GetAPIVersion(), err)
}
if len(owners) > 1 {
err = AncestryAmbiguous
}
return map[string]string{
// truncate apigroup to first 63 chars
"apps.cozystack.io/application.group": func(s string) string {
if len(s) < 63 {
return s
}
s = s[:63]
for b := s[62]; !((b >= 'a' && b <= 'z') || (b >= 'A' && b <= 'Z') || (b >= '0' && b <= '9')); s = s[:len(s)-1] {
b = s[len(s)-1]
}
return s
}(gv.Group),
"apps.cozystack.io/application.kind": obj.GetKind(),
"apps.cozystack.io/application.name": obj.GetName(),
}, err
}
func (h *LineageControllerWebhook) applyLabels(o client.Object, labels map[string]string) {
existing := o.GetLabels()
if existing == nil {
existing = make(map[string]string)
}
for k, v := range labels {
existing[k] = v
}
o.SetLabels(existing)
}
func (h *LineageControllerWebhook) decodeUnstructured(req admission.Request, out *unstructured.Unstructured) error {
if h.decoder != nil {
if err := h.decoder.Decode(req, out); err == nil {
return nil
}
if req.Kind.Group != "" || req.Kind.Kind != "" || req.Kind.Version != "" {
out.SetGroupVersionKind(schema.GroupVersionKind{
Group: req.Kind.Group,
Version: req.Kind.Version,
Kind: req.Kind.Kind,
})
if err := h.decoder.Decode(req, out); err == nil {
return nil
}
}
}
if len(req.Object.Raw) == 0 {
return errors.New("empty admission object")
}
return json.Unmarshal(req.Object.Raw, &out.Object)
}

View File

@@ -0,0 +1,99 @@
package crdmem
import (
"context"
"sync"
cozyv1alpha1 "github.com/cozystack/cozystack/api/v1alpha1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)
type Memory struct {
mu sync.RWMutex
data map[string]cozyv1alpha1.CozystackResourceDefinition
primed bool
primeOnce sync.Once
}
func New() *Memory {
return &Memory{data: make(map[string]cozyv1alpha1.CozystackResourceDefinition)}
}
var (
global *Memory
globalOnce sync.Once
)
func Global() *Memory {
globalOnce.Do(func() { global = New() })
return global
}
func (m *Memory) Upsert(obj *cozyv1alpha1.CozystackResourceDefinition) {
if obj == nil {
return
}
m.mu.Lock()
m.data[obj.Name] = *obj.DeepCopy()
m.mu.Unlock()
}
func (m *Memory) Delete(name string) {
m.mu.Lock()
delete(m.data, name)
m.mu.Unlock()
}
func (m *Memory) Snapshot() []cozyv1alpha1.CozystackResourceDefinition {
m.mu.RLock()
defer m.mu.RUnlock()
out := make([]cozyv1alpha1.CozystackResourceDefinition, 0, len(m.data))
for _, v := range m.data {
out = append(out, v)
}
return out
}
func (m *Memory) IsPrimed() bool {
m.mu.RLock()
defer m.mu.RUnlock()
return m.primed
}
type runnable func(context.Context) error
func (r runnable) Start(ctx context.Context) error { return r(ctx) }
func (m *Memory) EnsurePrimingWithManager(mgr ctrl.Manager) error {
var errOut error
m.primeOnce.Do(func() {
errOut = mgr.Add(runnable(func(ctx context.Context) error {
if ok := mgr.GetCache().WaitForCacheSync(ctx); !ok {
return nil
}
var list cozyv1alpha1.CozystackResourceDefinitionList
if err := mgr.GetClient().List(ctx, &list); err == nil {
for i := range list.Items {
m.Upsert(&list.Items[i])
}
m.mu.Lock()
m.primed = true
m.mu.Unlock()
}
return nil
}))
})
return errOut
}
func (m *Memory) ListFromCacheOrAPI(ctx context.Context, c client.Client) ([]cozyv1alpha1.CozystackResourceDefinition, error) {
if m.IsPrimed() {
return m.Snapshot(), nil
}
var list cozyv1alpha1.CozystackResourceDefinitionList
if err := c.List(ctx, &list); err != nil {
return nil, err
}
return list.Items, nil
}