[api] Use shared informer cache (#1539)

## What this PR does

This patch changes all clients in the Cozystack API server to typed ones
from the controller runtime. This should improve the performance of the
API server and simplifies the code by removing work with unstructured
objects and dynamic clients.

### Release note

```release-note
[api] Use typed and cache-backed k8s clients in the Cozystack API to
improve performance. Get rid of operations on unstructured objects and
use of dynamic clients.
```

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

* **Refactor**
* Backend migrated to a controller-runtime manager with typed clients
for Kubernetes resources, improving watch reliability and cache sync.
* Storage paths for applications, tenant modules, namespaces, and
secrets now use strongly-typed resource handling for more consistent
behavior.

* **Chores**
  * Cluster role expanded to include services in core API permissions.

* **Notes**
  * No user-facing API schema changes.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
This commit is contained in:
Timofei Larkin
2025-10-27 18:00:20 +04:00
committed by GitHub
7 changed files with 283 additions and 240 deletions

View File

@@ -4,7 +4,7 @@ metadata:
name: cozystack-api
rules:
- apiGroups: [""]
resources: ["namespaces", "secrets"]
resources: ["namespaces", "secrets", "services"]
verbs: ["get", "watch", "list"]
- apiGroups: ["rbac.authorization.k8s.io"]
resources: ["rolebindings"]

View File

@@ -17,18 +17,22 @@ limitations under the License.
package apiserver
import (
"context"
"fmt"
"time"
helmv2 "github.com/fluxcd/helm-controller/api/v2"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apiserver/pkg/registry/rest"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/cozystack/cozystack/pkg/apis/apps"
appsinstall "github.com/cozystack/cozystack/pkg/apis/apps/install"
@@ -50,6 +54,7 @@ var (
// versions and content types.
Codecs = serializer.NewCodecFactory(Scheme)
CozyComponentName = "cozy"
syncPeriod = 5 * time.Minute
)
func init() {
@@ -58,9 +63,15 @@ func init() {
// Register HelmRelease types.
if err := helmv2.AddToScheme(Scheme); err != nil {
panic(fmt.Sprintf("Failed to add HelmRelease types to scheme: %v", err))
panic(fmt.Errorf("Failed to add HelmRelease types to scheme: %w", err))
}
if err := corev1.AddToScheme(Scheme); err != nil {
panic(fmt.Errorf("Failed to add core types to scheme: %w", err))
}
if err := rbacv1.AddToScheme(Scheme); err != nil {
panic(fmt.Errorf("Failed to add RBAC types to scheme: %w", err))
}
// Add unversioned types.
metav1.AddToGroupVersion(Scheme, schema.GroupVersion{Version: "v1"})
@@ -118,43 +129,59 @@ func (c completedConfig) New() (*CozyServer, error) {
}
// Create a dynamic client for HelmRelease using InClusterConfig.
inClusterConfig, err := restclient.InClusterConfig()
cfg, err := ctrl.GetConfig()
if err != nil {
return nil, fmt.Errorf("unable to get in-cluster config: %v", err)
return nil, fmt.Errorf("failed to get kubeconfig: %w", err)
}
dynamicClient, err := dynamic.NewForConfig(inClusterConfig)
mgr, err := ctrl.NewManager(cfg, ctrl.Options{
Scheme: Scheme,
Cache: cache.Options{SyncPeriod: &syncPeriod},
})
if err != nil {
return nil, fmt.Errorf("unable to create dynamic client: %v", err)
return nil, fmt.Errorf("failed to build manager: %w", err)
}
clientset, err := kubernetes.NewForConfig(inClusterConfig)
if err != nil {
return nil, fmt.Errorf("create kube clientset: %v", err)
ctx := ctrl.SetupSignalHandler()
if err = mustGetInformers(ctx, mgr,
&helmv2.HelmRelease{},
&corev1.Secret{},
&corev1.Namespace{},
&corev1.Service{},
&rbacv1.RoleBinding{},
); err != nil {
return nil, fmt.Errorf("failed to get informers: %w", err)
}
go func() {
if err := mgr.Start(ctx); err != nil {
panic(fmt.Errorf("manager start failed: %w", err))
}
}()
if ok := mgr.GetCache().WaitForCacheSync(ctx); !ok {
return nil, fmt.Errorf("cache sync failed")
}
cli := mgr.GetClient()
watchCli, err := client.NewWithWatch(cfg, client.Options{Scheme: Scheme})
if err != nil {
return nil, fmt.Errorf("failed to build watch client: %w", err)
}
// --- static, cluster-scoped resource for core group ---
coreV1alpha1Storage := map[string]rest.Storage{}
coreV1alpha1Storage["tenantnamespaces"] = cozyregistry.RESTInPeace(
tenantnamespacestorage.NewREST(
clientset.CoreV1(),
clientset.RbacV1(),
),
tenantnamespacestorage.NewREST(cli, watchCli),
)
coreV1alpha1Storage["tenantsecrets"] = cozyregistry.RESTInPeace(
tenantsecretstorage.NewREST(
clientset.CoreV1(),
),
tenantsecretstorage.NewREST(cli, watchCli),
)
coreV1alpha1Storage["tenantsecretstables"] = cozyregistry.RESTInPeace(
tenantsecretstablestorage.NewREST(
clientset.CoreV1(),
),
tenantsecretstablestorage.NewREST(cli, watchCli),
)
coreV1alpha1Storage["tenantmodules"] = cozyregistry.RESTInPeace(
tenantmodulestorage.NewREST(
dynamicClient,
),
tenantmodulestorage.NewREST(cli, watchCli),
)
coreApiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(core.GroupName, Scheme, metav1.ParameterCodec, Codecs)
@@ -166,7 +193,7 @@ func (c completedConfig) New() (*CozyServer, error) {
// --- dynamically-configured, per-tenant resources ---
appsV1alpha1Storage := map[string]rest.Storage{}
for _, resConfig := range c.ResourceConfig.Resources {
storage := applicationstorage.NewREST(dynamicClient, &resConfig)
storage := applicationstorage.NewREST(cli, watchCli, &resConfig)
appsV1alpha1Storage[resConfig.Application.Plural] = cozyregistry.RESTInPeace(storage)
}
appsApiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apps.GroupName, Scheme, metav1.ParameterCodec, Codecs)
@@ -177,3 +204,12 @@ func (c completedConfig) New() (*CozyServer, error) {
return s, nil
}
func mustGetInformers(ctx context.Context, mgr ctrl.Manager, types ...client.Object) error {
for i := range types {
if _, err := mgr.GetCache().GetInformer(ctx, types[i]); err != nil {
return fmt.Errorf("failed to get informer for %T: %w", types[i], err)
}
}
return nil
}

View File

@@ -37,8 +37,8 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/client-go/dynamic"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
appsv1alpha1 "github.com/cozystack/cozystack/pkg/apis/apps/v1alpha1"
"github.com/cozystack/cozystack/pkg/config"
@@ -76,7 +76,8 @@ var helmReleaseGVR = schema.GroupVersionResource{
// REST implements the RESTStorage interface for Application resources
type REST struct {
dynamicClient dynamic.Interface
c client.Client
w client.WithWatch
gvr schema.GroupVersionResource
gvk schema.GroupVersionKind
kindName string
@@ -86,7 +87,7 @@ type REST struct {
}
// NewREST creates a new REST storage for Application with specific configuration
func NewREST(dynamicClient dynamic.Interface, config *config.Resource) *REST {
func NewREST(c client.Client, w client.WithWatch, config *config.Resource) *REST {
var specSchema *structuralschema.Structural
if raw := strings.TrimSpace(config.Application.OpenAPISchema); raw != "" {
@@ -110,7 +111,8 @@ func NewREST(dynamicClient dynamic.Interface, config *config.Resource) *REST {
}
return &REST{
dynamicClient: dynamicClient,
c: c,
w: w,
gvr: schema.GroupVersionResource{
Group: appsv1alpha1.GroupName,
Version: "v1alpha1",
@@ -158,30 +160,23 @@ func (r *REST) Create(ctx context.Context, obj runtime.Object, createValidation
helmRelease.Labels = mergeMaps(helmRelease.Labels, addPrefixedMap(app.Labels, LabelPrefix))
// Note: Annotations from config are not handled as r.releaseConfig.Annotations is undefined
// Convert HelmRelease to unstructured format
unstructuredHR, err := runtime.DefaultUnstructuredConverter.ToUnstructured(helmRelease)
if err != nil {
klog.Errorf("Failed to convert HelmRelease to unstructured: %v", err)
return nil, fmt.Errorf("failed to convert HelmRelease to unstructured: %v", err)
}
klog.V(6).Infof("Creating HelmRelease %s in namespace %s", helmRelease.Name, app.Namespace)
// Create HelmRelease in Kubernetes
createdHR, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(app.Namespace).Create(ctx, &unstructured.Unstructured{Object: unstructuredHR}, *options)
err = r.c.Create(ctx, helmRelease, &client.CreateOptions{Raw: options})
if err != nil {
klog.Errorf("Failed to create HelmRelease %s: %v", helmRelease.Name, err)
return nil, fmt.Errorf("failed to create HelmRelease: %v", err)
}
// Convert the created HelmRelease back to Application
convertedApp, err := r.ConvertHelmReleaseToApplication(createdHR)
convertedApp, err := r.ConvertHelmReleaseToApplication(helmRelease)
if err != nil {
klog.Errorf("Conversion error from HelmRelease to Application for resource %s: %v", createdHR.GetName(), err)
klog.Errorf("Conversion error from HelmRelease to Application for resource %s: %v", helmRelease.GetName(), err)
return nil, fmt.Errorf("conversion error: %v", err)
}
klog.V(6).Infof("Successfully created and converted HelmRelease %s to Application", createdHR.GetName())
klog.V(6).Infof("Successfully created and converted HelmRelease %s to Application", helmRelease.GetName())
// Convert Application to unstructured format
unstructuredApp, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&convertedApp)
@@ -206,7 +201,8 @@ func (r *REST) Get(ctx context.Context, name string, options *metav1.GetOptions)
// Get the corresponding HelmRelease using the new prefix
helmReleaseName := r.releaseConfig.Prefix + name
hr, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(namespace).Get(ctx, helmReleaseName, *options)
helmRelease := &helmv2.HelmRelease{}
err = r.c.Get(ctx, client.ObjectKey{Namespace: namespace, Name: helmReleaseName}, helmRelease, &client.GetOptions{Raw: options})
if err != nil {
klog.Errorf("Error retrieving HelmRelease for resource %s: %v", name, err)
@@ -221,14 +217,14 @@ func (r *REST) Get(ctx context.Context, name string, options *metav1.GetOptions)
}
// Check if HelmRelease meets the required chartName and sourceRef criteria
if !r.shouldIncludeHelmRelease(hr) {
if !r.shouldIncludeHelmRelease(helmRelease) {
klog.Errorf("HelmRelease %s does not match the required chartName and sourceRef criteria", helmReleaseName)
// Return a NotFound error for the Application resource
return nil, apierrors.NewNotFound(r.gvr.GroupResource(), name)
}
// Convert HelmRelease to Application
convertedApp, err := r.ConvertHelmReleaseToApplication(hr)
convertedApp, err := r.ConvertHelmReleaseToApplication(helmRelease)
if err != nil {
klog.Errorf("Conversion error from HelmRelease to Application for resource %s: %v", name, err)
return nil, fmt.Errorf("conversion error: %v", err)
@@ -325,7 +321,11 @@ func (r *REST) List(ctx context.Context, options *metainternalversion.ListOption
}
// List HelmReleases with mapped selectors
hrList, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(namespace).List(ctx, metaOptions)
hrList := &helmv2.HelmReleaseList{}
err = r.c.List(ctx, hrList, &client.ListOptions{
Namespace: namespace,
Raw: &metaOptions,
})
if err != nil {
klog.Errorf("Error listing HelmReleases: %v", err)
return nil, err
@@ -335,14 +335,14 @@ func (r *REST) List(ctx context.Context, options *metainternalversion.ListOption
items := make([]unstructured.Unstructured, 0)
// Iterate over HelmReleases and convert to Applications
for _, hr := range hrList.Items {
if !r.shouldIncludeHelmRelease(&hr) {
for i := range hrList.Items {
if !r.shouldIncludeHelmRelease(&hrList.Items[i]) {
continue
}
app, err := r.ConvertHelmReleaseToApplication(&hr)
app, err := r.ConvertHelmReleaseToApplication(&hrList.Items[i])
if err != nil {
klog.Errorf("Error converting HelmRelease %s to Application: %v", hr.GetName(), err)
klog.Errorf("Error converting HelmRelease %s to Application: %v", hrList.Items[i].GetName(), err)
continue
}
@@ -457,7 +457,8 @@ func (r *REST) Update(ctx context.Context, name string, objInfo rest.UpdatedObje
// Ensure ResourceVersion
if helmRelease.ResourceVersion == "" {
cur, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(helmRelease.Namespace).Get(ctx, helmRelease.Name, metav1.GetOptions{})
cur := &helmv2.HelmRelease{}
err := r.c.Get(ctx, client.ObjectKey{Namespace: helmRelease.Namespace, Name: helmRelease.Name}, cur, &client.GetOptions{Raw: &metav1.GetOptions{}})
if err != nil {
return nil, false, fmt.Errorf("failed to fetch current HelmRelease: %w", err)
}
@@ -470,53 +471,38 @@ func (r *REST) Update(ctx context.Context, name string, objInfo rest.UpdatedObje
helmRelease.Labels = mergeMaps(helmRelease.Labels, addPrefixedMap(app.Labels, LabelPrefix))
// Note: Annotations from config are not handled as r.releaseConfig.Annotations is undefined
// Convert HelmRelease to unstructured format
unstructuredHR, err := runtime.DefaultUnstructuredConverter.ToUnstructured(helmRelease)
if err != nil {
klog.Errorf("Failed to convert HelmRelease to unstructured: %v", err)
return nil, false, fmt.Errorf("failed to convert HelmRelease to unstructured: %v", err)
}
// Retrieve metadata from unstructured object
metadata, found, err := unstructured.NestedMap(unstructuredHR, "metadata")
if err != nil || !found {
klog.Errorf("Failed to retrieve metadata from HelmRelease: %v, found: %v", err, found)
return nil, false, fmt.Errorf("failed to retrieve metadata from HelmRelease: %v", err)
}
klog.V(6).Infof("HelmRelease Metadata: %+v", metadata)
klog.V(6).Infof("Updating HelmRelease %s in namespace %s", helmRelease.Name, helmRelease.Namespace)
// Before updating, ensure the HelmRelease meets the inclusion criteria
// This prevents updating HelmReleases that should not be managed as Applications
if !r.shouldIncludeHelmRelease(&unstructured.Unstructured{Object: unstructuredHR}) {
if !r.shouldIncludeHelmRelease(helmRelease) {
klog.Errorf("HelmRelease %s does not match the required chartName and sourceRef criteria", helmRelease.Name)
// Return a NotFound error for the Application resource
return nil, false, apierrors.NewNotFound(r.gvr.GroupResource(), name)
}
// Update the HelmRelease in Kubernetes
resultHR, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(helmRelease.Namespace).Update(ctx, &unstructured.Unstructured{Object: unstructuredHR}, metav1.UpdateOptions{})
err = r.c.Update(ctx, helmRelease, &client.UpdateOptions{Raw: &metav1.UpdateOptions{}})
if err != nil {
klog.Errorf("Failed to update HelmRelease %s: %v", helmRelease.Name, err)
return nil, false, fmt.Errorf("failed to update HelmRelease: %v", err)
}
// After updating, ensure the updated HelmRelease still meets the inclusion criteria
if !r.shouldIncludeHelmRelease(resultHR) {
klog.Errorf("Updated HelmRelease %s does not match the required chartName and sourceRef criteria", resultHR.GetName())
if !r.shouldIncludeHelmRelease(helmRelease) {
klog.Errorf("Updated HelmRelease %s does not match the required chartName and sourceRef criteria", helmRelease.GetName())
// Return a NotFound error for the Application resource
return nil, false, apierrors.NewNotFound(r.gvr.GroupResource(), name)
}
// Convert the updated HelmRelease back to Application
convertedApp, err := r.ConvertHelmReleaseToApplication(resultHR)
convertedApp, err := r.ConvertHelmReleaseToApplication(helmRelease)
if err != nil {
klog.Errorf("Conversion error from HelmRelease to Application for resource %s: %v", resultHR.GetName(), err)
klog.Errorf("Conversion error from HelmRelease to Application for resource %s: %v", helmRelease.GetName(), err)
return nil, false, fmt.Errorf("conversion error: %v", err)
}
klog.V(6).Infof("Successfully updated and converted HelmRelease %s to Application", resultHR.GetName())
klog.V(6).Infof("Successfully updated and converted HelmRelease %s to Application", helmRelease.GetName())
// Explicitly set apiVersion and kind for Application
convertedApp.TypeMeta = metav1.TypeMeta{
@@ -554,7 +540,8 @@ func (r *REST) Delete(ctx context.Context, name string, deleteValidation rest.Va
helmReleaseName := r.releaseConfig.Prefix + name
// Retrieve the HelmRelease before attempting to delete
hr, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(namespace).Get(ctx, helmReleaseName, metav1.GetOptions{})
helmRelease := &helmv2.HelmRelease{}
err = r.c.Get(ctx, client.ObjectKey{Namespace: namespace, Name: helmReleaseName}, helmRelease, &client.GetOptions{Raw: &metav1.GetOptions{}})
if err != nil {
if apierrors.IsNotFound(err) {
// If HelmRelease does not exist, return NotFound error for Application
@@ -567,7 +554,7 @@ func (r *REST) Delete(ctx context.Context, name string, deleteValidation rest.Va
}
// Validate that the HelmRelease meets the inclusion criteria
if !r.shouldIncludeHelmRelease(hr) {
if !r.shouldIncludeHelmRelease(helmRelease) {
klog.Errorf("HelmRelease %s does not match the required chartName and sourceRef criteria", helmReleaseName)
// Return NotFound error for Application resource
return nil, false, apierrors.NewNotFound(r.gvr.GroupResource(), name)
@@ -576,7 +563,7 @@ func (r *REST) Delete(ctx context.Context, name string, deleteValidation rest.Va
klog.V(6).Infof("Deleting HelmRelease %s in namespace %s", helmReleaseName, namespace)
// Delete the HelmRelease corresponding to the Application
err = r.dynamicClient.Resource(helmReleaseGVR).Namespace(namespace).Delete(ctx, helmReleaseName, *options)
err = r.c.Delete(ctx, helmRelease, &client.DeleteOptions{Raw: options})
if err != nil {
klog.Errorf("Failed to delete HelmRelease %s: %v", helmReleaseName, err)
return nil, false, fmt.Errorf("failed to delete HelmRelease: %v", err)
@@ -659,7 +646,11 @@ func (r *REST) Watch(ctx context.Context, options *metainternalversion.ListOptio
}
// Start watch on HelmRelease with mapped selectors
helmWatcher, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(namespace).Watch(ctx, metaOptions)
hrList := &helmv2.HelmReleaseList{}
helmWatcher, err := r.w.Watch(ctx, hrList, &client.ListOptions{
Namespace: namespace,
Raw: &metaOptions,
})
if err != nil {
klog.Errorf("Error setting up watch for HelmReleases: %v", err)
return nil, err
@@ -669,13 +660,15 @@ func (r *REST) Watch(ctx context.Context, options *metainternalversion.ListOptio
customW := &customWatcher{
resultChan: make(chan watch.Event),
stopChan: make(chan struct{}),
underlying: helmWatcher,
}
go func() {
defer close(customW.resultChan)
defer customW.underlying.Stop()
for {
select {
case event, ok := <-helmWatcher.ResultChan():
case event, ok := <-customW.underlying.ResultChan():
if !ok {
// The watcher has been closed, attempt to re-establish the watch
klog.Warning("HelmRelease watcher closed, attempting to re-establish")
@@ -689,19 +682,19 @@ func (r *REST) Watch(ctx context.Context, options *metainternalversion.ListOptio
continue // Skip processing this event
}
// Proceed with processing Unstructured objects
matches, err := r.isRelevantHelmRelease(&event)
if err != nil {
klog.V(4).Infof("Non-critical error filtering HelmRelease event: %v", err)
// Proceed with processing HelmRelease objects
hr, ok := event.Object.(*helmv2.HelmRelease)
if !ok {
klog.V(4).Infof("Expected HelmRelease object, got %T", event.Object)
continue
}
if !matches {
if !r.shouldIncludeHelmRelease(hr) {
continue
}
// Convert HelmRelease to Application
app, err := r.ConvertHelmReleaseToApplication(event.Object.(*unstructured.Unstructured))
app, err := r.ConvertHelmReleaseToApplication(hr)
if err != nil {
klog.Errorf("Error converting HelmRelease to Application: %v", err)
continue
@@ -771,12 +764,16 @@ type customWatcher struct {
resultChan chan watch.Event
stopChan chan struct{}
stopOnce sync.Once
underlying watch.Interface
}
// Stop terminates the watch
func (cw *customWatcher) Stop() {
cw.stopOnce.Do(func() {
close(cw.stopChan)
if cw.underlying != nil {
cw.underlying.Stop()
}
})
}
@@ -785,34 +782,18 @@ func (cw *customWatcher) ResultChan() <-chan watch.Event {
return cw.resultChan
}
// isRelevantHelmRelease checks if the HelmRelease meets the sourceRef and prefix criteria
func (r *REST) isRelevantHelmRelease(event *watch.Event) (bool, error) {
if event.Object == nil {
return false, nil
}
// Check if the object is a *v1.Status
if status, ok := event.Object.(*metav1.Status); ok {
// Log at a less severe level or handle specific status errors if needed
klog.V(4).Infof("Received Status object in HelmRelease watch: %v", status.Message)
return false, nil // Not relevant for processing as a HelmRelease
}
// Proceed if it's an Unstructured object
hr, ok := event.Object.(*unstructured.Unstructured)
if !ok {
return false, fmt.Errorf("expected Unstructured object, got %T", event.Object)
}
return r.shouldIncludeHelmRelease(hr), nil
}
// shouldIncludeHelmRelease determines if a HelmRelease should be included based on filtering criteria
func (r *REST) shouldIncludeHelmRelease(hr *unstructured.Unstructured) bool {
func (r *REST) shouldIncludeHelmRelease(hr *helmv2.HelmRelease) bool {
// Nil check for Chart field
if hr.Spec.Chart == nil {
klog.V(6).Infof("HelmRelease %s has nil spec.chart field", hr.GetName())
return false
}
// Filter by Chart Name
chartName, found, err := unstructured.NestedString(hr.Object, "spec", "chart", "spec", "chart")
if err != nil || !found {
klog.V(6).Infof("HelmRelease %s missing spec.chart.spec.chart field: %v", hr.GetName(), err)
chartName := hr.Spec.Chart.Spec.Chart
if chartName == "" {
klog.V(6).Infof("HelmRelease %s missing spec.chart.spec.chart field", hr.GetName())
return false
}
if chartName != r.releaseConfig.Chart.Name {
@@ -825,21 +806,29 @@ func (r *REST) shouldIncludeHelmRelease(hr *unstructured.Unstructured) bool {
}
// matchesSourceRefAndPrefix checks both SourceRefConfig and Prefix criteria
func (r *REST) matchesSourceRefAndPrefix(hr *unstructured.Unstructured) bool {
func (r *REST) matchesSourceRefAndPrefix(hr *helmv2.HelmRelease) bool {
// Nil check for Chart field (defensive)
if hr.Spec.Chart == nil {
klog.V(6).Infof("HelmRelease %s has nil spec.chart field", hr.GetName())
return false
}
// Extract SourceRef fields
sourceRefKind, found, err := unstructured.NestedString(hr.Object, "spec", "chart", "spec", "sourceRef", "kind")
if err != nil || !found {
klog.V(6).Infof("HelmRelease %s missing spec.chart.spec.sourceRef.kind field: %v", hr.GetName(), err)
sourceRef := hr.Spec.Chart.Spec.SourceRef
sourceRefKind := sourceRef.Kind
sourceRefName := sourceRef.Name
sourceRefNamespace := sourceRef.Namespace
if sourceRefKind == "" {
klog.V(6).Infof("HelmRelease %s missing spec.chart.spec.sourceRef.kind field", hr.GetName())
return false
}
sourceRefName, found, err := unstructured.NestedString(hr.Object, "spec", "chart", "spec", "sourceRef", "name")
if err != nil || !found {
klog.V(6).Infof("HelmRelease %s missing spec.chart.spec.sourceRef.name field: %v", hr.GetName(), err)
if sourceRefName == "" {
klog.V(6).Infof("HelmRelease %s missing spec.chart.spec.sourceRef.name field", hr.GetName())
return false
}
sourceRefNamespace, found, err := unstructured.NestedString(hr.Object, "spec", "chart", "spec", "sourceRef", "namespace")
if err != nil || !found {
klog.V(6).Infof("HelmRelease %s missing spec.chart.spec.sourceRef.namespace field: %v", hr.GetName(), err)
if sourceRefNamespace == "" {
klog.V(6).Infof("HelmRelease %s missing spec.chart.spec.sourceRef.namespace field", hr.GetName())
return false
}
@@ -930,19 +919,11 @@ func filterPrefixedMap(original map[string]string, prefix string) map[string]str
}
// ConvertHelmReleaseToApplication converts a HelmRelease to an Application
func (r *REST) ConvertHelmReleaseToApplication(hr *unstructured.Unstructured) (appsv1alpha1.Application, error) {
func (r *REST) ConvertHelmReleaseToApplication(hr *helmv2.HelmRelease) (appsv1alpha1.Application, error) {
klog.V(6).Infof("Converting HelmRelease to Application for resource %s", hr.GetName())
var helmRelease helmv2.HelmRelease
// Convert unstructured to HelmRelease struct
err := runtime.DefaultUnstructuredConverter.FromUnstructured(hr.Object, &helmRelease)
if err != nil {
klog.Errorf("Error converting from unstructured to HelmRelease: %v", err)
return appsv1alpha1.Application{}, err
}
// Convert HelmRelease struct to Application struct
app, err := r.convertHelmReleaseToApplication(&helmRelease)
app, err := r.convertHelmReleaseToApplication(hr)
if err != nil {
klog.Errorf("Error converting from HelmRelease to Application: %v", err)
return appsv1alpha1.Application{}, err

View File

@@ -32,12 +32,13 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/duration"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/client-go/dynamic"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
corev1alpha1 "github.com/cozystack/cozystack/pkg/apis/core/v1alpha1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -69,17 +70,19 @@ var helmReleaseGVR = schema.GroupVersionResource{
// REST implements the RESTStorage interface for TenantModule resources
type REST struct {
dynamicClient dynamic.Interface
gvr schema.GroupVersionResource
gvk schema.GroupVersionKind
kindName string
singularName string
c client.Client
w client.WithWatch
gvr schema.GroupVersionResource
gvk schema.GroupVersionKind
kindName string
singularName string
}
// NewREST creates a new REST storage for TenantModule
func NewREST(dynamicClient dynamic.Interface) *REST {
func NewREST(c client.Client, w client.WithWatch) *REST {
return &REST{
dynamicClient: dynamicClient,
c: c,
w: w,
gvr: schema.GroupVersionResource{
Group: corev1alpha1.GroupName,
Version: "v1alpha1",
@@ -115,7 +118,8 @@ func (r *REST) Get(ctx context.Context, name string, options *metav1.GetOptions)
klog.V(6).Infof("Attempting to retrieve TenantModule %s in namespace %s", name, namespace)
// Get the corresponding HelmRelease
hr, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(namespace).Get(ctx, name, *options)
hr := &helmv2.HelmRelease{}
err = r.c.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, hr, &client.GetOptions{Raw: options})
if err != nil {
klog.Errorf("Error retrieving HelmRelease for TenantModule %s: %v", name, err)
@@ -231,7 +235,11 @@ func (r *REST) List(ctx context.Context, options *metainternalversion.ListOption
}
// List HelmReleases with mapped selectors
hrList, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(namespace).List(ctx, metaOptions)
hrList := &helmv2.HelmReleaseList{}
err = r.c.List(ctx, hrList, &client.ListOptions{
Namespace: namespace,
Raw: &metaOptions,
})
if err != nil {
klog.Errorf("Error listing HelmReleases: %v", err)
return nil, err
@@ -241,15 +249,15 @@ func (r *REST) List(ctx context.Context, options *metainternalversion.ListOption
items := make([]unstructured.Unstructured, 0)
// Iterate over HelmReleases and convert to TenantModules
for _, hr := range hrList.Items {
for i := range hrList.Items {
// Double-check the label requirement
if !r.hasTenantModuleLabel(&hr) {
if !r.hasTenantModuleLabel(&hrList.Items[i]) {
continue
}
module, err := r.ConvertHelmReleaseToTenantModule(&hr)
module, err := r.ConvertHelmReleaseToTenantModule(&hrList.Items[i])
if err != nil {
klog.Errorf("Error converting HelmRelease %s to TenantModule: %v", hr.GetName(), err)
klog.Errorf("Error converting HelmRelease %s to TenantModule: %v", hrList.Items[i].GetName(), err)
continue
}
@@ -376,7 +384,11 @@ func (r *REST) Watch(ctx context.Context, options *metainternalversion.ListOptio
}
// Start watch on HelmRelease with mapped selectors
helmWatcher, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(namespace).Watch(ctx, metaOptions)
hrList := &helmv2.HelmReleaseList{}
helmWatcher, err := r.w.Watch(ctx, hrList, &client.ListOptions{
Namespace: namespace,
Raw: &metaOptions,
})
if err != nil {
klog.Errorf("Error setting up watch for HelmReleases: %v", err)
return nil, err
@@ -386,13 +398,15 @@ func (r *REST) Watch(ctx context.Context, options *metainternalversion.ListOptio
customW := &customWatcher{
resultChan: make(chan watch.Event),
stopChan: make(chan struct{}),
underlying: helmWatcher,
}
go func() {
defer close(customW.resultChan)
defer customW.underlying.Stop()
for {
select {
case event, ok := <-helmWatcher.ResultChan():
case event, ok := <-customW.underlying.ResultChan():
if !ok {
// The watcher has been closed, attempt to re-establish the watch
klog.Warning("HelmRelease watcher closed, attempting to re-establish")
@@ -406,19 +420,19 @@ func (r *REST) Watch(ctx context.Context, options *metainternalversion.ListOptio
continue // Skip processing this event
}
// Proceed with processing Unstructured objects
matches, err := r.isRelevantHelmRelease(&event)
if err != nil {
klog.V(4).Infof("Non-critical error filtering HelmRelease event: %v", err)
// Proceed with processing HelmRelease objects
hr, ok := event.Object.(*helmv2.HelmRelease)
if !ok {
klog.V(4).Infof("Expected HelmRelease object, got %T", event.Object)
continue
}
if !matches {
if !r.hasTenantModuleLabel(hr) {
continue
}
// Convert HelmRelease to TenantModule
module, err := r.ConvertHelmReleaseToTenantModule(event.Object.(*unstructured.Unstructured))
module, err := r.ConvertHelmReleaseToTenantModule(hr)
if err != nil {
klog.Errorf("Error converting HelmRelease to TenantModule: %v", err)
continue
@@ -480,12 +494,16 @@ type customWatcher struct {
resultChan chan watch.Event
stopChan chan struct{}
stopOnce sync.Once
underlying watch.Interface
}
// Stop terminates the watch
func (cw *customWatcher) Stop() {
cw.stopOnce.Do(func() {
close(cw.stopChan)
if cw.underlying != nil {
cw.underlying.Stop()
}
})
}
@@ -494,30 +512,8 @@ func (cw *customWatcher) ResultChan() <-chan watch.Event {
return cw.resultChan
}
// isRelevantHelmRelease checks if the HelmRelease has the tenant module label
func (r *REST) isRelevantHelmRelease(event *watch.Event) (bool, error) {
if event.Object == nil {
return false, nil
}
// Check if the object is a *v1.Status
if status, ok := event.Object.(*metav1.Status); ok {
// Log at a less severe level or handle specific status errors if needed
klog.V(4).Infof("Received Status object in HelmRelease watch: %v", status.Message)
return false, nil // Not relevant for processing as a HelmRelease
}
// Proceed if it's an Unstructured object
hr, ok := event.Object.(*unstructured.Unstructured)
if !ok {
return false, fmt.Errorf("expected Unstructured object, got %T", event.Object)
}
return r.hasTenantModuleLabel(hr), nil
}
// hasTenantModuleLabel checks if a HelmRelease has the required tenant module label
func (r *REST) hasTenantModuleLabel(hr *unstructured.Unstructured) bool {
func (r *REST) hasTenantModuleLabel(hr *helmv2.HelmRelease) bool {
labels := hr.GetLabels()
if labels == nil {
return false
@@ -554,19 +550,11 @@ func (r *REST) getNamespace(ctx context.Context) (string, error) {
}
// ConvertHelmReleaseToTenantModule converts a HelmRelease to a TenantModule
func (r *REST) ConvertHelmReleaseToTenantModule(hr *unstructured.Unstructured) (corev1alpha1.TenantModule, error) {
func (r *REST) ConvertHelmReleaseToTenantModule(hr *helmv2.HelmRelease) (corev1alpha1.TenantModule, error) {
klog.V(6).Infof("Converting HelmRelease to TenantModule for resource %s", hr.GetName())
var helmRelease helmv2.HelmRelease
// Convert unstructured to HelmRelease struct
err := runtime.DefaultUnstructuredConverter.FromUnstructured(hr.Object, &helmRelease)
if err != nil {
klog.Errorf("Error converting from unstructured to HelmRelease: %v", err)
return corev1alpha1.TenantModule{}, err
}
// Convert HelmRelease struct to TenantModule struct
module, err := r.convertHelmReleaseToTenantModule(&helmRelease)
module, err := r.convertHelmReleaseToTenantModule(hr)
if err != nil {
klog.Errorf("Error converting from HelmRelease to TenantModule: %v", err)
return corev1alpha1.TenantModule{}, err

View File

@@ -12,17 +12,18 @@ import (
"time"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metainternal "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/duration"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
rbacv1client "k8s.io/client-go/kubernetes/typed/rbac/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
corev1alpha1 "github.com/cozystack/cozystack/pkg/apis/core/v1alpha1"
)
@@ -46,18 +47,18 @@ var (
)
type REST struct {
core corev1client.CoreV1Interface
rbac rbacv1client.RbacV1Interface
gvr schema.GroupVersionResource
c client.Client
w client.WithWatch
gvr schema.GroupVersionResource
}
func NewREST(
coreCli corev1client.CoreV1Interface,
rbacCli rbacv1client.RbacV1Interface,
c client.Client,
w client.WithWatch,
) *REST {
return &REST{
core: coreCli,
rbac: rbacCli,
c: c,
w: w,
gvr: schema.GroupVersionResource{
Group: corev1alpha1.GroupName,
Version: "v1alpha1",
@@ -89,7 +90,8 @@ func (r *REST) List(
ctx context.Context,
_ *metainternal.ListOptions,
) (runtime.Object, error) {
nsList, err := r.core.Namespaces().List(ctx, metav1.ListOptions{})
nsList := &corev1.NamespaceList{}
err := r.c.List(ctx, nsList)
if err != nil {
return nil, err
}
@@ -118,7 +120,8 @@ func (r *REST) Get(
return nil, apierrors.NewNotFound(r.gvr.GroupResource(), name)
}
ns, err := r.core.Namespaces().Get(ctx, name, *opts)
ns := &corev1.Namespace{}
err := r.c.Get(ctx, types.NamespacedName{Namespace: "", Name: name}, ns, &client.GetOptions{Raw: opts})
if err != nil {
return nil, err
}
@@ -128,14 +131,7 @@ func (r *REST) Get(
APIVersion: corev1alpha1.SchemeGroupVersion.String(),
Kind: "TenantNamespace",
},
ObjectMeta: metav1.ObjectMeta{
Name: ns.Name,
UID: ns.UID,
ResourceVersion: ns.ResourceVersion,
CreationTimestamp: ns.CreationTimestamp,
Labels: ns.Labels,
Annotations: ns.Annotations,
},
ObjectMeta: ns.ObjectMeta,
}, nil
}
@@ -144,10 +140,11 @@ func (r *REST) Get(
// -----------------------------------------------------------------------------
func (r *REST) Watch(ctx context.Context, opts *metainternal.ListOptions) (watch.Interface, error) {
nsWatch, err := r.core.Namespaces().Watch(ctx, metav1.ListOptions{
nsList := &corev1.NamespaceList{}
nsWatch, err := r.w.Watch(ctx, nsList, &client.ListOptions{Raw: &metav1.ListOptions{
Watch: true,
ResourceVersion: opts.ResourceVersion,
})
}})
if err != nil {
return nil, err
}
@@ -282,9 +279,10 @@ func (r *REST) filterAccessible(
for _, name := range names {
nameSet[name] = struct{}{}
}
rbs, err := r.rbac.RoleBindings("").List(ctx, metav1.ListOptions{})
rbs := &rbacv1.RoleBindingList{}
err := r.c.List(ctx, rbs)
if err != nil {
return []string{}, fmt.Errorf("failed to list rolebindings")
return []string{}, fmt.Errorf("failed to list rolebindings: %w", err)
}
allowedNameSet := make(map[string]struct{})
for i := range rbs.Items {

View File

@@ -25,7 +25,7 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
corev1alpha1 "github.com/cozystack/cozystack/pkg/apis/core/v1alpha1"
)
@@ -157,13 +157,15 @@ var (
)
type REST struct {
core corev1client.CoreV1Interface
gvr schema.GroupVersionResource
c client.Client
w client.WithWatch
gvr schema.GroupVersionResource
}
func NewREST(coreCli corev1client.CoreV1Interface) *REST {
func NewREST(c client.Client, w client.WithWatch) *REST {
return &REST{
core: coreCli,
c: c,
w: w,
gvr: schema.GroupVersionResource{
Group: corev1alpha1.GroupName,
Version: "v1alpha1",
@@ -203,11 +205,11 @@ func (r *REST) Create(
}
sec := tenantToSecret(in, nil)
out, err := r.core.Secrets(sec.Namespace).Create(ctx, sec, *opts)
err := r.c.Create(ctx, sec, &client.CreateOptions{Raw: opts})
if err != nil {
return nil, err
}
return secretToTenant(out), nil
return secretToTenant(sec), nil
}
func (r *REST) Get(
@@ -219,7 +221,8 @@ func (r *REST) Get(
if err != nil {
return nil, err
}
sec, err := r.core.Secrets(ns).Get(ctx, name, *opts)
sec := &corev1.Secret{}
err = r.c.Get(ctx, types.NamespacedName{Namespace: ns, Name: name}, sec, &client.GetOptions{Raw: opts})
if err != nil {
return nil, err
}
@@ -247,10 +250,14 @@ func (r *REST) List(ctx context.Context, opts *metainternal.ListOptions) (runtim
fieldSel = opts.FieldSelector.String()
}
list, err := r.core.Secrets(ns).List(ctx, metav1.ListOptions{
LabelSelector: ls.String(),
FieldSelector: fieldSel,
})
list := &corev1.SecretList{}
err = r.c.List(ctx, list,
&client.ListOptions{
Namespace: ns,
Raw: &metav1.ListOptions{
LabelSelector: ls.String(),
FieldSelector: fieldSel,
}})
if err != nil {
return nil, err
}
@@ -284,7 +291,8 @@ func (r *REST) Update(
return nil, false, err
}
cur, err := r.core.Secrets(ns).Get(ctx, name, metav1.GetOptions{})
cur := &corev1.Secret{}
err = r.c.Get(ctx, types.NamespacedName{Namespace: ns, Name: name}, cur, &client.GetOptions{Raw: &metav1.GetOptions{}})
if err != nil && !apierrors.IsNotFound(err) {
return nil, false, err
}
@@ -296,17 +304,18 @@ func (r *REST) Update(
in := newObj.(*corev1alpha1.TenantSecret)
newSec := tenantToSecret(in, cur)
newSec.Namespace = ns
if cur == nil {
if !forceCreate && err == nil {
return nil, false, apierrors.NewNotFound(r.gvr.GroupResource(), name)
}
out, err := r.core.Secrets(ns).Create(ctx, newSec, metav1.CreateOptions{})
return secretToTenant(out), true, err
err := r.c.Create(ctx, newSec, &client.CreateOptions{Raw: &metav1.CreateOptions{}})
return secretToTenant(newSec), true, err
}
newSec.ResourceVersion = cur.ResourceVersion
out, err := r.core.Secrets(ns).Update(ctx, newSec, *opts)
return secretToTenant(out), false, err
err = r.c.Update(ctx, newSec, &client.UpdateOptions{Raw: opts})
return secretToTenant(newSec), false, err
}
func (r *REST) Delete(
@@ -319,7 +328,7 @@ func (r *REST) Delete(
if err != nil {
return nil, false, err
}
err = r.core.Secrets(ns).Delete(ctx, name, *opts)
err = r.c.Delete(ctx, &corev1.Secret{ObjectMeta: metav1.ObjectMeta{Namespace: ns, Name: name}}, &client.DeleteOptions{Raw: opts})
return nil, err == nil, err
}
@@ -331,21 +340,33 @@ func (r *REST) Patch(
opts *metav1.PatchOptions,
subresources ...string,
) (runtime.Object, error) {
if len(subresources) > 0 {
return nil, fmt.Errorf("TenantSecret does not have subresources")
}
ns, err := nsFrom(ctx)
if err != nil {
return nil, err
}
out, err := r.core.Secrets(ns).
Patch(ctx, name, pt, data, *opts, subresources...)
out := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Namespace: ns,
Name: name,
},
}
patch := client.RawPatch(pt, data)
err = r.c.Patch(ctx, out, patch, &client.PatchOptions{Raw: opts})
if err != nil {
return nil, err
}
// Ensure tenant secret label is preserved
if out.Labels == nil {
out.Labels = make(map[string]string)
}
if out.Labels[tsLabelKey] != tsLabelValue {
out.Labels[tsLabelKey] = tsLabelValue
out, _ = r.core.Secrets(ns).Update(ctx, out, metav1.UpdateOptions{})
_ = r.c.Update(ctx, out, &client.UpdateOptions{Raw: &metav1.UpdateOptions{}})
}
return secretToTenant(out), nil
@@ -361,12 +382,13 @@ func (r *REST) Watch(ctx context.Context, opts *metainternal.ListOptions) (watch
return nil, err
}
secList := &corev1.SecretList{}
ls := labels.Set{tsLabelKey: tsLabelValue}.AsSelector().String()
base, err := r.core.Secrets(ns).Watch(ctx, metav1.ListOptions{
base, err := r.w.Watch(ctx, secList, &client.ListOptions{Namespace: ns, Raw: &metav1.ListOptions{
Watch: true,
LabelSelector: ls,
ResourceVersion: opts.ResourceVersion,
})
}})
if err != nil {
return nil, err
}

View File

@@ -23,7 +23,7 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
corev1alpha1 "github.com/cozystack/cozystack/pkg/apis/core/v1alpha1"
)
@@ -38,13 +38,15 @@ const (
)
type REST struct {
core corev1client.CoreV1Interface
gvr schema.GroupVersionResource
c client.Client
w client.WithWatch
gvr schema.GroupVersionResource
}
func NewREST(coreCli corev1client.CoreV1Interface) *REST {
func NewREST(c client.Client, w client.WithWatch) *REST {
return &REST{
core: coreCli,
c: c,
w: w,
gvr: schema.GroupVersionResource{
Group: corev1alpha1.GroupName,
Version: "v1alpha1",
@@ -95,7 +97,14 @@ func (r *REST) Get(ctx context.Context, name string, opts *metav1.GetOptions) (r
// We need to identify secret name and key. Iterate secrets in namespace with tenant secret label
// and return the matching composed object.
list, err := r.core.Secrets(ns).List(ctx, metav1.ListOptions{LabelSelector: labels.Set{tsLabelKey: tsLabelValue}.AsSelector().String()})
list := &corev1.SecretList{}
err = r.c.List(ctx, list,
&client.ListOptions{
Namespace: ns,
Raw: &metav1.ListOptions{
LabelSelector: labels.Set{tsLabelKey: tsLabelValue}.AsSelector().String(),
},
})
if err != nil {
return nil, err
}
@@ -130,7 +139,15 @@ func (r *REST) List(ctx context.Context, opts *metainternal.ListOptions) (runtim
fieldSel = opts.FieldSelector.String()
}
list, err := r.core.Secrets(ns).List(ctx, metav1.ListOptions{LabelSelector: sel.String(), FieldSelector: fieldSel})
list := &corev1.SecretList{}
err = r.c.List(ctx, list,
&client.ListOptions{
Namespace: ns,
Raw: &metav1.ListOptions{
LabelSelector: labels.Set{tsLabelKey: tsLabelValue}.AsSelector().String(),
FieldSelector: fieldSel,
},
})
if err != nil {
return nil, err
}
@@ -169,12 +186,13 @@ func (r *REST) Watch(ctx context.Context, opts *metainternal.ListOptions) (watch
return nil, err
}
secList := &corev1.SecretList{}
ls := labels.Set{tsLabelKey: tsLabelValue}.AsSelector().String()
base, err := r.core.Secrets(ns).Watch(ctx, metav1.ListOptions{
base, err := r.w.Watch(ctx, secList, &client.ListOptions{Namespace: ns, Raw: &metav1.ListOptions{
Watch: true,
LabelSelector: ls,
ResourceVersion: opts.ResourceVersion,
})
}})
if err != nil {
return nil, err
}