mirror of
https://github.com/cozystack/cozystack.git
synced 2026-03-03 13:38:56 +00:00
Compare commits
1 Commits
v0.37.3
...
lineage-co
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ca61c71084 |
@@ -38,7 +38,7 @@ import (
|
||||
|
||||
cozystackiov1alpha1 "github.com/cozystack/cozystack/api/v1alpha1"
|
||||
"github.com/cozystack/cozystack/internal/controller"
|
||||
lcw "github.com/cozystack/cozystack/internal/lineagecontrollerwebhook"
|
||||
"github.com/cozystack/cozystack/internal/controller/lineagelabeler"
|
||||
"github.com/cozystack/cozystack/internal/telemetry"
|
||||
|
||||
helmv2 "github.com/fluxcd/helm-controller/api/v2"
|
||||
@@ -68,6 +68,7 @@ func main() {
|
||||
var telemetryEndpoint string
|
||||
var telemetryInterval string
|
||||
var cozystackVersion string
|
||||
var watchResources string
|
||||
var tlsOpts []func(*tls.Config)
|
||||
flag.StringVar(&metricsAddr, "metrics-bind-address", "0", "The address the metrics endpoint binds to. "+
|
||||
"Use :8443 for HTTPS or :8080 for HTTP, or leave as 0 to disable the metrics service.")
|
||||
@@ -87,6 +88,9 @@ func main() {
|
||||
"Interval between telemetry data collection (e.g. 15m, 1h)")
|
||||
flag.StringVar(&cozystackVersion, "cozystack-version", "unknown",
|
||||
"Version of Cozystack")
|
||||
flag.StringVar(&watchResources, "watch-resources",
|
||||
"v1/Pod,v1/Service,v1/Secret,v1/PersistentVolumeClaim",
|
||||
"Comma-separated list of resources to watch in the form 'group/version/Kind'.")
|
||||
opts := zap.Options{
|
||||
Development: false,
|
||||
}
|
||||
@@ -215,17 +219,12 @@ func main() {
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// special one that's both a webhook and a reconciler
|
||||
lineageControllerWebhook := &lcw.LineageControllerWebhook{
|
||||
Client: mgr.GetClient(),
|
||||
Scheme: mgr.GetScheme(),
|
||||
}
|
||||
if err := lineageControllerWebhook.SetupWithManagerAsController(mgr); err != nil {
|
||||
setupLog.Error(err, "unable to setup controller", "controller", "LineageController")
|
||||
os.Exit(1)
|
||||
}
|
||||
if err := lineageControllerWebhook.SetupWithManagerAsWebhook(mgr); err != nil {
|
||||
setupLog.Error(err, "unable to setup webhook", "webhook", "LineageWebhook")
|
||||
if err := (&lineagelabeler.LineageLabelerReconciler{
|
||||
Client: mgr.GetClient(),
|
||||
Scheme: mgr.GetScheme(),
|
||||
WatchResourceCSV: watchResources,
|
||||
}).SetupWithManager(mgr); err != nil {
|
||||
setupLog.Error(err, "unable to create controller", "controller", "LineageLabeler")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
|
||||
@@ -2,14 +2,25 @@ package controller
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cozystack/cozystack/internal/shared/crdmem"
|
||||
|
||||
cozyv1alpha1 "github.com/cozystack/cozystack/api/v1alpha1"
|
||||
|
||||
"github.com/go-logr/logr"
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/builder"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/handler"
|
||||
"sigs.k8s.io/controller-runtime/pkg/log"
|
||||
@@ -20,85 +31,55 @@ type CozystackResourceDefinitionReconciler struct {
|
||||
client.Client
|
||||
Scheme *runtime.Scheme
|
||||
|
||||
// Configurable debounce duration
|
||||
Debounce time.Duration
|
||||
|
||||
// Internal state for debouncing
|
||||
mu sync.Mutex
|
||||
lastEvent time.Time // Time of last CRUD event on CozystackResourceDefinition
|
||||
lastHandled time.Time // Last time the Deployment was actually restarted
|
||||
lastEvent time.Time
|
||||
lastHandled time.Time
|
||||
|
||||
mem *crdmem.Memory
|
||||
}
|
||||
|
||||
// Reconcile handles the logic to restart the target Deployment only once,
|
||||
// even if multiple events occur close together
|
||||
func (r *CozystackResourceDefinitionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
|
||||
log := log.FromContext(ctx)
|
||||
logger := log.FromContext(ctx)
|
||||
|
||||
// Only respond to our target deployment
|
||||
if req.Namespace != "cozy-system" || req.Name != "cozystack-api" {
|
||||
crd := &cozyv1alpha1.CozystackResourceDefinition{}
|
||||
err := r.Get(ctx, types.NamespacedName{Name: req.Name}, crd)
|
||||
if err == nil {
|
||||
if r.mem != nil {
|
||||
r.mem.Upsert(crd)
|
||||
}
|
||||
|
||||
r.mu.Lock()
|
||||
r.lastEvent = time.Now()
|
||||
r.mu.Unlock()
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
r.mu.Lock()
|
||||
le := r.lastEvent
|
||||
lh := r.lastHandled
|
||||
debounce := r.Debounce
|
||||
r.mu.Unlock()
|
||||
|
||||
if debounce <= 0 {
|
||||
debounce = 5 * time.Second
|
||||
}
|
||||
|
||||
// No events received yet — nothing to do
|
||||
if le.IsZero() {
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
// Wait until the debounce duration has passed since the last event
|
||||
if d := time.Since(le); d < debounce {
|
||||
return ctrl.Result{RequeueAfter: debounce - d}, nil
|
||||
}
|
||||
|
||||
// Already handled this event — skip restart
|
||||
if !lh.Before(le) {
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
// Perform the restart by patching the deployment annotation
|
||||
deploy := &appsv1.Deployment{}
|
||||
if err := r.Get(ctx, types.NamespacedName{Namespace: "cozy-system", Name: "cozystack-api"}, deploy); err != nil {
|
||||
log.Error(err, "Failed to get Deployment cozy-system/cozystack-api")
|
||||
return ctrl.Result{}, client.IgnoreNotFound(err)
|
||||
}
|
||||
|
||||
patch := client.MergeFrom(deploy.DeepCopy())
|
||||
if deploy.Spec.Template.Annotations == nil {
|
||||
deploy.Spec.Template.Annotations = make(map[string]string)
|
||||
}
|
||||
deploy.Spec.Template.Annotations["kubectl.kubernetes.io/restartedAt"] = time.Now().Format(time.RFC3339)
|
||||
|
||||
if err := r.Patch(ctx, deploy, patch); err != nil {
|
||||
log.Error(err, "Failed to patch Deployment annotation")
|
||||
if err != nil && !apierrors.IsNotFound(err) {
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
// Mark this event as handled
|
||||
r.mu.Lock()
|
||||
r.lastHandled = le
|
||||
r.mu.Unlock()
|
||||
|
||||
log.Info("Deployment cozy-system/cozystack-api successfully restarted")
|
||||
if apierrors.IsNotFound(err) && r.mem != nil {
|
||||
r.mem.Delete(req.Name)
|
||||
}
|
||||
if req.Namespace == "cozy-system" && req.Name == "cozystack-api" {
|
||||
return r.debouncedRestart(ctx, logger)
|
||||
}
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
// SetupWithManager configures how the controller listens to events
|
||||
func (r *CozystackResourceDefinitionReconciler) SetupWithManager(mgr ctrl.Manager) error {
|
||||
if r.Debounce == 0 {
|
||||
r.Debounce = 5 * time.Second
|
||||
}
|
||||
|
||||
if r.mem == nil {
|
||||
r.mem = crdmem.Global()
|
||||
}
|
||||
if err := r.mem.EnsurePrimingWithManager(mgr); err != nil {
|
||||
return err
|
||||
}
|
||||
return ctrl.NewControllerManagedBy(mgr).
|
||||
Named("cozystack-restart-controller").
|
||||
Named("cozystackresource-controller").
|
||||
For(&cozyv1alpha1.CozystackResourceDefinition{}, builder.WithPredicates()).
|
||||
Watches(
|
||||
&cozyv1alpha1.CozystackResourceDefinition{},
|
||||
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, obj client.Object) []reconcile.Request {
|
||||
@@ -115,3 +96,88 @@ func (r *CozystackResourceDefinitionReconciler) SetupWithManager(mgr ctrl.Manage
|
||||
).
|
||||
Complete(r)
|
||||
}
|
||||
|
||||
type crdHashView struct {
|
||||
Name string `json:"name"`
|
||||
Spec cozyv1alpha1.CozystackResourceDefinitionSpec `json:"spec"`
|
||||
}
|
||||
|
||||
func (r *CozystackResourceDefinitionReconciler) computeConfigHash() (string, error) {
|
||||
if r.mem == nil {
|
||||
return "", nil
|
||||
}
|
||||
snapshot := r.mem.Snapshot()
|
||||
sort.Slice(snapshot, func(i, j int) bool { return snapshot[i].Name < snapshot[j].Name })
|
||||
views := make([]crdHashView, 0, len(snapshot))
|
||||
for i := range snapshot {
|
||||
views = append(views, crdHashView{
|
||||
Name: snapshot[i].Name,
|
||||
Spec: snapshot[i].Spec,
|
||||
})
|
||||
}
|
||||
b, err := json.Marshal(views)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
sum := sha256.Sum256(b)
|
||||
return hex.EncodeToString(sum[:]), nil
|
||||
}
|
||||
|
||||
func (r *CozystackResourceDefinitionReconciler) debouncedRestart(ctx context.Context, logger logr.Logger) (ctrl.Result, error) {
|
||||
r.mu.Lock()
|
||||
le := r.lastEvent
|
||||
lh := r.lastHandled
|
||||
debounce := r.Debounce
|
||||
r.mu.Unlock()
|
||||
|
||||
if debounce <= 0 {
|
||||
debounce = 5 * time.Second
|
||||
}
|
||||
if le.IsZero() {
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
if d := time.Since(le); d < debounce {
|
||||
return ctrl.Result{RequeueAfter: debounce - d}, nil
|
||||
}
|
||||
if !lh.Before(le) {
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
newHash, err := r.computeConfigHash()
|
||||
if err != nil {
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
deploy := &appsv1.Deployment{}
|
||||
if err := r.Get(ctx, types.NamespacedName{Namespace: "cozy-system", Name: "cozystack-api"}, deploy); err != nil {
|
||||
return ctrl.Result{}, client.IgnoreNotFound(err)
|
||||
}
|
||||
|
||||
if deploy.Spec.Template.Annotations == nil {
|
||||
deploy.Spec.Template.Annotations = map[string]string{}
|
||||
}
|
||||
oldHash := deploy.Spec.Template.Annotations["cozystack.io/config-hash"]
|
||||
|
||||
if oldHash == newHash && oldHash != "" {
|
||||
r.mu.Lock()
|
||||
r.lastHandled = le
|
||||
r.mu.Unlock()
|
||||
logger.Info("No changes in CRD config; skipping restart", "hash", newHash)
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
patch := client.MergeFrom(deploy.DeepCopy())
|
||||
deploy.Spec.Template.Annotations["cozystack.io/config-hash"] = newHash
|
||||
|
||||
if err := r.Patch(ctx, deploy, patch); err != nil {
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
r.mu.Lock()
|
||||
r.lastHandled = le
|
||||
r.mu.Unlock()
|
||||
|
||||
logger.Info("Updated cozystack-api podTemplate config-hash; rollout triggered",
|
||||
"old", oldHash, "new", newHash)
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
367
internal/controller/lineagelabeler/lineage_labeler_controller.go
Normal file
367
internal/controller/lineagelabeler/lineage_labeler_controller.go
Normal file
@@ -0,0 +1,367 @@
|
||||
package lineagelabeler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
cozyv1alpha1 "github.com/cozystack/cozystack/api/v1alpha1"
|
||||
"github.com/cozystack/cozystack/internal/shared/crdmem"
|
||||
"github.com/cozystack/cozystack/pkg/lineage"
|
||||
|
||||
helmv2 "github.com/fluxcd/helm-controller/api/v2"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/client-go/discovery"
|
||||
"k8s.io/client-go/discovery/cached/memory"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/restmapper"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/builder"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/handler"
|
||||
"sigs.k8s.io/controller-runtime/pkg/log"
|
||||
"sigs.k8s.io/controller-runtime/pkg/predicate"
|
||||
"sigs.k8s.io/controller-runtime/pkg/reconcile"
|
||||
)
|
||||
|
||||
var ErrNoAncestors = errors.New("no ancestors")
|
||||
|
||||
type LineageLabelerReconciler struct {
|
||||
client.Client
|
||||
Scheme *runtime.Scheme
|
||||
|
||||
WatchResourceCSV string
|
||||
|
||||
dynClient dynamic.Interface
|
||||
mapper meta.RESTMapper
|
||||
|
||||
appMap atomic.Value
|
||||
once sync.Once
|
||||
|
||||
mem *crdmem.Memory
|
||||
}
|
||||
|
||||
type chartRef struct{ repo, chart string }
|
||||
type appRef struct{ groupVersion, kind, prefix string }
|
||||
|
||||
func (r *LineageLabelerReconciler) initMapping() {
|
||||
r.once.Do(func() {
|
||||
r.appMap.Store(make(map[chartRef]appRef))
|
||||
})
|
||||
}
|
||||
|
||||
func (r *LineageLabelerReconciler) currentMap() map[chartRef]appRef {
|
||||
val := r.appMap.Load()
|
||||
if val == nil {
|
||||
return map[chartRef]appRef{}
|
||||
}
|
||||
return val.(map[chartRef]appRef)
|
||||
}
|
||||
|
||||
func (r *LineageLabelerReconciler) Map(hr *helmv2.HelmRelease) (string, string, string, error) {
|
||||
cfg := r.currentMap()
|
||||
s := hr.Spec.Chart.Spec
|
||||
key := chartRef{s.SourceRef.Name, s.Chart}
|
||||
if v, ok := cfg[key]; ok {
|
||||
return v.groupVersion, v.kind, v.prefix, nil
|
||||
}
|
||||
return "", "", "", fmt.Errorf("cannot map helm release %s/%s to dynamic app", hr.Namespace, hr.Name)
|
||||
}
|
||||
|
||||
func parseGVKList(csv string) ([]schema.GroupVersionKind, error) {
|
||||
csv = strings.TrimSpace(csv)
|
||||
if csv == "" {
|
||||
return nil, fmt.Errorf("watch resource list is empty")
|
||||
}
|
||||
parts := strings.Split(csv, ",")
|
||||
out := make([]schema.GroupVersionKind, 0, len(parts))
|
||||
for _, p := range parts {
|
||||
p = strings.TrimSpace(p)
|
||||
s := strings.Split(p, "/")
|
||||
if len(s) == 2 {
|
||||
out = append(out, schema.GroupVersionKind{Group: "", Version: s[0], Kind: s[1]})
|
||||
continue
|
||||
}
|
||||
if len(s) == 3 {
|
||||
out = append(out, schema.GroupVersionKind{Group: s[0], Version: s[1], Kind: s[2]})
|
||||
continue
|
||||
}
|
||||
return nil, fmt.Errorf("invalid resource token %q, expected 'group/version/Kind' or 'v1/Kind'", p)
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (r *LineageLabelerReconciler) SetupWithManager(mgr ctrl.Manager) error {
|
||||
r.initMapping()
|
||||
|
||||
cfg := rest.CopyConfig(mgr.GetConfig())
|
||||
dc, err := dynamic.NewForConfig(cfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
disco, err := discovery.NewDiscoveryClientForConfig(cfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cached := memory.NewMemCacheClient(disco)
|
||||
r.dynClient = dc
|
||||
r.mapper = restmapper.NewDeferredDiscoveryRESTMapper(cached)
|
||||
|
||||
if r.mem == nil {
|
||||
r.mem = crdmem.Global()
|
||||
}
|
||||
if err := r.mem.EnsurePrimingWithManager(mgr); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
gvks, err := parseGVKList(r.WatchResourceCSV)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(gvks) == 0 {
|
||||
return fmt.Errorf("no resources to watch")
|
||||
}
|
||||
|
||||
b := ctrl.NewControllerManagedBy(mgr).Named("lineage-labeler")
|
||||
|
||||
nsPred := predicate.NewPredicateFuncs(func(obj client.Object) bool {
|
||||
ns := obj.GetNamespace()
|
||||
return ns != "" && strings.HasPrefix(ns, "tenant-")
|
||||
})
|
||||
|
||||
primary := gvks[0]
|
||||
primaryObj := &unstructured.Unstructured{}
|
||||
primaryObj.SetGroupVersionKind(primary)
|
||||
b = b.For(primaryObj,
|
||||
builder.WithPredicates(
|
||||
predicate.And(
|
||||
nsPred,
|
||||
predicate.Or(
|
||||
predicate.GenerationChangedPredicate{},
|
||||
predicate.ResourceVersionChangedPredicate{},
|
||||
),
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
for _, gvk := range gvks[1:] {
|
||||
u := &unstructured.Unstructured{}
|
||||
u.SetGroupVersionKind(gvk)
|
||||
b = b.Watches(u,
|
||||
&handler.EnqueueRequestForObject{},
|
||||
builder.WithPredicates(
|
||||
predicate.And(
|
||||
nsPred,
|
||||
predicate.Or(
|
||||
predicate.GenerationChangedPredicate{},
|
||||
predicate.ResourceVersionChangedPredicate{},
|
||||
),
|
||||
),
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
b = b.Watches(
|
||||
&cozyv1alpha1.CozystackResourceDefinition{},
|
||||
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, obj client.Object) []reconcile.Request {
|
||||
_ = r.refreshAppMap(ctx)
|
||||
return nil
|
||||
}),
|
||||
)
|
||||
|
||||
_ = r.refreshAppMap(context.Background())
|
||||
|
||||
return b.Complete(r)
|
||||
}
|
||||
|
||||
func (r *LineageLabelerReconciler) refreshAppMap(ctx context.Context) error {
|
||||
var items []cozyv1alpha1.CozystackResourceDefinition
|
||||
var err error
|
||||
if r.mem != nil {
|
||||
items, err = r.mem.ListFromCacheOrAPI(ctx, r.Client)
|
||||
} else {
|
||||
var list cozyv1alpha1.CozystackResourceDefinitionList
|
||||
err = r.Client.List(ctx, &list)
|
||||
items = list.Items
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
newMap := make(map[chartRef]appRef, len(items))
|
||||
for _, crd := range items {
|
||||
k := chartRef{
|
||||
repo: crd.Spec.Release.Chart.SourceRef.Name,
|
||||
chart: crd.Spec.Release.Chart.Name,
|
||||
}
|
||||
v := appRef{
|
||||
groupVersion: "apps.cozystack.io/v1alpha1",
|
||||
kind: crd.Spec.Application.Kind,
|
||||
prefix: crd.Spec.Release.Prefix,
|
||||
}
|
||||
if _, exists := newMap[k]; exists {
|
||||
continue
|
||||
}
|
||||
newMap[k] = v
|
||||
}
|
||||
r.appMap.Store(newMap)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *LineageLabelerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
|
||||
l := log.FromContext(ctx)
|
||||
|
||||
if req.Namespace == "" || !strings.HasPrefix(req.Namespace, "tenant-") {
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
if len(r.currentMap()) == 0 {
|
||||
_ = r.refreshAppMap(ctx)
|
||||
if len(r.currentMap()) == 0 {
|
||||
return ctrl.Result{RequeueAfter: 2 * time.Second}, nil
|
||||
}
|
||||
}
|
||||
|
||||
gvks, err := parseGVKList(r.WatchResourceCSV)
|
||||
if err != nil {
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
var obj *unstructured.Unstructured
|
||||
found := false
|
||||
|
||||
for _, gvk := range gvks {
|
||||
mapping, mErr := r.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
|
||||
if mErr != nil {
|
||||
continue
|
||||
}
|
||||
ns := req.Namespace
|
||||
if mapping.Scope.Name() != meta.RESTScopeNameNamespace {
|
||||
ns = ""
|
||||
}
|
||||
res, gErr := r.dynClient.Resource(mapping.Resource).Namespace(ns).Get(ctx, req.Name, metav1.GetOptions{})
|
||||
if gErr != nil {
|
||||
if apierrors.IsNotFound(gErr) {
|
||||
continue
|
||||
}
|
||||
continue
|
||||
}
|
||||
obj = res
|
||||
found = true
|
||||
break
|
||||
}
|
||||
|
||||
if !found || obj == nil {
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
existing := obj.GetLabels()
|
||||
if existing == nil {
|
||||
existing = map[string]string{}
|
||||
}
|
||||
|
||||
keys := []string{
|
||||
"apps.cozystack.io/application.group",
|
||||
"apps.cozystack.io/application.kind",
|
||||
"apps.cozystack.io/application.name",
|
||||
}
|
||||
allPresent := true
|
||||
for _, k := range keys {
|
||||
if _, ok := existing[k]; !ok {
|
||||
allPresent = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if allPresent {
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
labels, warn, err := r.computeLabels(ctx, obj)
|
||||
if err != nil {
|
||||
if errors.Is(err, ErrNoAncestors) {
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
return ctrl.Result{}, client.IgnoreNotFound(err)
|
||||
}
|
||||
if warn != "" {
|
||||
l.V(1).Info("lineage ambiguous; using first ancestor", "name", req.NamespacedName)
|
||||
}
|
||||
|
||||
for k, v := range labels {
|
||||
existing[k] = v
|
||||
}
|
||||
obj.SetLabels(existing)
|
||||
|
||||
// Server-Side Apply: claim ownership of our label keys
|
||||
gvk := obj.GroupVersionKind()
|
||||
patch := &unstructured.Unstructured{}
|
||||
patch.SetGroupVersionKind(gvk)
|
||||
patch.SetNamespace(obj.GetNamespace())
|
||||
patch.SetName(obj.GetName())
|
||||
patch.SetLabels(map[string]string{
|
||||
"apps.cozystack.io/application.group": existing["apps.cozystack.io/application.group"],
|
||||
"apps.cozystack.io/application.kind": existing["apps.cozystack.io/application.kind"],
|
||||
"apps.cozystack.io/application.name": existing["apps.cozystack.io/application.name"],
|
||||
})
|
||||
|
||||
// Use controller-runtime client with Apply patch type and field owner
|
||||
if err := r.Patch(ctx, patch,
|
||||
client.Apply,
|
||||
client.FieldOwner("cozystack/lineage"),
|
||||
client.ForceOwnership(false),
|
||||
); err != nil {
|
||||
if apierrors.IsConflict(err) {
|
||||
return ctrl.Result{RequeueAfter: 500 * time.Millisecond}, nil
|
||||
}
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
func (r *LineageLabelerReconciler) computeLabels(ctx context.Context, o *unstructured.Unstructured) (map[string]string, string, error) {
|
||||
owners := lineage.WalkOwnershipGraph(ctx, r.dynClient, r.mapper, r, o)
|
||||
if len(owners) == 0 {
|
||||
return nil, "", ErrNoAncestors
|
||||
}
|
||||
obj, err := owners[0].GetUnstructured(ctx, r.dynClient, r.mapper)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
gv, err := schema.ParseGroupVersion(obj.GetAPIVersion())
|
||||
if err != nil {
|
||||
return nil, "", fmt.Errorf("invalid APIVersion %s: %w", obj.GetAPIVersion(), err)
|
||||
}
|
||||
var warn string
|
||||
if len(owners) > 1 {
|
||||
warn = "ambiguous"
|
||||
}
|
||||
group := gv.Group
|
||||
if len(group) > 63 {
|
||||
group = trimDNSLabel(group[:63])
|
||||
}
|
||||
return map[string]string{
|
||||
"apps.cozystack.io/application.group": group,
|
||||
"apps.cozystack.io/application.kind": obj.GetKind(),
|
||||
"apps.cozystack.io/application.name": obj.GetName(),
|
||||
}, warn, nil
|
||||
}
|
||||
|
||||
func trimDNSLabel(s string) string {
|
||||
for len(s) > 0 {
|
||||
b := s[len(s)-1]
|
||||
if (b >= 'a' && b <= 'z') || (b >= 'A' && b <= 'Z') || (b >= '0' && b <= '9') {
|
||||
return s
|
||||
}
|
||||
s = s[:len(s)-1]
|
||||
}
|
||||
return s
|
||||
}
|
||||
@@ -1,40 +0,0 @@
|
||||
package lineagecontrollerwebhook
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
helmv2 "github.com/fluxcd/helm-controller/api/v2"
|
||||
)
|
||||
|
||||
type chartRef struct {
|
||||
repo string
|
||||
chart string
|
||||
}
|
||||
|
||||
type appRef struct {
|
||||
groupVersion string
|
||||
kind string
|
||||
prefix string
|
||||
}
|
||||
|
||||
type runtimeConfig struct {
|
||||
chartAppMap map[chartRef]appRef
|
||||
}
|
||||
|
||||
func (l *LineageControllerWebhook) initConfig() {
|
||||
l.initOnce.Do(func() {
|
||||
if l.config.Load() == nil {
|
||||
l.config.Store(&runtimeConfig{chartAppMap: make(map[chartRef]appRef)})
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (l *LineageControllerWebhook) Map(hr *helmv2.HelmRelease) (string, string, string, error) {
|
||||
cfg := l.config.Load().(*runtimeConfig).chartAppMap
|
||||
s := &hr.Spec.Chart.Spec
|
||||
val, ok := cfg[chartRef{s.SourceRef.Name, s.Chart}]
|
||||
if !ok {
|
||||
return "", "", "", fmt.Errorf("cannot map helm release %s/%s to dynamic app", hr.Namespace, hr.Name)
|
||||
}
|
||||
return val.groupVersion, val.kind, val.prefix, nil
|
||||
}
|
||||
@@ -1,42 +0,0 @@
|
||||
package lineagecontrollerwebhook
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
cozyv1alpha1 "github.com/cozystack/cozystack/api/v1alpha1"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/log"
|
||||
)
|
||||
|
||||
// +kubebuilder:rbac:groups=cozystack.io,resources=cozystackresourcedefinitions,verbs=list;watch
|
||||
|
||||
func (c *LineageControllerWebhook) SetupWithManagerAsController(mgr ctrl.Manager) error {
|
||||
return ctrl.NewControllerManagedBy(mgr).
|
||||
For(&cozyv1alpha1.CozystackResourceDefinition{}).
|
||||
Complete(c)
|
||||
}
|
||||
|
||||
func (c *LineageControllerWebhook) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
|
||||
l := log.FromContext(ctx)
|
||||
crds := &cozyv1alpha1.CozystackResourceDefinitionList{}
|
||||
if err := c.List(ctx, crds, &client.ListOptions{Namespace: "cozy-system"}); err != nil {
|
||||
l.Error(err, "failed reading CozystackResourceDefinitions")
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
newConfig := make(map[chartRef]appRef)
|
||||
for _, crd := range crds.Items {
|
||||
k := chartRef{
|
||||
crd.Spec.Release.Chart.SourceRef.Name,
|
||||
crd.Spec.Release.Chart.Name,
|
||||
}
|
||||
newRef := appRef{"apps.cozystack.io/v1alpha1", crd.Spec.Application.Kind, crd.Spec.Release.Prefix}
|
||||
if oldRef, exists := newConfig[k]; exists {
|
||||
l.Info("duplicate chart mapping detected; ignoring subsequent entry", "key", k, "retained value", oldRef, "ignored value", newRef)
|
||||
continue
|
||||
}
|
||||
newConfig[k] = newRef
|
||||
}
|
||||
c.config.Store(&runtimeConfig{newConfig})
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
@@ -1,23 +0,0 @@
|
||||
package lineagecontrollerwebhook
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
|
||||
)
|
||||
|
||||
// +kubebuilder:webhook:path=/mutate-lineage,mutating=true,failurePolicy=Fail,sideEffects=None,groups="",resources=pods,secrets,services,persistentvolumeclaims,verbs=create;update,versions=v1,name=mlineage.cozystack.io,admissionReviewVersions={v1}
|
||||
type LineageControllerWebhook struct {
|
||||
client.Client
|
||||
Scheme *runtime.Scheme
|
||||
decoder admission.Decoder
|
||||
dynClient dynamic.Interface
|
||||
mapper meta.RESTMapper
|
||||
config atomic.Value
|
||||
initOnce sync.Once
|
||||
}
|
||||
@@ -1,166 +0,0 @@
|
||||
package lineagecontrollerwebhook
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/cozystack/cozystack/pkg/lineage"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/client-go/discovery"
|
||||
"k8s.io/client-go/discovery/cached/memory"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/restmapper"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/log"
|
||||
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
|
||||
)
|
||||
|
||||
var (
|
||||
NoAncestors = fmt.Errorf("no managed apps found in lineage")
|
||||
AncestryAmbiguous = fmt.Errorf("object ancestry is ambiguous")
|
||||
)
|
||||
|
||||
// SetupWithManager registers the handler with the webhook server.
|
||||
func (h *LineageControllerWebhook) SetupWithManagerAsWebhook(mgr ctrl.Manager) error {
|
||||
cfg := rest.CopyConfig(mgr.GetConfig())
|
||||
|
||||
var err error
|
||||
h.dynClient, err = dynamic.NewForConfig(cfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
discoClient, err := discovery.NewDiscoveryClientForConfig(cfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cachedDisco := memory.NewMemCacheClient(discoClient)
|
||||
h.mapper = restmapper.NewDeferredDiscoveryRESTMapper(cachedDisco)
|
||||
|
||||
h.initConfig()
|
||||
// Register HTTP path -> handler.
|
||||
mgr.GetWebhookServer().Register("/mutate-lineage", &admission.Webhook{Handler: h})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// InjectDecoder lets controller-runtime give us a decoder for AdmissionReview requests.
|
||||
func (h *LineageControllerWebhook) InjectDecoder(d admission.Decoder) error {
|
||||
h.decoder = d
|
||||
return nil
|
||||
}
|
||||
|
||||
// Handle is called for each AdmissionReview that matches the webhook config.
|
||||
func (h *LineageControllerWebhook) Handle(ctx context.Context, req admission.Request) admission.Response {
|
||||
logger := log.FromContext(ctx).WithValues(
|
||||
"gvk", req.Kind.String(),
|
||||
"namespace", req.Namespace,
|
||||
"name", req.Name,
|
||||
"operation", req.Operation,
|
||||
)
|
||||
warn := make(admission.Warnings, 0)
|
||||
|
||||
obj := &unstructured.Unstructured{}
|
||||
if err := h.decodeUnstructured(req, obj); err != nil {
|
||||
return admission.Errored(400, fmt.Errorf("decode object: %w", err))
|
||||
}
|
||||
|
||||
labels, err := h.computeLabels(ctx, obj)
|
||||
for {
|
||||
if err != nil && errors.Is(err, NoAncestors) {
|
||||
return admission.Allowed("object not managed by app")
|
||||
}
|
||||
if err != nil && errors.Is(err, AncestryAmbiguous) {
|
||||
warn = append(warn, "object ancestry ambiguous, using first ancestor found")
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
return admission.Errored(500, fmt.Errorf("error computing lineage labels: %w", err))
|
||||
}
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
h.applyLabels(obj, labels)
|
||||
|
||||
mutated, err := json.Marshal(obj)
|
||||
if err != nil {
|
||||
return admission.Errored(500, fmt.Errorf("marshal mutated pod: %w", err))
|
||||
}
|
||||
logger.V(1).Info("mutated pod", "namespace", obj.GetNamespace(), "name", obj.GetName())
|
||||
return admission.PatchResponseFromRaw(req.Object.Raw, mutated).WithWarnings(warn...)
|
||||
}
|
||||
|
||||
func (h *LineageControllerWebhook) computeLabels(ctx context.Context, o *unstructured.Unstructured) (map[string]string, error) {
|
||||
owners := lineage.WalkOwnershipGraph(ctx, h.dynClient, h.mapper, h, o)
|
||||
if len(owners) == 0 {
|
||||
return nil, NoAncestors
|
||||
}
|
||||
obj, err := owners[0].GetUnstructured(ctx, h.dynClient, h.mapper)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
gv, err := schema.ParseGroupVersion(obj.GetAPIVersion())
|
||||
if err != nil {
|
||||
// should never happen, we got an APIVersion right from the API
|
||||
return nil, fmt.Errorf("could not parse APIVersion %s to a group and version: %w", obj.GetAPIVersion(), err)
|
||||
}
|
||||
if len(owners) > 1 {
|
||||
err = AncestryAmbiguous
|
||||
}
|
||||
return map[string]string{
|
||||
// truncate apigroup to first 63 chars
|
||||
"apps.cozystack.io/application.group": func(s string) string {
|
||||
if len(s) < 63 {
|
||||
return s
|
||||
}
|
||||
s = s[:63]
|
||||
for b := s[62]; !((b >= 'a' && b <= 'z') || (b >= 'A' && b <= 'Z') || (b >= '0' && b <= '9')); s = s[:len(s)-1] {
|
||||
b = s[len(s)-1]
|
||||
}
|
||||
return s
|
||||
}(gv.Group),
|
||||
"apps.cozystack.io/application.kind": obj.GetKind(),
|
||||
"apps.cozystack.io/application.name": obj.GetName(),
|
||||
}, err
|
||||
}
|
||||
|
||||
func (h *LineageControllerWebhook) applyLabels(o client.Object, labels map[string]string) {
|
||||
existing := o.GetLabels()
|
||||
if existing == nil {
|
||||
existing = make(map[string]string)
|
||||
}
|
||||
for k, v := range labels {
|
||||
existing[k] = v
|
||||
}
|
||||
o.SetLabels(existing)
|
||||
}
|
||||
|
||||
func (h *LineageControllerWebhook) decodeUnstructured(req admission.Request, out *unstructured.Unstructured) error {
|
||||
if h.decoder != nil {
|
||||
if err := h.decoder.Decode(req, out); err == nil {
|
||||
return nil
|
||||
}
|
||||
if req.Kind.Group != "" || req.Kind.Kind != "" || req.Kind.Version != "" {
|
||||
out.SetGroupVersionKind(schema.GroupVersionKind{
|
||||
Group: req.Kind.Group,
|
||||
Version: req.Kind.Version,
|
||||
Kind: req.Kind.Kind,
|
||||
})
|
||||
if err := h.decoder.Decode(req, out); err == nil {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(req.Object.Raw) == 0 {
|
||||
return errors.New("empty admission object")
|
||||
}
|
||||
return json.Unmarshal(req.Object.Raw, &out.Object)
|
||||
}
|
||||
99
internal/shared/crdmem/memory.go
Normal file
99
internal/shared/crdmem/memory.go
Normal file
@@ -0,0 +1,99 @@
|
||||
package crdmem
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
cozyv1alpha1 "github.com/cozystack/cozystack/api/v1alpha1"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
)
|
||||
|
||||
type Memory struct {
|
||||
mu sync.RWMutex
|
||||
data map[string]cozyv1alpha1.CozystackResourceDefinition
|
||||
primed bool
|
||||
primeOnce sync.Once
|
||||
}
|
||||
|
||||
func New() *Memory {
|
||||
return &Memory{data: make(map[string]cozyv1alpha1.CozystackResourceDefinition)}
|
||||
}
|
||||
|
||||
var (
|
||||
global *Memory
|
||||
globalOnce sync.Once
|
||||
)
|
||||
|
||||
func Global() *Memory {
|
||||
globalOnce.Do(func() { global = New() })
|
||||
return global
|
||||
}
|
||||
|
||||
func (m *Memory) Upsert(obj *cozyv1alpha1.CozystackResourceDefinition) {
|
||||
if obj == nil {
|
||||
return
|
||||
}
|
||||
m.mu.Lock()
|
||||
m.data[obj.Name] = *obj.DeepCopy()
|
||||
m.mu.Unlock()
|
||||
}
|
||||
|
||||
func (m *Memory) Delete(name string) {
|
||||
m.mu.Lock()
|
||||
delete(m.data, name)
|
||||
m.mu.Unlock()
|
||||
}
|
||||
|
||||
func (m *Memory) Snapshot() []cozyv1alpha1.CozystackResourceDefinition {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
out := make([]cozyv1alpha1.CozystackResourceDefinition, 0, len(m.data))
|
||||
for _, v := range m.data {
|
||||
out = append(out, v)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func (m *Memory) IsPrimed() bool {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
return m.primed
|
||||
}
|
||||
|
||||
type runnable func(context.Context) error
|
||||
|
||||
func (r runnable) Start(ctx context.Context) error { return r(ctx) }
|
||||
|
||||
func (m *Memory) EnsurePrimingWithManager(mgr ctrl.Manager) error {
|
||||
var errOut error
|
||||
m.primeOnce.Do(func() {
|
||||
errOut = mgr.Add(runnable(func(ctx context.Context) error {
|
||||
if ok := mgr.GetCache().WaitForCacheSync(ctx); !ok {
|
||||
return nil
|
||||
}
|
||||
var list cozyv1alpha1.CozystackResourceDefinitionList
|
||||
if err := mgr.GetClient().List(ctx, &list); err == nil {
|
||||
for i := range list.Items {
|
||||
m.Upsert(&list.Items[i])
|
||||
}
|
||||
m.mu.Lock()
|
||||
m.primed = true
|
||||
m.mu.Unlock()
|
||||
}
|
||||
return nil
|
||||
}))
|
||||
})
|
||||
return errOut
|
||||
}
|
||||
|
||||
func (m *Memory) ListFromCacheOrAPI(ctx context.Context, c client.Client) ([]cozyv1alpha1.CozystackResourceDefinition, error) {
|
||||
if m.IsPrimed() {
|
||||
return m.Snapshot(), nil
|
||||
}
|
||||
var list cozyv1alpha1.CozystackResourceDefinitionList
|
||||
if err := c.List(ctx, &list); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return list.Items, nil
|
||||
}
|
||||
Reference in New Issue
Block a user