mirror of
https://github.com/optim-enterprises-bv/kubernetes.git
synced 2025-11-02 11:18:16 +00:00
Allow garbage collection to work against different API prefixes
The GC needs to build clients based only on Resource or Kind. Hoist the restmapper out of the controller and the clientpool, support a new ClientForGroupVersionKind and ClientForGroupVersionResource, and use the appropriate one in both places.
This commit is contained in:
@@ -29,7 +29,6 @@ import (
|
||||
"k8s.io/kubernetes/pkg/api/meta/metatypes"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
"k8s.io/kubernetes/pkg/apimachinery/registered"
|
||||
"k8s.io/kubernetes/pkg/client/cache"
|
||||
"k8s.io/kubernetes/pkg/client/typed/dynamic"
|
||||
"k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
|
||||
@@ -478,7 +477,7 @@ func (gc *GarbageCollector) monitorFor(resource unversioned.GroupVersionResource
|
||||
// TODO: consider store in one storage.
|
||||
glog.V(6).Infof("create storage for resource %s", resource)
|
||||
var monitor monitor
|
||||
client, err := gc.metaOnlyClientPool.ClientForGroupVersion(resource.GroupVersion())
|
||||
client, err := gc.metaOnlyClientPool.ClientForGroupVersionKind(kind)
|
||||
if err != nil {
|
||||
return monitor, err
|
||||
}
|
||||
@@ -537,12 +536,11 @@ var ignoredResources = map[unversioned.GroupVersionResource]struct{}{
|
||||
unversioned.GroupVersionResource{Group: "authorization.k8s.io", Version: "v1beta1", Resource: "localsubjectaccessreviews"}: {},
|
||||
}
|
||||
|
||||
func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynamic.ClientPool, resources []unversioned.GroupVersionResource) (*GarbageCollector, error) {
|
||||
func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynamic.ClientPool, mapper meta.RESTMapper, resources []unversioned.GroupVersionResource) (*GarbageCollector, error) {
|
||||
gc := &GarbageCollector{
|
||||
metaOnlyClientPool: metaOnlyClientPool,
|
||||
clientPool: clientPool,
|
||||
// TODO: should use a dynamic RESTMapper built from the discovery results.
|
||||
restMapper: registered.RESTMapper(),
|
||||
metaOnlyClientPool: metaOnlyClientPool,
|
||||
clientPool: clientPool,
|
||||
restMapper: mapper,
|
||||
clock: clock.RealClock{},
|
||||
dirtyQueue: workqueue.NewTimedWorkQueue(),
|
||||
orphanQueue: workqueue.NewTimedWorkQueue(),
|
||||
@@ -611,7 +609,7 @@ func (gc *GarbageCollector) apiResource(apiVersion, kind string, namespaced bool
|
||||
|
||||
func (gc *GarbageCollector) deleteObject(item objectReference) error {
|
||||
fqKind := unversioned.FromAPIVersionAndKind(item.APIVersion, item.Kind)
|
||||
client, err := gc.clientPool.ClientForGroupVersion(fqKind.GroupVersion())
|
||||
client, err := gc.clientPool.ClientForGroupVersionKind(fqKind)
|
||||
gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation")
|
||||
resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0)
|
||||
if err != nil {
|
||||
@@ -625,7 +623,7 @@ func (gc *GarbageCollector) deleteObject(item objectReference) error {
|
||||
|
||||
func (gc *GarbageCollector) getObject(item objectReference) (*runtime.Unstructured, error) {
|
||||
fqKind := unversioned.FromAPIVersionAndKind(item.APIVersion, item.Kind)
|
||||
client, err := gc.clientPool.ClientForGroupVersion(fqKind.GroupVersion())
|
||||
client, err := gc.clientPool.ClientForGroupVersionKind(fqKind)
|
||||
gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation")
|
||||
resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0)
|
||||
if err != nil {
|
||||
@@ -636,7 +634,7 @@ func (gc *GarbageCollector) getObject(item objectReference) (*runtime.Unstructur
|
||||
|
||||
func (gc *GarbageCollector) updateObject(item objectReference, obj *runtime.Unstructured) (*runtime.Unstructured, error) {
|
||||
fqKind := unversioned.FromAPIVersionAndKind(item.APIVersion, item.Kind)
|
||||
client, err := gc.clientPool.ClientForGroupVersion(fqKind.GroupVersion())
|
||||
client, err := gc.clientPool.ClientForGroupVersionKind(fqKind)
|
||||
gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation")
|
||||
resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0)
|
||||
if err != nil {
|
||||
@@ -647,7 +645,7 @@ func (gc *GarbageCollector) updateObject(item objectReference, obj *runtime.Unst
|
||||
|
||||
func (gc *GarbageCollector) patchObject(item objectReference, patch []byte) (*runtime.Unstructured, error) {
|
||||
fqKind := unversioned.FromAPIVersionAndKind(item.APIVersion, item.Kind)
|
||||
client, err := gc.clientPool.ClientForGroupVersion(fqKind.GroupVersion())
|
||||
client, err := gc.clientPool.ClientForGroupVersionKind(fqKind)
|
||||
gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation")
|
||||
resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0)
|
||||
if err != nil {
|
||||
@@ -728,7 +726,7 @@ func (gc *GarbageCollector) processItem(item *node) error {
|
||||
// prevent objects having references to an old resource from being
|
||||
// deleted during a cluster upgrade.
|
||||
fqKind := unversioned.FromAPIVersionAndKind(reference.APIVersion, reference.Kind)
|
||||
client, err := gc.clientPool.ClientForGroupVersion(fqKind.GroupVersion())
|
||||
client, err := gc.clientPool.ClientForGroupVersionKind(fqKind)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -23,17 +23,19 @@ import (
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
_ "k8s.io/kubernetes/pkg/api/install"
|
||||
"k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
|
||||
"k8s.io/kubernetes/pkg/runtime/serializer"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
_ "k8s.io/kubernetes/pkg/api/install"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/meta/metatypes"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
"k8s.io/kubernetes/pkg/apimachinery/registered"
|
||||
"k8s.io/kubernetes/pkg/client/restclient"
|
||||
"k8s.io/kubernetes/pkg/client/typed/dynamic"
|
||||
"k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
|
||||
"k8s.io/kubernetes/pkg/runtime/serializer"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
"k8s.io/kubernetes/pkg/util/clock"
|
||||
"k8s.io/kubernetes/pkg/util/json"
|
||||
@@ -44,11 +46,11 @@ import (
|
||||
func TestNewGarbageCollector(t *testing.T) {
|
||||
config := &restclient.Config{}
|
||||
config.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()}
|
||||
metaOnlyClientPool := dynamic.NewClientPool(config, dynamic.LegacyAPIPathResolverFunc)
|
||||
metaOnlyClientPool := dynamic.NewClientPool(config, registered.RESTMapper(), dynamic.LegacyAPIPathResolverFunc)
|
||||
config.ContentConfig.NegotiatedSerializer = nil
|
||||
clientPool := dynamic.NewClientPool(config, dynamic.LegacyAPIPathResolverFunc)
|
||||
clientPool := dynamic.NewClientPool(config, registered.RESTMapper(), dynamic.LegacyAPIPathResolverFunc)
|
||||
podResource := []unversioned.GroupVersionResource{{Version: "v1", Resource: "pods"}}
|
||||
gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, podResource)
|
||||
gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, registered.RESTMapper(), podResource)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -107,11 +109,11 @@ func testServerAndClientConfig(handler func(http.ResponseWriter, *http.Request))
|
||||
|
||||
func setupGC(t *testing.T, config *restclient.Config) *GarbageCollector {
|
||||
config.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()}
|
||||
metaOnlyClientPool := dynamic.NewClientPool(config, dynamic.LegacyAPIPathResolverFunc)
|
||||
metaOnlyClientPool := dynamic.NewClientPool(config, registered.RESTMapper(), dynamic.LegacyAPIPathResolverFunc)
|
||||
config.ContentConfig.NegotiatedSerializer = nil
|
||||
clientPool := dynamic.NewClientPool(config, dynamic.LegacyAPIPathResolverFunc)
|
||||
clientPool := dynamic.NewClientPool(config, registered.RESTMapper(), dynamic.LegacyAPIPathResolverFunc)
|
||||
podResource := []unversioned.GroupVersionResource{{Version: "v1", Resource: "pods"}}
|
||||
gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, podResource)
|
||||
gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, registered.RESTMapper(), podResource)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -340,9 +342,9 @@ func TestGCListWatcher(t *testing.T) {
|
||||
testHandler := &fakeActionHandler{}
|
||||
srv, clientConfig := testServerAndClientConfig(testHandler.ServeHTTP)
|
||||
defer srv.Close()
|
||||
clientPool := dynamic.NewClientPool(clientConfig, dynamic.LegacyAPIPathResolverFunc)
|
||||
clientPool := dynamic.NewClientPool(clientConfig, registered.RESTMapper(), dynamic.LegacyAPIPathResolverFunc)
|
||||
podResource := unversioned.GroupVersionResource{Version: "v1", Resource: "pods"}
|
||||
client, err := clientPool.ClientForGroupVersion(podResource.GroupVersion())
|
||||
client, err := clientPool.ClientForGroupVersionResource(podResource)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -28,6 +28,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/errors"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/apimachinery/registered"
|
||||
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
||||
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
|
||||
"k8s.io/kubernetes/pkg/client/restclient"
|
||||
@@ -113,7 +114,7 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *unversioned.APIV
|
||||
groupVersionResources := testGroupVersionResources()
|
||||
for _, groupVersionResource := range groupVersionResources {
|
||||
urlPath := path.Join([]string{
|
||||
dynamic.LegacyAPIPathResolverFunc(groupVersionResource.GroupVersion()),
|
||||
dynamic.LegacyAPIPathResolverFunc(unversioned.GroupVersionKind{Group: groupVersionResource.Group, Version: groupVersionResource.Version}),
|
||||
groupVersionResource.Group,
|
||||
groupVersionResource.Version,
|
||||
"namespaces",
|
||||
@@ -155,7 +156,7 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *unversioned.APIV
|
||||
defer srv.Close()
|
||||
|
||||
mockClient := fake.NewSimpleClientset(testInput.testNamespace)
|
||||
clientPool := dynamic.NewClientPool(clientConfig, dynamic.LegacyAPIPathResolverFunc)
|
||||
clientPool := dynamic.NewClientPool(clientConfig, registered.RESTMapper(), dynamic.LegacyAPIPathResolverFunc)
|
||||
|
||||
err := syncNamespace(mockClient, clientPool, operationNotSupportedCache{}, groupVersionResources, testInput.testNamespace, api.FinalizerKubernetes)
|
||||
if err != nil {
|
||||
|
||||
@@ -274,7 +274,7 @@ func deleteAllContentForGroupVersionResource(
|
||||
glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - estimate - namespace: %s, gvr: %v, estimate: %v", namespace, gvr, estimate)
|
||||
|
||||
// get a client for this group version...
|
||||
dynamicClient, err := clientPool.ClientForGroupVersion(gvr.GroupVersion())
|
||||
dynamicClient, err := clientPool.ClientForGroupVersionResource(gvr)
|
||||
if err != nil {
|
||||
glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - unable to get client - namespace: %s, gvr: %v, err: %v", namespace, gvr, err)
|
||||
return estimate, err
|
||||
|
||||
Reference in New Issue
Block a user