mirror of
https://github.com/outbackdingo/cozystack.git
synced 2026-01-27 10:18:39 +00:00
[api] Use shared informer cache
This patch changes all clients in the Cozystack API server to typed ones from the controller runtime. This should improve the performance of the API server and simplifies the code by removing work with unstructured objects and dynamic clients. ```release-note [api] Use typed and cache-backed k8s clients in the Cozystack API to improve performance. Get rid of operations on unstructured objects and use of dynamic clients. ``` Signed-off-by: Timofei Larkin <lllamnyp@gmail.com>
This commit is contained in:
@@ -17,18 +17,22 @@ limitations under the License.
|
||||
package apiserver
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
helmv2 "github.com/fluxcd/helm-controller/api/v2"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
rbacv1 "k8s.io/api/rbac/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
genericapiserver "k8s.io/apiserver/pkg/server"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/cache"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
"github.com/cozystack/cozystack/pkg/apis/apps"
|
||||
appsinstall "github.com/cozystack/cozystack/pkg/apis/apps/install"
|
||||
@@ -50,6 +54,7 @@ var (
|
||||
// versions and content types.
|
||||
Codecs = serializer.NewCodecFactory(Scheme)
|
||||
CozyComponentName = "cozy"
|
||||
syncPeriod = 5 * time.Minute
|
||||
)
|
||||
|
||||
func init() {
|
||||
@@ -58,9 +63,15 @@ func init() {
|
||||
|
||||
// Register HelmRelease types.
|
||||
if err := helmv2.AddToScheme(Scheme); err != nil {
|
||||
panic(fmt.Sprintf("Failed to add HelmRelease types to scheme: %v", err))
|
||||
panic(fmt.Errorf("Failed to add HelmRelease types to scheme: %w", err))
|
||||
}
|
||||
|
||||
if err := corev1.AddToScheme(Scheme); err != nil {
|
||||
panic(fmt.Errorf("Failed to add core types to scheme: %w", err))
|
||||
}
|
||||
if err := rbacv1.AddToScheme(Scheme); err != nil {
|
||||
panic(fmt.Errorf("Failed to add RBAC types to scheme: %w", err))
|
||||
}
|
||||
// Add unversioned types.
|
||||
metav1.AddToGroupVersion(Scheme, schema.GroupVersion{Version: "v1"})
|
||||
|
||||
@@ -118,43 +129,59 @@ func (c completedConfig) New() (*CozyServer, error) {
|
||||
}
|
||||
|
||||
// Create a dynamic client for HelmRelease using InClusterConfig.
|
||||
inClusterConfig, err := restclient.InClusterConfig()
|
||||
cfg, err := ctrl.GetConfig()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to get in-cluster config: %v", err)
|
||||
return nil, fmt.Errorf("failed to get kubeconfig: %w", err)
|
||||
}
|
||||
|
||||
dynamicClient, err := dynamic.NewForConfig(inClusterConfig)
|
||||
mgr, err := ctrl.NewManager(cfg, ctrl.Options{
|
||||
Scheme: Scheme,
|
||||
Cache: cache.Options{SyncPeriod: &syncPeriod},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to create dynamic client: %v", err)
|
||||
return nil, fmt.Errorf("failed to build manager: %w", err)
|
||||
}
|
||||
|
||||
clientset, err := kubernetes.NewForConfig(inClusterConfig)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create kube clientset: %v", err)
|
||||
ctx := ctrl.SetupSignalHandler()
|
||||
|
||||
if err = mustGetInformers(ctx, mgr,
|
||||
&helmv2.HelmRelease{},
|
||||
&corev1.Secret{},
|
||||
&corev1.Namespace{},
|
||||
&corev1.Service{},
|
||||
&rbacv1.RoleBinding{},
|
||||
); err != nil {
|
||||
return nil, fmt.Errorf("failed to get informers: %w", err)
|
||||
}
|
||||
|
||||
go func() {
|
||||
if err := mgr.Start(ctx); err != nil {
|
||||
panic(fmt.Errorf("manager start failed: %w", err))
|
||||
}
|
||||
}()
|
||||
|
||||
if ok := mgr.GetCache().WaitForCacheSync(ctx); !ok {
|
||||
return nil, fmt.Errorf("cache sync failed")
|
||||
}
|
||||
|
||||
cli := mgr.GetClient()
|
||||
watchCli, err := client.NewWithWatch(cfg, client.Options{Scheme: Scheme})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to build watch client: %w", err)
|
||||
}
|
||||
// --- static, cluster-scoped resource for core group ---
|
||||
coreV1alpha1Storage := map[string]rest.Storage{}
|
||||
coreV1alpha1Storage["tenantnamespaces"] = cozyregistry.RESTInPeace(
|
||||
tenantnamespacestorage.NewREST(
|
||||
clientset.CoreV1(),
|
||||
clientset.RbacV1(),
|
||||
),
|
||||
tenantnamespacestorage.NewREST(cli, watchCli),
|
||||
)
|
||||
coreV1alpha1Storage["tenantsecrets"] = cozyregistry.RESTInPeace(
|
||||
tenantsecretstorage.NewREST(
|
||||
clientset.CoreV1(),
|
||||
),
|
||||
tenantsecretstorage.NewREST(cli, watchCli),
|
||||
)
|
||||
coreV1alpha1Storage["tenantsecretstables"] = cozyregistry.RESTInPeace(
|
||||
tenantsecretstablestorage.NewREST(
|
||||
clientset.CoreV1(),
|
||||
),
|
||||
tenantsecretstablestorage.NewREST(cli, watchCli),
|
||||
)
|
||||
coreV1alpha1Storage["tenantmodules"] = cozyregistry.RESTInPeace(
|
||||
tenantmodulestorage.NewREST(
|
||||
dynamicClient,
|
||||
),
|
||||
tenantmodulestorage.NewREST(cli, watchCli),
|
||||
)
|
||||
|
||||
coreApiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(core.GroupName, Scheme, metav1.ParameterCodec, Codecs)
|
||||
@@ -166,7 +193,7 @@ func (c completedConfig) New() (*CozyServer, error) {
|
||||
// --- dynamically-configured, per-tenant resources ---
|
||||
appsV1alpha1Storage := map[string]rest.Storage{}
|
||||
for _, resConfig := range c.ResourceConfig.Resources {
|
||||
storage := applicationstorage.NewREST(dynamicClient, &resConfig)
|
||||
storage := applicationstorage.NewREST(cli, watchCli, &resConfig)
|
||||
appsV1alpha1Storage[resConfig.Application.Plural] = cozyregistry.RESTInPeace(storage)
|
||||
}
|
||||
appsApiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apps.GroupName, Scheme, metav1.ParameterCodec, Codecs)
|
||||
@@ -177,3 +204,12 @@ func (c completedConfig) New() (*CozyServer, error) {
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func mustGetInformers(ctx context.Context, mgr ctrl.Manager, types ...client.Object) error {
|
||||
for i := range types {
|
||||
if _, err := mgr.GetCache().GetInformer(ctx, types[i]); err != nil {
|
||||
return fmt.Errorf("failed to get informer for %T: %w", types[i], err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -37,8 +37,8 @@ import (
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/klog/v2"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
appsv1alpha1 "github.com/cozystack/cozystack/pkg/apis/apps/v1alpha1"
|
||||
"github.com/cozystack/cozystack/pkg/config"
|
||||
@@ -76,7 +76,8 @@ var helmReleaseGVR = schema.GroupVersionResource{
|
||||
|
||||
// REST implements the RESTStorage interface for Application resources
|
||||
type REST struct {
|
||||
dynamicClient dynamic.Interface
|
||||
c client.Client
|
||||
w client.WithWatch
|
||||
gvr schema.GroupVersionResource
|
||||
gvk schema.GroupVersionKind
|
||||
kindName string
|
||||
@@ -86,7 +87,7 @@ type REST struct {
|
||||
}
|
||||
|
||||
// NewREST creates a new REST storage for Application with specific configuration
|
||||
func NewREST(dynamicClient dynamic.Interface, config *config.Resource) *REST {
|
||||
func NewREST(c client.Client, w client.WithWatch, config *config.Resource) *REST {
|
||||
var specSchema *structuralschema.Structural
|
||||
|
||||
if raw := strings.TrimSpace(config.Application.OpenAPISchema); raw != "" {
|
||||
@@ -110,7 +111,8 @@ func NewREST(dynamicClient dynamic.Interface, config *config.Resource) *REST {
|
||||
}
|
||||
|
||||
return &REST{
|
||||
dynamicClient: dynamicClient,
|
||||
c: c,
|
||||
w: w,
|
||||
gvr: schema.GroupVersionResource{
|
||||
Group: appsv1alpha1.GroupName,
|
||||
Version: "v1alpha1",
|
||||
@@ -158,30 +160,23 @@ func (r *REST) Create(ctx context.Context, obj runtime.Object, createValidation
|
||||
helmRelease.Labels = mergeMaps(helmRelease.Labels, addPrefixedMap(app.Labels, LabelPrefix))
|
||||
// Note: Annotations from config are not handled as r.releaseConfig.Annotations is undefined
|
||||
|
||||
// Convert HelmRelease to unstructured format
|
||||
unstructuredHR, err := runtime.DefaultUnstructuredConverter.ToUnstructured(helmRelease)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to convert HelmRelease to unstructured: %v", err)
|
||||
return nil, fmt.Errorf("failed to convert HelmRelease to unstructured: %v", err)
|
||||
}
|
||||
|
||||
klog.V(6).Infof("Creating HelmRelease %s in namespace %s", helmRelease.Name, app.Namespace)
|
||||
|
||||
// Create HelmRelease in Kubernetes
|
||||
createdHR, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(app.Namespace).Create(ctx, &unstructured.Unstructured{Object: unstructuredHR}, *options)
|
||||
err = r.c.Create(ctx, helmRelease, &client.CreateOptions{Raw: options})
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to create HelmRelease %s: %v", helmRelease.Name, err)
|
||||
return nil, fmt.Errorf("failed to create HelmRelease: %v", err)
|
||||
}
|
||||
|
||||
// Convert the created HelmRelease back to Application
|
||||
convertedApp, err := r.ConvertHelmReleaseToApplication(createdHR)
|
||||
convertedApp, err := r.ConvertHelmReleaseToApplication(helmRelease)
|
||||
if err != nil {
|
||||
klog.Errorf("Conversion error from HelmRelease to Application for resource %s: %v", createdHR.GetName(), err)
|
||||
klog.Errorf("Conversion error from HelmRelease to Application for resource %s: %v", helmRelease.GetName(), err)
|
||||
return nil, fmt.Errorf("conversion error: %v", err)
|
||||
}
|
||||
|
||||
klog.V(6).Infof("Successfully created and converted HelmRelease %s to Application", createdHR.GetName())
|
||||
klog.V(6).Infof("Successfully created and converted HelmRelease %s to Application", helmRelease.GetName())
|
||||
|
||||
// Convert Application to unstructured format
|
||||
unstructuredApp, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&convertedApp)
|
||||
@@ -206,7 +201,8 @@ func (r *REST) Get(ctx context.Context, name string, options *metav1.GetOptions)
|
||||
|
||||
// Get the corresponding HelmRelease using the new prefix
|
||||
helmReleaseName := r.releaseConfig.Prefix + name
|
||||
hr, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(namespace).Get(ctx, helmReleaseName, *options)
|
||||
helmRelease := &helmv2.HelmRelease{}
|
||||
err = r.c.Get(ctx, client.ObjectKey{Namespace: namespace, Name: helmReleaseName}, helmRelease, &client.GetOptions{Raw: options})
|
||||
if err != nil {
|
||||
klog.Errorf("Error retrieving HelmRelease for resource %s: %v", name, err)
|
||||
|
||||
@@ -221,14 +217,14 @@ func (r *REST) Get(ctx context.Context, name string, options *metav1.GetOptions)
|
||||
}
|
||||
|
||||
// Check if HelmRelease meets the required chartName and sourceRef criteria
|
||||
if !r.shouldIncludeHelmRelease(hr) {
|
||||
if !r.shouldIncludeHelmRelease(helmRelease) {
|
||||
klog.Errorf("HelmRelease %s does not match the required chartName and sourceRef criteria", helmReleaseName)
|
||||
// Return a NotFound error for the Application resource
|
||||
return nil, apierrors.NewNotFound(r.gvr.GroupResource(), name)
|
||||
}
|
||||
|
||||
// Convert HelmRelease to Application
|
||||
convertedApp, err := r.ConvertHelmReleaseToApplication(hr)
|
||||
convertedApp, err := r.ConvertHelmReleaseToApplication(helmRelease)
|
||||
if err != nil {
|
||||
klog.Errorf("Conversion error from HelmRelease to Application for resource %s: %v", name, err)
|
||||
return nil, fmt.Errorf("conversion error: %v", err)
|
||||
@@ -325,7 +321,11 @@ func (r *REST) List(ctx context.Context, options *metainternalversion.ListOption
|
||||
}
|
||||
|
||||
// List HelmReleases with mapped selectors
|
||||
hrList, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(namespace).List(ctx, metaOptions)
|
||||
hrList := &helmv2.HelmReleaseList{}
|
||||
err = r.c.List(ctx, hrList, &client.ListOptions{
|
||||
Namespace: namespace,
|
||||
Raw: &metaOptions,
|
||||
})
|
||||
if err != nil {
|
||||
klog.Errorf("Error listing HelmReleases: %v", err)
|
||||
return nil, err
|
||||
@@ -335,14 +335,14 @@ func (r *REST) List(ctx context.Context, options *metainternalversion.ListOption
|
||||
items := make([]unstructured.Unstructured, 0)
|
||||
|
||||
// Iterate over HelmReleases and convert to Applications
|
||||
for _, hr := range hrList.Items {
|
||||
if !r.shouldIncludeHelmRelease(&hr) {
|
||||
for i := range hrList.Items {
|
||||
if !r.shouldIncludeHelmRelease(&hrList.Items[i]) {
|
||||
continue
|
||||
}
|
||||
|
||||
app, err := r.ConvertHelmReleaseToApplication(&hr)
|
||||
app, err := r.ConvertHelmReleaseToApplication(&hrList.Items[i])
|
||||
if err != nil {
|
||||
klog.Errorf("Error converting HelmRelease %s to Application: %v", hr.GetName(), err)
|
||||
klog.Errorf("Error converting HelmRelease %s to Application: %v", hrList.Items[i].GetName(), err)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -457,7 +457,8 @@ func (r *REST) Update(ctx context.Context, name string, objInfo rest.UpdatedObje
|
||||
|
||||
// Ensure ResourceVersion
|
||||
if helmRelease.ResourceVersion == "" {
|
||||
cur, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(helmRelease.Namespace).Get(ctx, helmRelease.Name, metav1.GetOptions{})
|
||||
cur := &helmv2.HelmRelease{}
|
||||
err := r.c.Get(ctx, client.ObjectKey{Namespace: helmRelease.Namespace, Name: helmRelease.Name}, cur, &client.GetOptions{Raw: &metav1.GetOptions{}})
|
||||
if err != nil {
|
||||
return nil, false, fmt.Errorf("failed to fetch current HelmRelease: %w", err)
|
||||
}
|
||||
@@ -470,53 +471,38 @@ func (r *REST) Update(ctx context.Context, name string, objInfo rest.UpdatedObje
|
||||
helmRelease.Labels = mergeMaps(helmRelease.Labels, addPrefixedMap(app.Labels, LabelPrefix))
|
||||
// Note: Annotations from config are not handled as r.releaseConfig.Annotations is undefined
|
||||
|
||||
// Convert HelmRelease to unstructured format
|
||||
unstructuredHR, err := runtime.DefaultUnstructuredConverter.ToUnstructured(helmRelease)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to convert HelmRelease to unstructured: %v", err)
|
||||
return nil, false, fmt.Errorf("failed to convert HelmRelease to unstructured: %v", err)
|
||||
}
|
||||
|
||||
// Retrieve metadata from unstructured object
|
||||
metadata, found, err := unstructured.NestedMap(unstructuredHR, "metadata")
|
||||
if err != nil || !found {
|
||||
klog.Errorf("Failed to retrieve metadata from HelmRelease: %v, found: %v", err, found)
|
||||
return nil, false, fmt.Errorf("failed to retrieve metadata from HelmRelease: %v", err)
|
||||
}
|
||||
klog.V(6).Infof("HelmRelease Metadata: %+v", metadata)
|
||||
|
||||
klog.V(6).Infof("Updating HelmRelease %s in namespace %s", helmRelease.Name, helmRelease.Namespace)
|
||||
|
||||
// Before updating, ensure the HelmRelease meets the inclusion criteria
|
||||
// This prevents updating HelmReleases that should not be managed as Applications
|
||||
if !r.shouldIncludeHelmRelease(&unstructured.Unstructured{Object: unstructuredHR}) {
|
||||
if !r.shouldIncludeHelmRelease(helmRelease) {
|
||||
klog.Errorf("HelmRelease %s does not match the required chartName and sourceRef criteria", helmRelease.Name)
|
||||
// Return a NotFound error for the Application resource
|
||||
return nil, false, apierrors.NewNotFound(r.gvr.GroupResource(), name)
|
||||
}
|
||||
|
||||
// Update the HelmRelease in Kubernetes
|
||||
resultHR, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(helmRelease.Namespace).Update(ctx, &unstructured.Unstructured{Object: unstructuredHR}, metav1.UpdateOptions{})
|
||||
err = r.c.Update(ctx, helmRelease, &client.UpdateOptions{Raw: &metav1.UpdateOptions{}})
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to update HelmRelease %s: %v", helmRelease.Name, err)
|
||||
return nil, false, fmt.Errorf("failed to update HelmRelease: %v", err)
|
||||
}
|
||||
|
||||
// After updating, ensure the updated HelmRelease still meets the inclusion criteria
|
||||
if !r.shouldIncludeHelmRelease(resultHR) {
|
||||
klog.Errorf("Updated HelmRelease %s does not match the required chartName and sourceRef criteria", resultHR.GetName())
|
||||
if !r.shouldIncludeHelmRelease(helmRelease) {
|
||||
klog.Errorf("Updated HelmRelease %s does not match the required chartName and sourceRef criteria", helmRelease.GetName())
|
||||
// Return a NotFound error for the Application resource
|
||||
return nil, false, apierrors.NewNotFound(r.gvr.GroupResource(), name)
|
||||
}
|
||||
|
||||
// Convert the updated HelmRelease back to Application
|
||||
convertedApp, err := r.ConvertHelmReleaseToApplication(resultHR)
|
||||
convertedApp, err := r.ConvertHelmReleaseToApplication(helmRelease)
|
||||
if err != nil {
|
||||
klog.Errorf("Conversion error from HelmRelease to Application for resource %s: %v", resultHR.GetName(), err)
|
||||
klog.Errorf("Conversion error from HelmRelease to Application for resource %s: %v", helmRelease.GetName(), err)
|
||||
return nil, false, fmt.Errorf("conversion error: %v", err)
|
||||
}
|
||||
|
||||
klog.V(6).Infof("Successfully updated and converted HelmRelease %s to Application", resultHR.GetName())
|
||||
klog.V(6).Infof("Successfully updated and converted HelmRelease %s to Application", helmRelease.GetName())
|
||||
|
||||
// Explicitly set apiVersion and kind for Application
|
||||
convertedApp.TypeMeta = metav1.TypeMeta{
|
||||
@@ -554,7 +540,8 @@ func (r *REST) Delete(ctx context.Context, name string, deleteValidation rest.Va
|
||||
helmReleaseName := r.releaseConfig.Prefix + name
|
||||
|
||||
// Retrieve the HelmRelease before attempting to delete
|
||||
hr, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(namespace).Get(ctx, helmReleaseName, metav1.GetOptions{})
|
||||
helmRelease := &helmv2.HelmRelease{}
|
||||
err = r.c.Get(ctx, client.ObjectKey{Namespace: namespace, Name: helmReleaseName}, helmRelease, &client.GetOptions{Raw: &metav1.GetOptions{}})
|
||||
if err != nil {
|
||||
if apierrors.IsNotFound(err) {
|
||||
// If HelmRelease does not exist, return NotFound error for Application
|
||||
@@ -567,7 +554,7 @@ func (r *REST) Delete(ctx context.Context, name string, deleteValidation rest.Va
|
||||
}
|
||||
|
||||
// Validate that the HelmRelease meets the inclusion criteria
|
||||
if !r.shouldIncludeHelmRelease(hr) {
|
||||
if !r.shouldIncludeHelmRelease(helmRelease) {
|
||||
klog.Errorf("HelmRelease %s does not match the required chartName and sourceRef criteria", helmReleaseName)
|
||||
// Return NotFound error for Application resource
|
||||
return nil, false, apierrors.NewNotFound(r.gvr.GroupResource(), name)
|
||||
@@ -576,7 +563,7 @@ func (r *REST) Delete(ctx context.Context, name string, deleteValidation rest.Va
|
||||
klog.V(6).Infof("Deleting HelmRelease %s in namespace %s", helmReleaseName, namespace)
|
||||
|
||||
// Delete the HelmRelease corresponding to the Application
|
||||
err = r.dynamicClient.Resource(helmReleaseGVR).Namespace(namespace).Delete(ctx, helmReleaseName, *options)
|
||||
err = r.c.Delete(ctx, helmRelease, &client.DeleteOptions{Raw: options})
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to delete HelmRelease %s: %v", helmReleaseName, err)
|
||||
return nil, false, fmt.Errorf("failed to delete HelmRelease: %v", err)
|
||||
@@ -659,7 +646,11 @@ func (r *REST) Watch(ctx context.Context, options *metainternalversion.ListOptio
|
||||
}
|
||||
|
||||
// Start watch on HelmRelease with mapped selectors
|
||||
helmWatcher, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(namespace).Watch(ctx, metaOptions)
|
||||
hrList := &helmv2.HelmReleaseList{}
|
||||
helmWatcher, err := r.w.Watch(ctx, hrList, &client.ListOptions{
|
||||
Namespace: namespace,
|
||||
Raw: &metaOptions,
|
||||
})
|
||||
if err != nil {
|
||||
klog.Errorf("Error setting up watch for HelmReleases: %v", err)
|
||||
return nil, err
|
||||
@@ -669,13 +660,15 @@ func (r *REST) Watch(ctx context.Context, options *metainternalversion.ListOptio
|
||||
customW := &customWatcher{
|
||||
resultChan: make(chan watch.Event),
|
||||
stopChan: make(chan struct{}),
|
||||
underlying: helmWatcher,
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer close(customW.resultChan)
|
||||
defer customW.underlying.Stop()
|
||||
for {
|
||||
select {
|
||||
case event, ok := <-helmWatcher.ResultChan():
|
||||
case event, ok := <-customW.underlying.ResultChan():
|
||||
if !ok {
|
||||
// The watcher has been closed, attempt to re-establish the watch
|
||||
klog.Warning("HelmRelease watcher closed, attempting to re-establish")
|
||||
@@ -689,19 +682,19 @@ func (r *REST) Watch(ctx context.Context, options *metainternalversion.ListOptio
|
||||
continue // Skip processing this event
|
||||
}
|
||||
|
||||
// Proceed with processing Unstructured objects
|
||||
matches, err := r.isRelevantHelmRelease(&event)
|
||||
if err != nil {
|
||||
klog.V(4).Infof("Non-critical error filtering HelmRelease event: %v", err)
|
||||
// Proceed with processing HelmRelease objects
|
||||
hr, ok := event.Object.(*helmv2.HelmRelease)
|
||||
if !ok {
|
||||
klog.V(4).Infof("Expected HelmRelease object, got %T", event.Object)
|
||||
continue
|
||||
}
|
||||
|
||||
if !matches {
|
||||
if !r.shouldIncludeHelmRelease(hr) {
|
||||
continue
|
||||
}
|
||||
|
||||
// Convert HelmRelease to Application
|
||||
app, err := r.ConvertHelmReleaseToApplication(event.Object.(*unstructured.Unstructured))
|
||||
app, err := r.ConvertHelmReleaseToApplication(hr)
|
||||
if err != nil {
|
||||
klog.Errorf("Error converting HelmRelease to Application: %v", err)
|
||||
continue
|
||||
@@ -771,12 +764,16 @@ type customWatcher struct {
|
||||
resultChan chan watch.Event
|
||||
stopChan chan struct{}
|
||||
stopOnce sync.Once
|
||||
underlying watch.Interface
|
||||
}
|
||||
|
||||
// Stop terminates the watch
|
||||
func (cw *customWatcher) Stop() {
|
||||
cw.stopOnce.Do(func() {
|
||||
close(cw.stopChan)
|
||||
if cw.underlying != nil {
|
||||
cw.underlying.Stop()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@@ -785,34 +782,18 @@ func (cw *customWatcher) ResultChan() <-chan watch.Event {
|
||||
return cw.resultChan
|
||||
}
|
||||
|
||||
// isRelevantHelmRelease checks if the HelmRelease meets the sourceRef and prefix criteria
|
||||
func (r *REST) isRelevantHelmRelease(event *watch.Event) (bool, error) {
|
||||
if event.Object == nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Check if the object is a *v1.Status
|
||||
if status, ok := event.Object.(*metav1.Status); ok {
|
||||
// Log at a less severe level or handle specific status errors if needed
|
||||
klog.V(4).Infof("Received Status object in HelmRelease watch: %v", status.Message)
|
||||
return false, nil // Not relevant for processing as a HelmRelease
|
||||
}
|
||||
|
||||
// Proceed if it's an Unstructured object
|
||||
hr, ok := event.Object.(*unstructured.Unstructured)
|
||||
if !ok {
|
||||
return false, fmt.Errorf("expected Unstructured object, got %T", event.Object)
|
||||
}
|
||||
|
||||
return r.shouldIncludeHelmRelease(hr), nil
|
||||
}
|
||||
|
||||
// shouldIncludeHelmRelease determines if a HelmRelease should be included based on filtering criteria
|
||||
func (r *REST) shouldIncludeHelmRelease(hr *unstructured.Unstructured) bool {
|
||||
func (r *REST) shouldIncludeHelmRelease(hr *helmv2.HelmRelease) bool {
|
||||
// Nil check for Chart field
|
||||
if hr.Spec.Chart == nil {
|
||||
klog.V(6).Infof("HelmRelease %s has nil spec.chart field", hr.GetName())
|
||||
return false
|
||||
}
|
||||
|
||||
// Filter by Chart Name
|
||||
chartName, found, err := unstructured.NestedString(hr.Object, "spec", "chart", "spec", "chart")
|
||||
if err != nil || !found {
|
||||
klog.V(6).Infof("HelmRelease %s missing spec.chart.spec.chart field: %v", hr.GetName(), err)
|
||||
chartName := hr.Spec.Chart.Spec.Chart
|
||||
if chartName == "" {
|
||||
klog.V(6).Infof("HelmRelease %s missing spec.chart.spec.chart field", hr.GetName())
|
||||
return false
|
||||
}
|
||||
if chartName != r.releaseConfig.Chart.Name {
|
||||
@@ -825,21 +806,29 @@ func (r *REST) shouldIncludeHelmRelease(hr *unstructured.Unstructured) bool {
|
||||
}
|
||||
|
||||
// matchesSourceRefAndPrefix checks both SourceRefConfig and Prefix criteria
|
||||
func (r *REST) matchesSourceRefAndPrefix(hr *unstructured.Unstructured) bool {
|
||||
func (r *REST) matchesSourceRefAndPrefix(hr *helmv2.HelmRelease) bool {
|
||||
// Nil check for Chart field (defensive)
|
||||
if hr.Spec.Chart == nil {
|
||||
klog.V(6).Infof("HelmRelease %s has nil spec.chart field", hr.GetName())
|
||||
return false
|
||||
}
|
||||
|
||||
// Extract SourceRef fields
|
||||
sourceRefKind, found, err := unstructured.NestedString(hr.Object, "spec", "chart", "spec", "sourceRef", "kind")
|
||||
if err != nil || !found {
|
||||
klog.V(6).Infof("HelmRelease %s missing spec.chart.spec.sourceRef.kind field: %v", hr.GetName(), err)
|
||||
sourceRef := hr.Spec.Chart.Spec.SourceRef
|
||||
sourceRefKind := sourceRef.Kind
|
||||
sourceRefName := sourceRef.Name
|
||||
sourceRefNamespace := sourceRef.Namespace
|
||||
|
||||
if sourceRefKind == "" {
|
||||
klog.V(6).Infof("HelmRelease %s missing spec.chart.spec.sourceRef.kind field", hr.GetName())
|
||||
return false
|
||||
}
|
||||
sourceRefName, found, err := unstructured.NestedString(hr.Object, "spec", "chart", "spec", "sourceRef", "name")
|
||||
if err != nil || !found {
|
||||
klog.V(6).Infof("HelmRelease %s missing spec.chart.spec.sourceRef.name field: %v", hr.GetName(), err)
|
||||
if sourceRefName == "" {
|
||||
klog.V(6).Infof("HelmRelease %s missing spec.chart.spec.sourceRef.name field", hr.GetName())
|
||||
return false
|
||||
}
|
||||
sourceRefNamespace, found, err := unstructured.NestedString(hr.Object, "spec", "chart", "spec", "sourceRef", "namespace")
|
||||
if err != nil || !found {
|
||||
klog.V(6).Infof("HelmRelease %s missing spec.chart.spec.sourceRef.namespace field: %v", hr.GetName(), err)
|
||||
if sourceRefNamespace == "" {
|
||||
klog.V(6).Infof("HelmRelease %s missing spec.chart.spec.sourceRef.namespace field", hr.GetName())
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -930,19 +919,11 @@ func filterPrefixedMap(original map[string]string, prefix string) map[string]str
|
||||
}
|
||||
|
||||
// ConvertHelmReleaseToApplication converts a HelmRelease to an Application
|
||||
func (r *REST) ConvertHelmReleaseToApplication(hr *unstructured.Unstructured) (appsv1alpha1.Application, error) {
|
||||
func (r *REST) ConvertHelmReleaseToApplication(hr *helmv2.HelmRelease) (appsv1alpha1.Application, error) {
|
||||
klog.V(6).Infof("Converting HelmRelease to Application for resource %s", hr.GetName())
|
||||
|
||||
var helmRelease helmv2.HelmRelease
|
||||
// Convert unstructured to HelmRelease struct
|
||||
err := runtime.DefaultUnstructuredConverter.FromUnstructured(hr.Object, &helmRelease)
|
||||
if err != nil {
|
||||
klog.Errorf("Error converting from unstructured to HelmRelease: %v", err)
|
||||
return appsv1alpha1.Application{}, err
|
||||
}
|
||||
|
||||
// Convert HelmRelease struct to Application struct
|
||||
app, err := r.convertHelmReleaseToApplication(&helmRelease)
|
||||
app, err := r.convertHelmReleaseToApplication(hr)
|
||||
if err != nil {
|
||||
klog.Errorf("Error converting from HelmRelease to Application: %v", err)
|
||||
return appsv1alpha1.Application{}, err
|
||||
|
||||
@@ -32,12 +32,13 @@ import (
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/selection"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/duration"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/klog/v2"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
corev1alpha1 "github.com/cozystack/cozystack/pkg/apis/core/v1alpha1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
@@ -69,17 +70,19 @@ var helmReleaseGVR = schema.GroupVersionResource{
|
||||
|
||||
// REST implements the RESTStorage interface for TenantModule resources
|
||||
type REST struct {
|
||||
dynamicClient dynamic.Interface
|
||||
gvr schema.GroupVersionResource
|
||||
gvk schema.GroupVersionKind
|
||||
kindName string
|
||||
singularName string
|
||||
c client.Client
|
||||
w client.WithWatch
|
||||
gvr schema.GroupVersionResource
|
||||
gvk schema.GroupVersionKind
|
||||
kindName string
|
||||
singularName string
|
||||
}
|
||||
|
||||
// NewREST creates a new REST storage for TenantModule
|
||||
func NewREST(dynamicClient dynamic.Interface) *REST {
|
||||
func NewREST(c client.Client, w client.WithWatch) *REST {
|
||||
return &REST{
|
||||
dynamicClient: dynamicClient,
|
||||
c: c,
|
||||
w: w,
|
||||
gvr: schema.GroupVersionResource{
|
||||
Group: corev1alpha1.GroupName,
|
||||
Version: "v1alpha1",
|
||||
@@ -115,7 +118,8 @@ func (r *REST) Get(ctx context.Context, name string, options *metav1.GetOptions)
|
||||
klog.V(6).Infof("Attempting to retrieve TenantModule %s in namespace %s", name, namespace)
|
||||
|
||||
// Get the corresponding HelmRelease
|
||||
hr, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(namespace).Get(ctx, name, *options)
|
||||
hr := &helmv2.HelmRelease{}
|
||||
err = r.c.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, hr, &client.GetOptions{Raw: options})
|
||||
if err != nil {
|
||||
klog.Errorf("Error retrieving HelmRelease for TenantModule %s: %v", name, err)
|
||||
|
||||
@@ -231,7 +235,11 @@ func (r *REST) List(ctx context.Context, options *metainternalversion.ListOption
|
||||
}
|
||||
|
||||
// List HelmReleases with mapped selectors
|
||||
hrList, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(namespace).List(ctx, metaOptions)
|
||||
hrList := &helmv2.HelmReleaseList{}
|
||||
err = r.c.List(ctx, hrList, &client.ListOptions{
|
||||
Namespace: namespace,
|
||||
Raw: &metaOptions,
|
||||
})
|
||||
if err != nil {
|
||||
klog.Errorf("Error listing HelmReleases: %v", err)
|
||||
return nil, err
|
||||
@@ -241,15 +249,15 @@ func (r *REST) List(ctx context.Context, options *metainternalversion.ListOption
|
||||
items := make([]unstructured.Unstructured, 0)
|
||||
|
||||
// Iterate over HelmReleases and convert to TenantModules
|
||||
for _, hr := range hrList.Items {
|
||||
for i := range hrList.Items {
|
||||
// Double-check the label requirement
|
||||
if !r.hasTenantModuleLabel(&hr) {
|
||||
if !r.hasTenantModuleLabel(&hrList.Items[i]) {
|
||||
continue
|
||||
}
|
||||
|
||||
module, err := r.ConvertHelmReleaseToTenantModule(&hr)
|
||||
module, err := r.ConvertHelmReleaseToTenantModule(&hrList.Items[i])
|
||||
if err != nil {
|
||||
klog.Errorf("Error converting HelmRelease %s to TenantModule: %v", hr.GetName(), err)
|
||||
klog.Errorf("Error converting HelmRelease %s to TenantModule: %v", hrList.Items[i].GetName(), err)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -376,7 +384,11 @@ func (r *REST) Watch(ctx context.Context, options *metainternalversion.ListOptio
|
||||
}
|
||||
|
||||
// Start watch on HelmRelease with mapped selectors
|
||||
helmWatcher, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(namespace).Watch(ctx, metaOptions)
|
||||
hrList := &helmv2.HelmReleaseList{}
|
||||
helmWatcher, err := r.w.Watch(ctx, hrList, &client.ListOptions{
|
||||
Namespace: namespace,
|
||||
Raw: &metaOptions,
|
||||
})
|
||||
if err != nil {
|
||||
klog.Errorf("Error setting up watch for HelmReleases: %v", err)
|
||||
return nil, err
|
||||
@@ -386,13 +398,15 @@ func (r *REST) Watch(ctx context.Context, options *metainternalversion.ListOptio
|
||||
customW := &customWatcher{
|
||||
resultChan: make(chan watch.Event),
|
||||
stopChan: make(chan struct{}),
|
||||
underlying: helmWatcher,
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer close(customW.resultChan)
|
||||
defer customW.underlying.Stop()
|
||||
for {
|
||||
select {
|
||||
case event, ok := <-helmWatcher.ResultChan():
|
||||
case event, ok := <-customW.underlying.ResultChan():
|
||||
if !ok {
|
||||
// The watcher has been closed, attempt to re-establish the watch
|
||||
klog.Warning("HelmRelease watcher closed, attempting to re-establish")
|
||||
@@ -406,19 +420,19 @@ func (r *REST) Watch(ctx context.Context, options *metainternalversion.ListOptio
|
||||
continue // Skip processing this event
|
||||
}
|
||||
|
||||
// Proceed with processing Unstructured objects
|
||||
matches, err := r.isRelevantHelmRelease(&event)
|
||||
if err != nil {
|
||||
klog.V(4).Infof("Non-critical error filtering HelmRelease event: %v", err)
|
||||
// Proceed with processing HelmRelease objects
|
||||
hr, ok := event.Object.(*helmv2.HelmRelease)
|
||||
if !ok {
|
||||
klog.V(4).Infof("Expected HelmRelease object, got %T", event.Object)
|
||||
continue
|
||||
}
|
||||
|
||||
if !matches {
|
||||
if !r.hasTenantModuleLabel(hr) {
|
||||
continue
|
||||
}
|
||||
|
||||
// Convert HelmRelease to TenantModule
|
||||
module, err := r.ConvertHelmReleaseToTenantModule(event.Object.(*unstructured.Unstructured))
|
||||
module, err := r.ConvertHelmReleaseToTenantModule(hr)
|
||||
if err != nil {
|
||||
klog.Errorf("Error converting HelmRelease to TenantModule: %v", err)
|
||||
continue
|
||||
@@ -480,12 +494,16 @@ type customWatcher struct {
|
||||
resultChan chan watch.Event
|
||||
stopChan chan struct{}
|
||||
stopOnce sync.Once
|
||||
underlying watch.Interface
|
||||
}
|
||||
|
||||
// Stop terminates the watch
|
||||
func (cw *customWatcher) Stop() {
|
||||
cw.stopOnce.Do(func() {
|
||||
close(cw.stopChan)
|
||||
if cw.underlying != nil {
|
||||
cw.underlying.Stop()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@@ -494,30 +512,8 @@ func (cw *customWatcher) ResultChan() <-chan watch.Event {
|
||||
return cw.resultChan
|
||||
}
|
||||
|
||||
// isRelevantHelmRelease checks if the HelmRelease has the tenant module label
|
||||
func (r *REST) isRelevantHelmRelease(event *watch.Event) (bool, error) {
|
||||
if event.Object == nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Check if the object is a *v1.Status
|
||||
if status, ok := event.Object.(*metav1.Status); ok {
|
||||
// Log at a less severe level or handle specific status errors if needed
|
||||
klog.V(4).Infof("Received Status object in HelmRelease watch: %v", status.Message)
|
||||
return false, nil // Not relevant for processing as a HelmRelease
|
||||
}
|
||||
|
||||
// Proceed if it's an Unstructured object
|
||||
hr, ok := event.Object.(*unstructured.Unstructured)
|
||||
if !ok {
|
||||
return false, fmt.Errorf("expected Unstructured object, got %T", event.Object)
|
||||
}
|
||||
|
||||
return r.hasTenantModuleLabel(hr), nil
|
||||
}
|
||||
|
||||
// hasTenantModuleLabel checks if a HelmRelease has the required tenant module label
|
||||
func (r *REST) hasTenantModuleLabel(hr *unstructured.Unstructured) bool {
|
||||
func (r *REST) hasTenantModuleLabel(hr *helmv2.HelmRelease) bool {
|
||||
labels := hr.GetLabels()
|
||||
if labels == nil {
|
||||
return false
|
||||
@@ -554,19 +550,11 @@ func (r *REST) getNamespace(ctx context.Context) (string, error) {
|
||||
}
|
||||
|
||||
// ConvertHelmReleaseToTenantModule converts a HelmRelease to a TenantModule
|
||||
func (r *REST) ConvertHelmReleaseToTenantModule(hr *unstructured.Unstructured) (corev1alpha1.TenantModule, error) {
|
||||
func (r *REST) ConvertHelmReleaseToTenantModule(hr *helmv2.HelmRelease) (corev1alpha1.TenantModule, error) {
|
||||
klog.V(6).Infof("Converting HelmRelease to TenantModule for resource %s", hr.GetName())
|
||||
|
||||
var helmRelease helmv2.HelmRelease
|
||||
// Convert unstructured to HelmRelease struct
|
||||
err := runtime.DefaultUnstructuredConverter.FromUnstructured(hr.Object, &helmRelease)
|
||||
if err != nil {
|
||||
klog.Errorf("Error converting from unstructured to HelmRelease: %v", err)
|
||||
return corev1alpha1.TenantModule{}, err
|
||||
}
|
||||
|
||||
// Convert HelmRelease struct to TenantModule struct
|
||||
module, err := r.convertHelmReleaseToTenantModule(&helmRelease)
|
||||
module, err := r.convertHelmReleaseToTenantModule(hr)
|
||||
if err != nil {
|
||||
klog.Errorf("Error converting from HelmRelease to TenantModule: %v", err)
|
||||
return corev1alpha1.TenantModule{}, err
|
||||
|
||||
@@ -12,17 +12,18 @@ import (
|
||||
"time"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
rbacv1 "k8s.io/api/rbac/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metainternal "k8s.io/apimachinery/pkg/apis/meta/internalversion"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/duration"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
rbacv1client "k8s.io/client-go/kubernetes/typed/rbac/v1"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
corev1alpha1 "github.com/cozystack/cozystack/pkg/apis/core/v1alpha1"
|
||||
)
|
||||
@@ -46,18 +47,18 @@ var (
|
||||
)
|
||||
|
||||
type REST struct {
|
||||
core corev1client.CoreV1Interface
|
||||
rbac rbacv1client.RbacV1Interface
|
||||
gvr schema.GroupVersionResource
|
||||
c client.Client
|
||||
w client.WithWatch
|
||||
gvr schema.GroupVersionResource
|
||||
}
|
||||
|
||||
func NewREST(
|
||||
coreCli corev1client.CoreV1Interface,
|
||||
rbacCli rbacv1client.RbacV1Interface,
|
||||
c client.Client,
|
||||
w client.WithWatch,
|
||||
) *REST {
|
||||
return &REST{
|
||||
core: coreCli,
|
||||
rbac: rbacCli,
|
||||
c: c,
|
||||
w: w,
|
||||
gvr: schema.GroupVersionResource{
|
||||
Group: corev1alpha1.GroupName,
|
||||
Version: "v1alpha1",
|
||||
@@ -89,7 +90,8 @@ func (r *REST) List(
|
||||
ctx context.Context,
|
||||
_ *metainternal.ListOptions,
|
||||
) (runtime.Object, error) {
|
||||
nsList, err := r.core.Namespaces().List(ctx, metav1.ListOptions{})
|
||||
nsList := &corev1.NamespaceList{}
|
||||
err := r.c.List(ctx, nsList)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -118,7 +120,8 @@ func (r *REST) Get(
|
||||
return nil, apierrors.NewNotFound(r.gvr.GroupResource(), name)
|
||||
}
|
||||
|
||||
ns, err := r.core.Namespaces().Get(ctx, name, *opts)
|
||||
ns := &corev1.Namespace{}
|
||||
err := r.c.Get(ctx, types.NamespacedName{Namespace: "", Name: name}, ns, &client.GetOptions{Raw: opts})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -128,14 +131,7 @@ func (r *REST) Get(
|
||||
APIVersion: corev1alpha1.SchemeGroupVersion.String(),
|
||||
Kind: "TenantNamespace",
|
||||
},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: ns.Name,
|
||||
UID: ns.UID,
|
||||
ResourceVersion: ns.ResourceVersion,
|
||||
CreationTimestamp: ns.CreationTimestamp,
|
||||
Labels: ns.Labels,
|
||||
Annotations: ns.Annotations,
|
||||
},
|
||||
ObjectMeta: ns.ObjectMeta,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -144,10 +140,11 @@ func (r *REST) Get(
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
func (r *REST) Watch(ctx context.Context, opts *metainternal.ListOptions) (watch.Interface, error) {
|
||||
nsWatch, err := r.core.Namespaces().Watch(ctx, metav1.ListOptions{
|
||||
nsList := &corev1.NamespaceList{}
|
||||
nsWatch, err := r.w.Watch(ctx, nsList, &client.ListOptions{Raw: &metav1.ListOptions{
|
||||
Watch: true,
|
||||
ResourceVersion: opts.ResourceVersion,
|
||||
})
|
||||
}})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -282,9 +279,10 @@ func (r *REST) filterAccessible(
|
||||
for _, name := range names {
|
||||
nameSet[name] = struct{}{}
|
||||
}
|
||||
rbs, err := r.rbac.RoleBindings("").List(ctx, metav1.ListOptions{})
|
||||
rbs := &rbacv1.RoleBindingList{}
|
||||
err := r.c.List(ctx, rbs)
|
||||
if err != nil {
|
||||
return []string{}, fmt.Errorf("failed to list rolebindings")
|
||||
return []string{}, fmt.Errorf("failed to list rolebindings: %w", err)
|
||||
}
|
||||
allowedNameSet := make(map[string]struct{})
|
||||
for i := range rbs.Items {
|
||||
|
||||
@@ -25,7 +25,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
corev1alpha1 "github.com/cozystack/cozystack/pkg/apis/core/v1alpha1"
|
||||
)
|
||||
@@ -157,13 +157,15 @@ var (
|
||||
)
|
||||
|
||||
type REST struct {
|
||||
core corev1client.CoreV1Interface
|
||||
gvr schema.GroupVersionResource
|
||||
c client.Client
|
||||
w client.WithWatch
|
||||
gvr schema.GroupVersionResource
|
||||
}
|
||||
|
||||
func NewREST(coreCli corev1client.CoreV1Interface) *REST {
|
||||
func NewREST(c client.Client, w client.WithWatch) *REST {
|
||||
return &REST{
|
||||
core: coreCli,
|
||||
c: c,
|
||||
w: w,
|
||||
gvr: schema.GroupVersionResource{
|
||||
Group: corev1alpha1.GroupName,
|
||||
Version: "v1alpha1",
|
||||
@@ -203,11 +205,11 @@ func (r *REST) Create(
|
||||
}
|
||||
|
||||
sec := tenantToSecret(in, nil)
|
||||
out, err := r.core.Secrets(sec.Namespace).Create(ctx, sec, *opts)
|
||||
err := r.c.Create(ctx, sec, &client.CreateOptions{Raw: opts})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return secretToTenant(out), nil
|
||||
return secretToTenant(sec), nil
|
||||
}
|
||||
|
||||
func (r *REST) Get(
|
||||
@@ -219,7 +221,8 @@ func (r *REST) Get(
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sec, err := r.core.Secrets(ns).Get(ctx, name, *opts)
|
||||
sec := &corev1.Secret{}
|
||||
err = r.c.Get(ctx, types.NamespacedName{Namespace: ns, Name: name}, sec, &client.GetOptions{Raw: opts})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -247,10 +250,14 @@ func (r *REST) List(ctx context.Context, opts *metainternal.ListOptions) (runtim
|
||||
fieldSel = opts.FieldSelector.String()
|
||||
}
|
||||
|
||||
list, err := r.core.Secrets(ns).List(ctx, metav1.ListOptions{
|
||||
LabelSelector: ls.String(),
|
||||
FieldSelector: fieldSel,
|
||||
})
|
||||
list := &corev1.SecretList{}
|
||||
err = r.c.List(ctx, list,
|
||||
&client.ListOptions{
|
||||
Namespace: ns,
|
||||
Raw: &metav1.ListOptions{
|
||||
LabelSelector: ls.String(),
|
||||
FieldSelector: fieldSel,
|
||||
}})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -284,7 +291,8 @@ func (r *REST) Update(
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
cur, err := r.core.Secrets(ns).Get(ctx, name, metav1.GetOptions{})
|
||||
cur := &corev1.Secret{}
|
||||
err = r.c.Get(ctx, types.NamespacedName{Namespace: ns, Name: name}, cur, &client.GetOptions{Raw: &metav1.GetOptions{}})
|
||||
if err != nil && !apierrors.IsNotFound(err) {
|
||||
return nil, false, err
|
||||
}
|
||||
@@ -296,17 +304,18 @@ func (r *REST) Update(
|
||||
in := newObj.(*corev1alpha1.TenantSecret)
|
||||
|
||||
newSec := tenantToSecret(in, cur)
|
||||
newSec.Namespace = ns
|
||||
if cur == nil {
|
||||
if !forceCreate && err == nil {
|
||||
return nil, false, apierrors.NewNotFound(r.gvr.GroupResource(), name)
|
||||
}
|
||||
out, err := r.core.Secrets(ns).Create(ctx, newSec, metav1.CreateOptions{})
|
||||
return secretToTenant(out), true, err
|
||||
err := r.c.Create(ctx, newSec, &client.CreateOptions{Raw: &metav1.CreateOptions{}})
|
||||
return secretToTenant(newSec), true, err
|
||||
}
|
||||
|
||||
newSec.ResourceVersion = cur.ResourceVersion
|
||||
out, err := r.core.Secrets(ns).Update(ctx, newSec, *opts)
|
||||
return secretToTenant(out), false, err
|
||||
err = r.c.Update(ctx, newSec, &client.UpdateOptions{Raw: opts})
|
||||
return secretToTenant(newSec), false, err
|
||||
}
|
||||
|
||||
func (r *REST) Delete(
|
||||
@@ -319,7 +328,7 @@ func (r *REST) Delete(
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
err = r.core.Secrets(ns).Delete(ctx, name, *opts)
|
||||
err = r.c.Delete(ctx, &corev1.Secret{ObjectMeta: metav1.ObjectMeta{Namespace: ns, Name: name}}, &client.DeleteOptions{Raw: opts})
|
||||
return nil, err == nil, err
|
||||
}
|
||||
|
||||
@@ -331,21 +340,33 @@ func (r *REST) Patch(
|
||||
opts *metav1.PatchOptions,
|
||||
subresources ...string,
|
||||
) (runtime.Object, error) {
|
||||
if len(subresources) > 0 {
|
||||
return nil, fmt.Errorf("TenantSecret does not have subresources")
|
||||
}
|
||||
ns, err := nsFrom(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
out, err := r.core.Secrets(ns).
|
||||
Patch(ctx, name, pt, data, *opts, subresources...)
|
||||
out := &corev1.Secret{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: ns,
|
||||
Name: name,
|
||||
},
|
||||
}
|
||||
patch := client.RawPatch(pt, data)
|
||||
err = r.c.Patch(ctx, out, patch, &client.PatchOptions{Raw: opts})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Ensure tenant secret label is preserved
|
||||
if out.Labels == nil {
|
||||
out.Labels = make(map[string]string)
|
||||
}
|
||||
|
||||
if out.Labels[tsLabelKey] != tsLabelValue {
|
||||
out.Labels[tsLabelKey] = tsLabelValue
|
||||
out, _ = r.core.Secrets(ns).Update(ctx, out, metav1.UpdateOptions{})
|
||||
_ = r.c.Update(ctx, out, &client.UpdateOptions{Raw: &metav1.UpdateOptions{}})
|
||||
}
|
||||
|
||||
return secretToTenant(out), nil
|
||||
@@ -361,12 +382,13 @@ func (r *REST) Watch(ctx context.Context, opts *metainternal.ListOptions) (watch
|
||||
return nil, err
|
||||
}
|
||||
|
||||
secList := &corev1.SecretList{}
|
||||
ls := labels.Set{tsLabelKey: tsLabelValue}.AsSelector().String()
|
||||
base, err := r.core.Secrets(ns).Watch(ctx, metav1.ListOptions{
|
||||
base, err := r.w.Watch(ctx, secList, &client.ListOptions{Namespace: ns, Raw: &metav1.ListOptions{
|
||||
Watch: true,
|
||||
LabelSelector: ls,
|
||||
ResourceVersion: opts.ResourceVersion,
|
||||
})
|
||||
}})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -23,7 +23,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
corev1alpha1 "github.com/cozystack/cozystack/pkg/apis/core/v1alpha1"
|
||||
)
|
||||
@@ -38,13 +38,15 @@ const (
|
||||
)
|
||||
|
||||
type REST struct {
|
||||
core corev1client.CoreV1Interface
|
||||
gvr schema.GroupVersionResource
|
||||
c client.Client
|
||||
w client.WithWatch
|
||||
gvr schema.GroupVersionResource
|
||||
}
|
||||
|
||||
func NewREST(coreCli corev1client.CoreV1Interface) *REST {
|
||||
func NewREST(c client.Client, w client.WithWatch) *REST {
|
||||
return &REST{
|
||||
core: coreCli,
|
||||
c: c,
|
||||
w: w,
|
||||
gvr: schema.GroupVersionResource{
|
||||
Group: corev1alpha1.GroupName,
|
||||
Version: "v1alpha1",
|
||||
@@ -95,7 +97,14 @@ func (r *REST) Get(ctx context.Context, name string, opts *metav1.GetOptions) (r
|
||||
|
||||
// We need to identify secret name and key. Iterate secrets in namespace with tenant secret label
|
||||
// and return the matching composed object.
|
||||
list, err := r.core.Secrets(ns).List(ctx, metav1.ListOptions{LabelSelector: labels.Set{tsLabelKey: tsLabelValue}.AsSelector().String()})
|
||||
list := &corev1.SecretList{}
|
||||
err = r.c.List(ctx, list,
|
||||
&client.ListOptions{
|
||||
Namespace: ns,
|
||||
Raw: &metav1.ListOptions{
|
||||
LabelSelector: labels.Set{tsLabelKey: tsLabelValue}.AsSelector().String(),
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -130,7 +139,15 @@ func (r *REST) List(ctx context.Context, opts *metainternal.ListOptions) (runtim
|
||||
fieldSel = opts.FieldSelector.String()
|
||||
}
|
||||
|
||||
list, err := r.core.Secrets(ns).List(ctx, metav1.ListOptions{LabelSelector: sel.String(), FieldSelector: fieldSel})
|
||||
list := &corev1.SecretList{}
|
||||
err = r.c.List(ctx, list,
|
||||
&client.ListOptions{
|
||||
Namespace: ns,
|
||||
Raw: &metav1.ListOptions{
|
||||
LabelSelector: labels.Set{tsLabelKey: tsLabelValue}.AsSelector().String(),
|
||||
FieldSelector: fieldSel,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -169,12 +186,13 @@ func (r *REST) Watch(ctx context.Context, opts *metainternal.ListOptions) (watch
|
||||
return nil, err
|
||||
}
|
||||
|
||||
secList := &corev1.SecretList{}
|
||||
ls := labels.Set{tsLabelKey: tsLabelValue}.AsSelector().String()
|
||||
base, err := r.core.Secrets(ns).Watch(ctx, metav1.ListOptions{
|
||||
base, err := r.w.Watch(ctx, secList, &client.ListOptions{Namespace: ns, Raw: &metav1.ListOptions{
|
||||
Watch: true,
|
||||
LabelSelector: ls,
|
||||
ResourceVersion: opts.ResourceVersion,
|
||||
})
|
||||
}})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user