From f4e0145c1c79939312cebd664440ac2e09b38607 Mon Sep 17 00:00:00 2001 From: Timofei Larkin Date: Mon, 20 Oct 2025 17:23:37 +0300 Subject: [PATCH] [api] Use shared informer cache This patch changes all clients in the Cozystack API server to typed ones from the controller runtime. This should improve the performance of the API server and simplifies the code by removing work with unstructured objects and dynamic clients. ```release-note [api] Use typed and cache-backed k8s clients in the Cozystack API to improve performance. Get rid of operations on unstructured objects and use of dynamic clients. ``` Signed-off-by: Timofei Larkin --- .../system/cozystack-api/templates/rbac.yaml | 2 +- pkg/apiserver/apiserver.go | 86 +++++--- pkg/registry/apps/application/rest.go | 187 ++++++++---------- pkg/registry/core/tenantmodule/rest.go | 98 ++++----- pkg/registry/core/tenantnamespace/rest.go | 44 ++--- pkg/registry/core/tenantsecret/rest.go | 70 ++++--- pkg/registry/core/tenantsecretstable/rest.go | 36 +++- 7 files changed, 283 insertions(+), 240 deletions(-) diff --git a/packages/system/cozystack-api/templates/rbac.yaml b/packages/system/cozystack-api/templates/rbac.yaml index e4b3aca9..0429b9ef 100644 --- a/packages/system/cozystack-api/templates/rbac.yaml +++ b/packages/system/cozystack-api/templates/rbac.yaml @@ -4,7 +4,7 @@ metadata: name: cozystack-api rules: - apiGroups: [""] - resources: ["namespaces", "secrets"] + resources: ["namespaces", "secrets", "services"] verbs: ["get", "watch", "list"] - apiGroups: ["rbac.authorization.k8s.io"] resources: ["rolebindings"] diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index e829d336..d5814ee7 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -17,18 +17,22 @@ limitations under the License. package apiserver import ( + "context" "fmt" + "time" helmv2 "github.com/fluxcd/helm-controller/api/v2" + corev1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apiserver/pkg/registry/rest" genericapiserver "k8s.io/apiserver/pkg/server" - "k8s.io/client-go/dynamic" - "k8s.io/client-go/kubernetes" - restclient "k8s.io/client-go/rest" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" "github.com/cozystack/cozystack/pkg/apis/apps" appsinstall "github.com/cozystack/cozystack/pkg/apis/apps/install" @@ -50,6 +54,7 @@ var ( // versions and content types. Codecs = serializer.NewCodecFactory(Scheme) CozyComponentName = "cozy" + syncPeriod = 5 * time.Minute ) func init() { @@ -58,9 +63,15 @@ func init() { // Register HelmRelease types. if err := helmv2.AddToScheme(Scheme); err != nil { - panic(fmt.Sprintf("Failed to add HelmRelease types to scheme: %v", err)) + panic(fmt.Errorf("Failed to add HelmRelease types to scheme: %w", err)) } + if err := corev1.AddToScheme(Scheme); err != nil { + panic(fmt.Errorf("Failed to add core types to scheme: %w", err)) + } + if err := rbacv1.AddToScheme(Scheme); err != nil { + panic(fmt.Errorf("Failed to add RBAC types to scheme: %w", err)) + } // Add unversioned types. metav1.AddToGroupVersion(Scheme, schema.GroupVersion{Version: "v1"}) @@ -118,43 +129,59 @@ func (c completedConfig) New() (*CozyServer, error) { } // Create a dynamic client for HelmRelease using InClusterConfig. - inClusterConfig, err := restclient.InClusterConfig() + cfg, err := ctrl.GetConfig() if err != nil { - return nil, fmt.Errorf("unable to get in-cluster config: %v", err) + return nil, fmt.Errorf("failed to get kubeconfig: %w", err) } - dynamicClient, err := dynamic.NewForConfig(inClusterConfig) + mgr, err := ctrl.NewManager(cfg, ctrl.Options{ + Scheme: Scheme, + Cache: cache.Options{SyncPeriod: &syncPeriod}, + }) if err != nil { - return nil, fmt.Errorf("unable to create dynamic client: %v", err) + return nil, fmt.Errorf("failed to build manager: %w", err) } - clientset, err := kubernetes.NewForConfig(inClusterConfig) - if err != nil { - return nil, fmt.Errorf("create kube clientset: %v", err) + ctx := ctrl.SetupSignalHandler() + + if err = mustGetInformers(ctx, mgr, + &helmv2.HelmRelease{}, + &corev1.Secret{}, + &corev1.Namespace{}, + &corev1.Service{}, + &rbacv1.RoleBinding{}, + ); err != nil { + return nil, fmt.Errorf("failed to get informers: %w", err) } + go func() { + if err := mgr.Start(ctx); err != nil { + panic(fmt.Errorf("manager start failed: %w", err)) + } + }() + + if ok := mgr.GetCache().WaitForCacheSync(ctx); !ok { + return nil, fmt.Errorf("cache sync failed") + } + + cli := mgr.GetClient() + watchCli, err := client.NewWithWatch(cfg, client.Options{Scheme: Scheme}) + if err != nil { + return nil, fmt.Errorf("failed to build watch client: %w", err) + } // --- static, cluster-scoped resource for core group --- coreV1alpha1Storage := map[string]rest.Storage{} coreV1alpha1Storage["tenantnamespaces"] = cozyregistry.RESTInPeace( - tenantnamespacestorage.NewREST( - clientset.CoreV1(), - clientset.RbacV1(), - ), + tenantnamespacestorage.NewREST(cli, watchCli), ) coreV1alpha1Storage["tenantsecrets"] = cozyregistry.RESTInPeace( - tenantsecretstorage.NewREST( - clientset.CoreV1(), - ), + tenantsecretstorage.NewREST(cli, watchCli), ) coreV1alpha1Storage["tenantsecretstables"] = cozyregistry.RESTInPeace( - tenantsecretstablestorage.NewREST( - clientset.CoreV1(), - ), + tenantsecretstablestorage.NewREST(cli, watchCli), ) coreV1alpha1Storage["tenantmodules"] = cozyregistry.RESTInPeace( - tenantmodulestorage.NewREST( - dynamicClient, - ), + tenantmodulestorage.NewREST(cli, watchCli), ) coreApiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(core.GroupName, Scheme, metav1.ParameterCodec, Codecs) @@ -166,7 +193,7 @@ func (c completedConfig) New() (*CozyServer, error) { // --- dynamically-configured, per-tenant resources --- appsV1alpha1Storage := map[string]rest.Storage{} for _, resConfig := range c.ResourceConfig.Resources { - storage := applicationstorage.NewREST(dynamicClient, &resConfig) + storage := applicationstorage.NewREST(cli, watchCli, &resConfig) appsV1alpha1Storage[resConfig.Application.Plural] = cozyregistry.RESTInPeace(storage) } appsApiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apps.GroupName, Scheme, metav1.ParameterCodec, Codecs) @@ -177,3 +204,12 @@ func (c completedConfig) New() (*CozyServer, error) { return s, nil } + +func mustGetInformers(ctx context.Context, mgr ctrl.Manager, types ...client.Object) error { + for i := range types { + if _, err := mgr.GetCache().GetInformer(ctx, types[i]); err != nil { + return fmt.Errorf("failed to get informer for %T: %w", types[i], err) + } + } + return nil +} diff --git a/pkg/registry/apps/application/rest.go b/pkg/registry/apps/application/rest.go index 388d741f..37a5d9a5 100644 --- a/pkg/registry/apps/application/rest.go +++ b/pkg/registry/apps/application/rest.go @@ -37,8 +37,8 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/rest" - "k8s.io/client-go/dynamic" "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" appsv1alpha1 "github.com/cozystack/cozystack/pkg/apis/apps/v1alpha1" "github.com/cozystack/cozystack/pkg/config" @@ -76,7 +76,8 @@ var helmReleaseGVR = schema.GroupVersionResource{ // REST implements the RESTStorage interface for Application resources type REST struct { - dynamicClient dynamic.Interface + c client.Client + w client.WithWatch gvr schema.GroupVersionResource gvk schema.GroupVersionKind kindName string @@ -86,7 +87,7 @@ type REST struct { } // NewREST creates a new REST storage for Application with specific configuration -func NewREST(dynamicClient dynamic.Interface, config *config.Resource) *REST { +func NewREST(c client.Client, w client.WithWatch, config *config.Resource) *REST { var specSchema *structuralschema.Structural if raw := strings.TrimSpace(config.Application.OpenAPISchema); raw != "" { @@ -110,7 +111,8 @@ func NewREST(dynamicClient dynamic.Interface, config *config.Resource) *REST { } return &REST{ - dynamicClient: dynamicClient, + c: c, + w: w, gvr: schema.GroupVersionResource{ Group: appsv1alpha1.GroupName, Version: "v1alpha1", @@ -158,30 +160,23 @@ func (r *REST) Create(ctx context.Context, obj runtime.Object, createValidation helmRelease.Labels = mergeMaps(helmRelease.Labels, addPrefixedMap(app.Labels, LabelPrefix)) // Note: Annotations from config are not handled as r.releaseConfig.Annotations is undefined - // Convert HelmRelease to unstructured format - unstructuredHR, err := runtime.DefaultUnstructuredConverter.ToUnstructured(helmRelease) - if err != nil { - klog.Errorf("Failed to convert HelmRelease to unstructured: %v", err) - return nil, fmt.Errorf("failed to convert HelmRelease to unstructured: %v", err) - } - klog.V(6).Infof("Creating HelmRelease %s in namespace %s", helmRelease.Name, app.Namespace) // Create HelmRelease in Kubernetes - createdHR, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(app.Namespace).Create(ctx, &unstructured.Unstructured{Object: unstructuredHR}, *options) + err = r.c.Create(ctx, helmRelease, &client.CreateOptions{Raw: options}) if err != nil { klog.Errorf("Failed to create HelmRelease %s: %v", helmRelease.Name, err) return nil, fmt.Errorf("failed to create HelmRelease: %v", err) } // Convert the created HelmRelease back to Application - convertedApp, err := r.ConvertHelmReleaseToApplication(createdHR) + convertedApp, err := r.ConvertHelmReleaseToApplication(helmRelease) if err != nil { - klog.Errorf("Conversion error from HelmRelease to Application for resource %s: %v", createdHR.GetName(), err) + klog.Errorf("Conversion error from HelmRelease to Application for resource %s: %v", helmRelease.GetName(), err) return nil, fmt.Errorf("conversion error: %v", err) } - klog.V(6).Infof("Successfully created and converted HelmRelease %s to Application", createdHR.GetName()) + klog.V(6).Infof("Successfully created and converted HelmRelease %s to Application", helmRelease.GetName()) // Convert Application to unstructured format unstructuredApp, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&convertedApp) @@ -206,7 +201,8 @@ func (r *REST) Get(ctx context.Context, name string, options *metav1.GetOptions) // Get the corresponding HelmRelease using the new prefix helmReleaseName := r.releaseConfig.Prefix + name - hr, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(namespace).Get(ctx, helmReleaseName, *options) + helmRelease := &helmv2.HelmRelease{} + err = r.c.Get(ctx, client.ObjectKey{Namespace: namespace, Name: helmReleaseName}, helmRelease, &client.GetOptions{Raw: options}) if err != nil { klog.Errorf("Error retrieving HelmRelease for resource %s: %v", name, err) @@ -221,14 +217,14 @@ func (r *REST) Get(ctx context.Context, name string, options *metav1.GetOptions) } // Check if HelmRelease meets the required chartName and sourceRef criteria - if !r.shouldIncludeHelmRelease(hr) { + if !r.shouldIncludeHelmRelease(helmRelease) { klog.Errorf("HelmRelease %s does not match the required chartName and sourceRef criteria", helmReleaseName) // Return a NotFound error for the Application resource return nil, apierrors.NewNotFound(r.gvr.GroupResource(), name) } // Convert HelmRelease to Application - convertedApp, err := r.ConvertHelmReleaseToApplication(hr) + convertedApp, err := r.ConvertHelmReleaseToApplication(helmRelease) if err != nil { klog.Errorf("Conversion error from HelmRelease to Application for resource %s: %v", name, err) return nil, fmt.Errorf("conversion error: %v", err) @@ -325,7 +321,11 @@ func (r *REST) List(ctx context.Context, options *metainternalversion.ListOption } // List HelmReleases with mapped selectors - hrList, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(namespace).List(ctx, metaOptions) + hrList := &helmv2.HelmReleaseList{} + err = r.c.List(ctx, hrList, &client.ListOptions{ + Namespace: namespace, + Raw: &metaOptions, + }) if err != nil { klog.Errorf("Error listing HelmReleases: %v", err) return nil, err @@ -335,14 +335,14 @@ func (r *REST) List(ctx context.Context, options *metainternalversion.ListOption items := make([]unstructured.Unstructured, 0) // Iterate over HelmReleases and convert to Applications - for _, hr := range hrList.Items { - if !r.shouldIncludeHelmRelease(&hr) { + for i := range hrList.Items { + if !r.shouldIncludeHelmRelease(&hrList.Items[i]) { continue } - app, err := r.ConvertHelmReleaseToApplication(&hr) + app, err := r.ConvertHelmReleaseToApplication(&hrList.Items[i]) if err != nil { - klog.Errorf("Error converting HelmRelease %s to Application: %v", hr.GetName(), err) + klog.Errorf("Error converting HelmRelease %s to Application: %v", hrList.Items[i].GetName(), err) continue } @@ -457,7 +457,8 @@ func (r *REST) Update(ctx context.Context, name string, objInfo rest.UpdatedObje // Ensure ResourceVersion if helmRelease.ResourceVersion == "" { - cur, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(helmRelease.Namespace).Get(ctx, helmRelease.Name, metav1.GetOptions{}) + cur := &helmv2.HelmRelease{} + err := r.c.Get(ctx, client.ObjectKey{Namespace: helmRelease.Namespace, Name: helmRelease.Name}, cur, &client.GetOptions{Raw: &metav1.GetOptions{}}) if err != nil { return nil, false, fmt.Errorf("failed to fetch current HelmRelease: %w", err) } @@ -470,53 +471,38 @@ func (r *REST) Update(ctx context.Context, name string, objInfo rest.UpdatedObje helmRelease.Labels = mergeMaps(helmRelease.Labels, addPrefixedMap(app.Labels, LabelPrefix)) // Note: Annotations from config are not handled as r.releaseConfig.Annotations is undefined - // Convert HelmRelease to unstructured format - unstructuredHR, err := runtime.DefaultUnstructuredConverter.ToUnstructured(helmRelease) - if err != nil { - klog.Errorf("Failed to convert HelmRelease to unstructured: %v", err) - return nil, false, fmt.Errorf("failed to convert HelmRelease to unstructured: %v", err) - } - - // Retrieve metadata from unstructured object - metadata, found, err := unstructured.NestedMap(unstructuredHR, "metadata") - if err != nil || !found { - klog.Errorf("Failed to retrieve metadata from HelmRelease: %v, found: %v", err, found) - return nil, false, fmt.Errorf("failed to retrieve metadata from HelmRelease: %v", err) - } - klog.V(6).Infof("HelmRelease Metadata: %+v", metadata) - klog.V(6).Infof("Updating HelmRelease %s in namespace %s", helmRelease.Name, helmRelease.Namespace) // Before updating, ensure the HelmRelease meets the inclusion criteria // This prevents updating HelmReleases that should not be managed as Applications - if !r.shouldIncludeHelmRelease(&unstructured.Unstructured{Object: unstructuredHR}) { + if !r.shouldIncludeHelmRelease(helmRelease) { klog.Errorf("HelmRelease %s does not match the required chartName and sourceRef criteria", helmRelease.Name) // Return a NotFound error for the Application resource return nil, false, apierrors.NewNotFound(r.gvr.GroupResource(), name) } // Update the HelmRelease in Kubernetes - resultHR, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(helmRelease.Namespace).Update(ctx, &unstructured.Unstructured{Object: unstructuredHR}, metav1.UpdateOptions{}) + err = r.c.Update(ctx, helmRelease, &client.UpdateOptions{Raw: &metav1.UpdateOptions{}}) if err != nil { klog.Errorf("Failed to update HelmRelease %s: %v", helmRelease.Name, err) return nil, false, fmt.Errorf("failed to update HelmRelease: %v", err) } // After updating, ensure the updated HelmRelease still meets the inclusion criteria - if !r.shouldIncludeHelmRelease(resultHR) { - klog.Errorf("Updated HelmRelease %s does not match the required chartName and sourceRef criteria", resultHR.GetName()) + if !r.shouldIncludeHelmRelease(helmRelease) { + klog.Errorf("Updated HelmRelease %s does not match the required chartName and sourceRef criteria", helmRelease.GetName()) // Return a NotFound error for the Application resource return nil, false, apierrors.NewNotFound(r.gvr.GroupResource(), name) } // Convert the updated HelmRelease back to Application - convertedApp, err := r.ConvertHelmReleaseToApplication(resultHR) + convertedApp, err := r.ConvertHelmReleaseToApplication(helmRelease) if err != nil { - klog.Errorf("Conversion error from HelmRelease to Application for resource %s: %v", resultHR.GetName(), err) + klog.Errorf("Conversion error from HelmRelease to Application for resource %s: %v", helmRelease.GetName(), err) return nil, false, fmt.Errorf("conversion error: %v", err) } - klog.V(6).Infof("Successfully updated and converted HelmRelease %s to Application", resultHR.GetName()) + klog.V(6).Infof("Successfully updated and converted HelmRelease %s to Application", helmRelease.GetName()) // Explicitly set apiVersion and kind for Application convertedApp.TypeMeta = metav1.TypeMeta{ @@ -554,7 +540,8 @@ func (r *REST) Delete(ctx context.Context, name string, deleteValidation rest.Va helmReleaseName := r.releaseConfig.Prefix + name // Retrieve the HelmRelease before attempting to delete - hr, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(namespace).Get(ctx, helmReleaseName, metav1.GetOptions{}) + helmRelease := &helmv2.HelmRelease{} + err = r.c.Get(ctx, client.ObjectKey{Namespace: namespace, Name: helmReleaseName}, helmRelease, &client.GetOptions{Raw: &metav1.GetOptions{}}) if err != nil { if apierrors.IsNotFound(err) { // If HelmRelease does not exist, return NotFound error for Application @@ -567,7 +554,7 @@ func (r *REST) Delete(ctx context.Context, name string, deleteValidation rest.Va } // Validate that the HelmRelease meets the inclusion criteria - if !r.shouldIncludeHelmRelease(hr) { + if !r.shouldIncludeHelmRelease(helmRelease) { klog.Errorf("HelmRelease %s does not match the required chartName and sourceRef criteria", helmReleaseName) // Return NotFound error for Application resource return nil, false, apierrors.NewNotFound(r.gvr.GroupResource(), name) @@ -576,7 +563,7 @@ func (r *REST) Delete(ctx context.Context, name string, deleteValidation rest.Va klog.V(6).Infof("Deleting HelmRelease %s in namespace %s", helmReleaseName, namespace) // Delete the HelmRelease corresponding to the Application - err = r.dynamicClient.Resource(helmReleaseGVR).Namespace(namespace).Delete(ctx, helmReleaseName, *options) + err = r.c.Delete(ctx, helmRelease, &client.DeleteOptions{Raw: options}) if err != nil { klog.Errorf("Failed to delete HelmRelease %s: %v", helmReleaseName, err) return nil, false, fmt.Errorf("failed to delete HelmRelease: %v", err) @@ -659,7 +646,11 @@ func (r *REST) Watch(ctx context.Context, options *metainternalversion.ListOptio } // Start watch on HelmRelease with mapped selectors - helmWatcher, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(namespace).Watch(ctx, metaOptions) + hrList := &helmv2.HelmReleaseList{} + helmWatcher, err := r.w.Watch(ctx, hrList, &client.ListOptions{ + Namespace: namespace, + Raw: &metaOptions, + }) if err != nil { klog.Errorf("Error setting up watch for HelmReleases: %v", err) return nil, err @@ -669,13 +660,15 @@ func (r *REST) Watch(ctx context.Context, options *metainternalversion.ListOptio customW := &customWatcher{ resultChan: make(chan watch.Event), stopChan: make(chan struct{}), + underlying: helmWatcher, } go func() { defer close(customW.resultChan) + defer customW.underlying.Stop() for { select { - case event, ok := <-helmWatcher.ResultChan(): + case event, ok := <-customW.underlying.ResultChan(): if !ok { // The watcher has been closed, attempt to re-establish the watch klog.Warning("HelmRelease watcher closed, attempting to re-establish") @@ -689,19 +682,19 @@ func (r *REST) Watch(ctx context.Context, options *metainternalversion.ListOptio continue // Skip processing this event } - // Proceed with processing Unstructured objects - matches, err := r.isRelevantHelmRelease(&event) - if err != nil { - klog.V(4).Infof("Non-critical error filtering HelmRelease event: %v", err) + // Proceed with processing HelmRelease objects + hr, ok := event.Object.(*helmv2.HelmRelease) + if !ok { + klog.V(4).Infof("Expected HelmRelease object, got %T", event.Object) continue } - if !matches { + if !r.shouldIncludeHelmRelease(hr) { continue } // Convert HelmRelease to Application - app, err := r.ConvertHelmReleaseToApplication(event.Object.(*unstructured.Unstructured)) + app, err := r.ConvertHelmReleaseToApplication(hr) if err != nil { klog.Errorf("Error converting HelmRelease to Application: %v", err) continue @@ -771,12 +764,16 @@ type customWatcher struct { resultChan chan watch.Event stopChan chan struct{} stopOnce sync.Once + underlying watch.Interface } // Stop terminates the watch func (cw *customWatcher) Stop() { cw.stopOnce.Do(func() { close(cw.stopChan) + if cw.underlying != nil { + cw.underlying.Stop() + } }) } @@ -785,34 +782,18 @@ func (cw *customWatcher) ResultChan() <-chan watch.Event { return cw.resultChan } -// isRelevantHelmRelease checks if the HelmRelease meets the sourceRef and prefix criteria -func (r *REST) isRelevantHelmRelease(event *watch.Event) (bool, error) { - if event.Object == nil { - return false, nil - } - - // Check if the object is a *v1.Status - if status, ok := event.Object.(*metav1.Status); ok { - // Log at a less severe level or handle specific status errors if needed - klog.V(4).Infof("Received Status object in HelmRelease watch: %v", status.Message) - return false, nil // Not relevant for processing as a HelmRelease - } - - // Proceed if it's an Unstructured object - hr, ok := event.Object.(*unstructured.Unstructured) - if !ok { - return false, fmt.Errorf("expected Unstructured object, got %T", event.Object) - } - - return r.shouldIncludeHelmRelease(hr), nil -} - // shouldIncludeHelmRelease determines if a HelmRelease should be included based on filtering criteria -func (r *REST) shouldIncludeHelmRelease(hr *unstructured.Unstructured) bool { +func (r *REST) shouldIncludeHelmRelease(hr *helmv2.HelmRelease) bool { + // Nil check for Chart field + if hr.Spec.Chart == nil { + klog.V(6).Infof("HelmRelease %s has nil spec.chart field", hr.GetName()) + return false + } + // Filter by Chart Name - chartName, found, err := unstructured.NestedString(hr.Object, "spec", "chart", "spec", "chart") - if err != nil || !found { - klog.V(6).Infof("HelmRelease %s missing spec.chart.spec.chart field: %v", hr.GetName(), err) + chartName := hr.Spec.Chart.Spec.Chart + if chartName == "" { + klog.V(6).Infof("HelmRelease %s missing spec.chart.spec.chart field", hr.GetName()) return false } if chartName != r.releaseConfig.Chart.Name { @@ -825,21 +806,29 @@ func (r *REST) shouldIncludeHelmRelease(hr *unstructured.Unstructured) bool { } // matchesSourceRefAndPrefix checks both SourceRefConfig and Prefix criteria -func (r *REST) matchesSourceRefAndPrefix(hr *unstructured.Unstructured) bool { +func (r *REST) matchesSourceRefAndPrefix(hr *helmv2.HelmRelease) bool { + // Nil check for Chart field (defensive) + if hr.Spec.Chart == nil { + klog.V(6).Infof("HelmRelease %s has nil spec.chart field", hr.GetName()) + return false + } + // Extract SourceRef fields - sourceRefKind, found, err := unstructured.NestedString(hr.Object, "spec", "chart", "spec", "sourceRef", "kind") - if err != nil || !found { - klog.V(6).Infof("HelmRelease %s missing spec.chart.spec.sourceRef.kind field: %v", hr.GetName(), err) + sourceRef := hr.Spec.Chart.Spec.SourceRef + sourceRefKind := sourceRef.Kind + sourceRefName := sourceRef.Name + sourceRefNamespace := sourceRef.Namespace + + if sourceRefKind == "" { + klog.V(6).Infof("HelmRelease %s missing spec.chart.spec.sourceRef.kind field", hr.GetName()) return false } - sourceRefName, found, err := unstructured.NestedString(hr.Object, "spec", "chart", "spec", "sourceRef", "name") - if err != nil || !found { - klog.V(6).Infof("HelmRelease %s missing spec.chart.spec.sourceRef.name field: %v", hr.GetName(), err) + if sourceRefName == "" { + klog.V(6).Infof("HelmRelease %s missing spec.chart.spec.sourceRef.name field", hr.GetName()) return false } - sourceRefNamespace, found, err := unstructured.NestedString(hr.Object, "spec", "chart", "spec", "sourceRef", "namespace") - if err != nil || !found { - klog.V(6).Infof("HelmRelease %s missing spec.chart.spec.sourceRef.namespace field: %v", hr.GetName(), err) + if sourceRefNamespace == "" { + klog.V(6).Infof("HelmRelease %s missing spec.chart.spec.sourceRef.namespace field", hr.GetName()) return false } @@ -930,19 +919,11 @@ func filterPrefixedMap(original map[string]string, prefix string) map[string]str } // ConvertHelmReleaseToApplication converts a HelmRelease to an Application -func (r *REST) ConvertHelmReleaseToApplication(hr *unstructured.Unstructured) (appsv1alpha1.Application, error) { +func (r *REST) ConvertHelmReleaseToApplication(hr *helmv2.HelmRelease) (appsv1alpha1.Application, error) { klog.V(6).Infof("Converting HelmRelease to Application for resource %s", hr.GetName()) - var helmRelease helmv2.HelmRelease - // Convert unstructured to HelmRelease struct - err := runtime.DefaultUnstructuredConverter.FromUnstructured(hr.Object, &helmRelease) - if err != nil { - klog.Errorf("Error converting from unstructured to HelmRelease: %v", err) - return appsv1alpha1.Application{}, err - } - // Convert HelmRelease struct to Application struct - app, err := r.convertHelmReleaseToApplication(&helmRelease) + app, err := r.convertHelmReleaseToApplication(hr) if err != nil { klog.Errorf("Error converting from HelmRelease to Application: %v", err) return appsv1alpha1.Application{}, err diff --git a/pkg/registry/core/tenantmodule/rest.go b/pkg/registry/core/tenantmodule/rest.go index aa7d4eeb..852a0b64 100644 --- a/pkg/registry/core/tenantmodule/rest.go +++ b/pkg/registry/core/tenantmodule/rest.go @@ -32,12 +32,13 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/selection" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/duration" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/rest" - "k8s.io/client-go/dynamic" "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" corev1alpha1 "github.com/cozystack/cozystack/pkg/apis/core/v1alpha1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -69,17 +70,19 @@ var helmReleaseGVR = schema.GroupVersionResource{ // REST implements the RESTStorage interface for TenantModule resources type REST struct { - dynamicClient dynamic.Interface - gvr schema.GroupVersionResource - gvk schema.GroupVersionKind - kindName string - singularName string + c client.Client + w client.WithWatch + gvr schema.GroupVersionResource + gvk schema.GroupVersionKind + kindName string + singularName string } // NewREST creates a new REST storage for TenantModule -func NewREST(dynamicClient dynamic.Interface) *REST { +func NewREST(c client.Client, w client.WithWatch) *REST { return &REST{ - dynamicClient: dynamicClient, + c: c, + w: w, gvr: schema.GroupVersionResource{ Group: corev1alpha1.GroupName, Version: "v1alpha1", @@ -115,7 +118,8 @@ func (r *REST) Get(ctx context.Context, name string, options *metav1.GetOptions) klog.V(6).Infof("Attempting to retrieve TenantModule %s in namespace %s", name, namespace) // Get the corresponding HelmRelease - hr, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(namespace).Get(ctx, name, *options) + hr := &helmv2.HelmRelease{} + err = r.c.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, hr, &client.GetOptions{Raw: options}) if err != nil { klog.Errorf("Error retrieving HelmRelease for TenantModule %s: %v", name, err) @@ -231,7 +235,11 @@ func (r *REST) List(ctx context.Context, options *metainternalversion.ListOption } // List HelmReleases with mapped selectors - hrList, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(namespace).List(ctx, metaOptions) + hrList := &helmv2.HelmReleaseList{} + err = r.c.List(ctx, hrList, &client.ListOptions{ + Namespace: namespace, + Raw: &metaOptions, + }) if err != nil { klog.Errorf("Error listing HelmReleases: %v", err) return nil, err @@ -241,15 +249,15 @@ func (r *REST) List(ctx context.Context, options *metainternalversion.ListOption items := make([]unstructured.Unstructured, 0) // Iterate over HelmReleases and convert to TenantModules - for _, hr := range hrList.Items { + for i := range hrList.Items { // Double-check the label requirement - if !r.hasTenantModuleLabel(&hr) { + if !r.hasTenantModuleLabel(&hrList.Items[i]) { continue } - module, err := r.ConvertHelmReleaseToTenantModule(&hr) + module, err := r.ConvertHelmReleaseToTenantModule(&hrList.Items[i]) if err != nil { - klog.Errorf("Error converting HelmRelease %s to TenantModule: %v", hr.GetName(), err) + klog.Errorf("Error converting HelmRelease %s to TenantModule: %v", hrList.Items[i].GetName(), err) continue } @@ -376,7 +384,11 @@ func (r *REST) Watch(ctx context.Context, options *metainternalversion.ListOptio } // Start watch on HelmRelease with mapped selectors - helmWatcher, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(namespace).Watch(ctx, metaOptions) + hrList := &helmv2.HelmReleaseList{} + helmWatcher, err := r.w.Watch(ctx, hrList, &client.ListOptions{ + Namespace: namespace, + Raw: &metaOptions, + }) if err != nil { klog.Errorf("Error setting up watch for HelmReleases: %v", err) return nil, err @@ -386,13 +398,15 @@ func (r *REST) Watch(ctx context.Context, options *metainternalversion.ListOptio customW := &customWatcher{ resultChan: make(chan watch.Event), stopChan: make(chan struct{}), + underlying: helmWatcher, } go func() { defer close(customW.resultChan) + defer customW.underlying.Stop() for { select { - case event, ok := <-helmWatcher.ResultChan(): + case event, ok := <-customW.underlying.ResultChan(): if !ok { // The watcher has been closed, attempt to re-establish the watch klog.Warning("HelmRelease watcher closed, attempting to re-establish") @@ -406,19 +420,19 @@ func (r *REST) Watch(ctx context.Context, options *metainternalversion.ListOptio continue // Skip processing this event } - // Proceed with processing Unstructured objects - matches, err := r.isRelevantHelmRelease(&event) - if err != nil { - klog.V(4).Infof("Non-critical error filtering HelmRelease event: %v", err) + // Proceed with processing HelmRelease objects + hr, ok := event.Object.(*helmv2.HelmRelease) + if !ok { + klog.V(4).Infof("Expected HelmRelease object, got %T", event.Object) continue } - if !matches { + if !r.hasTenantModuleLabel(hr) { continue } // Convert HelmRelease to TenantModule - module, err := r.ConvertHelmReleaseToTenantModule(event.Object.(*unstructured.Unstructured)) + module, err := r.ConvertHelmReleaseToTenantModule(hr) if err != nil { klog.Errorf("Error converting HelmRelease to TenantModule: %v", err) continue @@ -480,12 +494,16 @@ type customWatcher struct { resultChan chan watch.Event stopChan chan struct{} stopOnce sync.Once + underlying watch.Interface } // Stop terminates the watch func (cw *customWatcher) Stop() { cw.stopOnce.Do(func() { close(cw.stopChan) + if cw.underlying != nil { + cw.underlying.Stop() + } }) } @@ -494,30 +512,8 @@ func (cw *customWatcher) ResultChan() <-chan watch.Event { return cw.resultChan } -// isRelevantHelmRelease checks if the HelmRelease has the tenant module label -func (r *REST) isRelevantHelmRelease(event *watch.Event) (bool, error) { - if event.Object == nil { - return false, nil - } - - // Check if the object is a *v1.Status - if status, ok := event.Object.(*metav1.Status); ok { - // Log at a less severe level or handle specific status errors if needed - klog.V(4).Infof("Received Status object in HelmRelease watch: %v", status.Message) - return false, nil // Not relevant for processing as a HelmRelease - } - - // Proceed if it's an Unstructured object - hr, ok := event.Object.(*unstructured.Unstructured) - if !ok { - return false, fmt.Errorf("expected Unstructured object, got %T", event.Object) - } - - return r.hasTenantModuleLabel(hr), nil -} - // hasTenantModuleLabel checks if a HelmRelease has the required tenant module label -func (r *REST) hasTenantModuleLabel(hr *unstructured.Unstructured) bool { +func (r *REST) hasTenantModuleLabel(hr *helmv2.HelmRelease) bool { labels := hr.GetLabels() if labels == nil { return false @@ -554,19 +550,11 @@ func (r *REST) getNamespace(ctx context.Context) (string, error) { } // ConvertHelmReleaseToTenantModule converts a HelmRelease to a TenantModule -func (r *REST) ConvertHelmReleaseToTenantModule(hr *unstructured.Unstructured) (corev1alpha1.TenantModule, error) { +func (r *REST) ConvertHelmReleaseToTenantModule(hr *helmv2.HelmRelease) (corev1alpha1.TenantModule, error) { klog.V(6).Infof("Converting HelmRelease to TenantModule for resource %s", hr.GetName()) - var helmRelease helmv2.HelmRelease - // Convert unstructured to HelmRelease struct - err := runtime.DefaultUnstructuredConverter.FromUnstructured(hr.Object, &helmRelease) - if err != nil { - klog.Errorf("Error converting from unstructured to HelmRelease: %v", err) - return corev1alpha1.TenantModule{}, err - } - // Convert HelmRelease struct to TenantModule struct - module, err := r.convertHelmReleaseToTenantModule(&helmRelease) + module, err := r.convertHelmReleaseToTenantModule(hr) if err != nil { klog.Errorf("Error converting from HelmRelease to TenantModule: %v", err) return corev1alpha1.TenantModule{}, err diff --git a/pkg/registry/core/tenantnamespace/rest.go b/pkg/registry/core/tenantnamespace/rest.go index 56098740..68d9fe3c 100644 --- a/pkg/registry/core/tenantnamespace/rest.go +++ b/pkg/registry/core/tenantnamespace/rest.go @@ -12,17 +12,18 @@ import ( "time" corev1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metainternal "k8s.io/apimachinery/pkg/apis/meta/internalversion" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/duration" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/rest" - corev1client "k8s.io/client-go/kubernetes/typed/core/v1" - rbacv1client "k8s.io/client-go/kubernetes/typed/rbac/v1" + "sigs.k8s.io/controller-runtime/pkg/client" corev1alpha1 "github.com/cozystack/cozystack/pkg/apis/core/v1alpha1" ) @@ -46,18 +47,18 @@ var ( ) type REST struct { - core corev1client.CoreV1Interface - rbac rbacv1client.RbacV1Interface - gvr schema.GroupVersionResource + c client.Client + w client.WithWatch + gvr schema.GroupVersionResource } func NewREST( - coreCli corev1client.CoreV1Interface, - rbacCli rbacv1client.RbacV1Interface, + c client.Client, + w client.WithWatch, ) *REST { return &REST{ - core: coreCli, - rbac: rbacCli, + c: c, + w: w, gvr: schema.GroupVersionResource{ Group: corev1alpha1.GroupName, Version: "v1alpha1", @@ -89,7 +90,8 @@ func (r *REST) List( ctx context.Context, _ *metainternal.ListOptions, ) (runtime.Object, error) { - nsList, err := r.core.Namespaces().List(ctx, metav1.ListOptions{}) + nsList := &corev1.NamespaceList{} + err := r.c.List(ctx, nsList) if err != nil { return nil, err } @@ -118,7 +120,8 @@ func (r *REST) Get( return nil, apierrors.NewNotFound(r.gvr.GroupResource(), name) } - ns, err := r.core.Namespaces().Get(ctx, name, *opts) + ns := &corev1.Namespace{} + err := r.c.Get(ctx, types.NamespacedName{Namespace: "", Name: name}, ns, &client.GetOptions{Raw: opts}) if err != nil { return nil, err } @@ -128,14 +131,7 @@ func (r *REST) Get( APIVersion: corev1alpha1.SchemeGroupVersion.String(), Kind: "TenantNamespace", }, - ObjectMeta: metav1.ObjectMeta{ - Name: ns.Name, - UID: ns.UID, - ResourceVersion: ns.ResourceVersion, - CreationTimestamp: ns.CreationTimestamp, - Labels: ns.Labels, - Annotations: ns.Annotations, - }, + ObjectMeta: ns.ObjectMeta, }, nil } @@ -144,10 +140,11 @@ func (r *REST) Get( // ----------------------------------------------------------------------------- func (r *REST) Watch(ctx context.Context, opts *metainternal.ListOptions) (watch.Interface, error) { - nsWatch, err := r.core.Namespaces().Watch(ctx, metav1.ListOptions{ + nsList := &corev1.NamespaceList{} + nsWatch, err := r.w.Watch(ctx, nsList, &client.ListOptions{Raw: &metav1.ListOptions{ Watch: true, ResourceVersion: opts.ResourceVersion, - }) + }}) if err != nil { return nil, err } @@ -282,9 +279,10 @@ func (r *REST) filterAccessible( for _, name := range names { nameSet[name] = struct{}{} } - rbs, err := r.rbac.RoleBindings("").List(ctx, metav1.ListOptions{}) + rbs := &rbacv1.RoleBindingList{} + err := r.c.List(ctx, rbs) if err != nil { - return []string{}, fmt.Errorf("failed to list rolebindings") + return []string{}, fmt.Errorf("failed to list rolebindings: %w", err) } allowedNameSet := make(map[string]struct{}) for i := range rbs.Items { diff --git a/pkg/registry/core/tenantsecret/rest.go b/pkg/registry/core/tenantsecret/rest.go index ad477527..a13426e6 100644 --- a/pkg/registry/core/tenantsecret/rest.go +++ b/pkg/registry/core/tenantsecret/rest.go @@ -25,7 +25,7 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/rest" - corev1client "k8s.io/client-go/kubernetes/typed/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client" corev1alpha1 "github.com/cozystack/cozystack/pkg/apis/core/v1alpha1" ) @@ -157,13 +157,15 @@ var ( ) type REST struct { - core corev1client.CoreV1Interface - gvr schema.GroupVersionResource + c client.Client + w client.WithWatch + gvr schema.GroupVersionResource } -func NewREST(coreCli corev1client.CoreV1Interface) *REST { +func NewREST(c client.Client, w client.WithWatch) *REST { return &REST{ - core: coreCli, + c: c, + w: w, gvr: schema.GroupVersionResource{ Group: corev1alpha1.GroupName, Version: "v1alpha1", @@ -203,11 +205,11 @@ func (r *REST) Create( } sec := tenantToSecret(in, nil) - out, err := r.core.Secrets(sec.Namespace).Create(ctx, sec, *opts) + err := r.c.Create(ctx, sec, &client.CreateOptions{Raw: opts}) if err != nil { return nil, err } - return secretToTenant(out), nil + return secretToTenant(sec), nil } func (r *REST) Get( @@ -219,7 +221,8 @@ func (r *REST) Get( if err != nil { return nil, err } - sec, err := r.core.Secrets(ns).Get(ctx, name, *opts) + sec := &corev1.Secret{} + err = r.c.Get(ctx, types.NamespacedName{Namespace: ns, Name: name}, sec, &client.GetOptions{Raw: opts}) if err != nil { return nil, err } @@ -247,10 +250,14 @@ func (r *REST) List(ctx context.Context, opts *metainternal.ListOptions) (runtim fieldSel = opts.FieldSelector.String() } - list, err := r.core.Secrets(ns).List(ctx, metav1.ListOptions{ - LabelSelector: ls.String(), - FieldSelector: fieldSel, - }) + list := &corev1.SecretList{} + err = r.c.List(ctx, list, + &client.ListOptions{ + Namespace: ns, + Raw: &metav1.ListOptions{ + LabelSelector: ls.String(), + FieldSelector: fieldSel, + }}) if err != nil { return nil, err } @@ -284,7 +291,8 @@ func (r *REST) Update( return nil, false, err } - cur, err := r.core.Secrets(ns).Get(ctx, name, metav1.GetOptions{}) + cur := &corev1.Secret{} + err = r.c.Get(ctx, types.NamespacedName{Namespace: ns, Name: name}, cur, &client.GetOptions{Raw: &metav1.GetOptions{}}) if err != nil && !apierrors.IsNotFound(err) { return nil, false, err } @@ -296,17 +304,18 @@ func (r *REST) Update( in := newObj.(*corev1alpha1.TenantSecret) newSec := tenantToSecret(in, cur) + newSec.Namespace = ns if cur == nil { if !forceCreate && err == nil { return nil, false, apierrors.NewNotFound(r.gvr.GroupResource(), name) } - out, err := r.core.Secrets(ns).Create(ctx, newSec, metav1.CreateOptions{}) - return secretToTenant(out), true, err + err := r.c.Create(ctx, newSec, &client.CreateOptions{Raw: &metav1.CreateOptions{}}) + return secretToTenant(newSec), true, err } newSec.ResourceVersion = cur.ResourceVersion - out, err := r.core.Secrets(ns).Update(ctx, newSec, *opts) - return secretToTenant(out), false, err + err = r.c.Update(ctx, newSec, &client.UpdateOptions{Raw: opts}) + return secretToTenant(newSec), false, err } func (r *REST) Delete( @@ -319,7 +328,7 @@ func (r *REST) Delete( if err != nil { return nil, false, err } - err = r.core.Secrets(ns).Delete(ctx, name, *opts) + err = r.c.Delete(ctx, &corev1.Secret{ObjectMeta: metav1.ObjectMeta{Namespace: ns, Name: name}}, &client.DeleteOptions{Raw: opts}) return nil, err == nil, err } @@ -331,21 +340,33 @@ func (r *REST) Patch( opts *metav1.PatchOptions, subresources ...string, ) (runtime.Object, error) { + if len(subresources) > 0 { + return nil, fmt.Errorf("TenantSecret does not have subresources") + } ns, err := nsFrom(ctx) if err != nil { return nil, err } - - out, err := r.core.Secrets(ns). - Patch(ctx, name, pt, data, *opts, subresources...) + out := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ns, + Name: name, + }, + } + patch := client.RawPatch(pt, data) + err = r.c.Patch(ctx, out, patch, &client.PatchOptions{Raw: opts}) if err != nil { return nil, err } // Ensure tenant secret label is preserved + if out.Labels == nil { + out.Labels = make(map[string]string) + } + if out.Labels[tsLabelKey] != tsLabelValue { out.Labels[tsLabelKey] = tsLabelValue - out, _ = r.core.Secrets(ns).Update(ctx, out, metav1.UpdateOptions{}) + _ = r.c.Update(ctx, out, &client.UpdateOptions{Raw: &metav1.UpdateOptions{}}) } return secretToTenant(out), nil @@ -361,12 +382,13 @@ func (r *REST) Watch(ctx context.Context, opts *metainternal.ListOptions) (watch return nil, err } + secList := &corev1.SecretList{} ls := labels.Set{tsLabelKey: tsLabelValue}.AsSelector().String() - base, err := r.core.Secrets(ns).Watch(ctx, metav1.ListOptions{ + base, err := r.w.Watch(ctx, secList, &client.ListOptions{Namespace: ns, Raw: &metav1.ListOptions{ Watch: true, LabelSelector: ls, ResourceVersion: opts.ResourceVersion, - }) + }}) if err != nil { return nil, err } diff --git a/pkg/registry/core/tenantsecretstable/rest.go b/pkg/registry/core/tenantsecretstable/rest.go index 841bfa32..de2604ca 100644 --- a/pkg/registry/core/tenantsecretstable/rest.go +++ b/pkg/registry/core/tenantsecretstable/rest.go @@ -23,7 +23,7 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/rest" - corev1client "k8s.io/client-go/kubernetes/typed/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client" corev1alpha1 "github.com/cozystack/cozystack/pkg/apis/core/v1alpha1" ) @@ -38,13 +38,15 @@ const ( ) type REST struct { - core corev1client.CoreV1Interface - gvr schema.GroupVersionResource + c client.Client + w client.WithWatch + gvr schema.GroupVersionResource } -func NewREST(coreCli corev1client.CoreV1Interface) *REST { +func NewREST(c client.Client, w client.WithWatch) *REST { return &REST{ - core: coreCli, + c: c, + w: w, gvr: schema.GroupVersionResource{ Group: corev1alpha1.GroupName, Version: "v1alpha1", @@ -95,7 +97,14 @@ func (r *REST) Get(ctx context.Context, name string, opts *metav1.GetOptions) (r // We need to identify secret name and key. Iterate secrets in namespace with tenant secret label // and return the matching composed object. - list, err := r.core.Secrets(ns).List(ctx, metav1.ListOptions{LabelSelector: labels.Set{tsLabelKey: tsLabelValue}.AsSelector().String()}) + list := &corev1.SecretList{} + err = r.c.List(ctx, list, + &client.ListOptions{ + Namespace: ns, + Raw: &metav1.ListOptions{ + LabelSelector: labels.Set{tsLabelKey: tsLabelValue}.AsSelector().String(), + }, + }) if err != nil { return nil, err } @@ -130,7 +139,15 @@ func (r *REST) List(ctx context.Context, opts *metainternal.ListOptions) (runtim fieldSel = opts.FieldSelector.String() } - list, err := r.core.Secrets(ns).List(ctx, metav1.ListOptions{LabelSelector: sel.String(), FieldSelector: fieldSel}) + list := &corev1.SecretList{} + err = r.c.List(ctx, list, + &client.ListOptions{ + Namespace: ns, + Raw: &metav1.ListOptions{ + LabelSelector: labels.Set{tsLabelKey: tsLabelValue}.AsSelector().String(), + FieldSelector: fieldSel, + }, + }) if err != nil { return nil, err } @@ -169,12 +186,13 @@ func (r *REST) Watch(ctx context.Context, opts *metainternal.ListOptions) (watch return nil, err } + secList := &corev1.SecretList{} ls := labels.Set{tsLabelKey: tsLabelValue}.AsSelector().String() - base, err := r.core.Secrets(ns).Watch(ctx, metav1.ListOptions{ + base, err := r.w.Watch(ctx, secList, &client.ListOptions{Namespace: ns, Raw: &metav1.ListOptions{ Watch: true, LabelSelector: ls, ResourceVersion: opts.ResourceVersion, - }) + }}) if err != nil { return nil, err }