mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 19:58:17 +00:00 
			
		
		
		
	Merge pull request #98346 from mortent/checkForScalePDBs
Check if resources implement scale in disruption controller
This commit is contained in:
		@@ -101,6 +101,7 @@ func TestController_DiscoveryError(t *testing.T) {
 | 
			
		||||
		"GarbageCollectorController":       startGarbageCollectorController,
 | 
			
		||||
		"EndpointSliceController":          startEndpointSliceController,
 | 
			
		||||
		"EndpointSliceMirroringController": startEndpointSliceMirroringController,
 | 
			
		||||
		"PodDisruptionBudgetController":    startDisruptionController,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	tcs := map[string]struct {
 | 
			
		||||
 
 | 
			
		||||
@@ -67,6 +67,7 @@ func startDisruptionController(ctx ControllerContext) (http.Handler, bool, error
 | 
			
		||||
		client,
 | 
			
		||||
		ctx.RESTMapper,
 | 
			
		||||
		scaleClient,
 | 
			
		||||
		client.Discovery(),
 | 
			
		||||
	).Run(ctx.Stop)
 | 
			
		||||
	return nil, true, nil
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -26,6 +26,7 @@ go_library(
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/discovery:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/informers/apps/v1:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/informers/policy/v1beta1:go_default_library",
 | 
			
		||||
@@ -64,6 +65,7 @@ go_test(
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/discovery/fake:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/informers:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/scale/fake:go_default_library",
 | 
			
		||||
 
 | 
			
		||||
@@ -19,6 +19,7 @@ package disruption
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	apps "k8s.io/api/apps/v1beta1"
 | 
			
		||||
@@ -34,6 +35,7 @@ import (
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/intstr"
 | 
			
		||||
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/wait"
 | 
			
		||||
	"k8s.io/client-go/discovery"
 | 
			
		||||
	appsv1informers "k8s.io/client-go/informers/apps/v1"
 | 
			
		||||
	coreinformers "k8s.io/client-go/informers/core/v1"
 | 
			
		||||
	policyinformers "k8s.io/client-go/informers/policy/v1beta1"
 | 
			
		||||
@@ -71,6 +73,7 @@ type DisruptionController struct {
 | 
			
		||||
	mapper     apimeta.RESTMapper
 | 
			
		||||
 | 
			
		||||
	scaleNamespacer scaleclient.ScalesGetter
 | 
			
		||||
	discoveryClient discovery.DiscoveryInterface
 | 
			
		||||
 | 
			
		||||
	pdbLister       policylisters.PodDisruptionBudgetLister
 | 
			
		||||
	pdbListerSynced cache.InformerSynced
 | 
			
		||||
@@ -121,6 +124,7 @@ func NewDisruptionController(
 | 
			
		||||
	kubeClient clientset.Interface,
 | 
			
		||||
	restMapper apimeta.RESTMapper,
 | 
			
		||||
	scaleNamespacer scaleclient.ScalesGetter,
 | 
			
		||||
	discoveryClient discovery.DiscoveryInterface,
 | 
			
		||||
) *DisruptionController {
 | 
			
		||||
	dc := &DisruptionController{
 | 
			
		||||
		kubeClient:   kubeClient,
 | 
			
		||||
@@ -164,6 +168,7 @@ func NewDisruptionController(
 | 
			
		||||
 | 
			
		||||
	dc.mapper = restMapper
 | 
			
		||||
	dc.scaleNamespacer = scaleNamespacer
 | 
			
		||||
	dc.discoveryClient = discoveryClient
 | 
			
		||||
 | 
			
		||||
	return dc
 | 
			
		||||
}
 | 
			
		||||
@@ -294,6 +299,16 @@ func (dc *DisruptionController) getScaleController(controllerRef *metav1.OwnerRe
 | 
			
		||||
	scale, err := dc.scaleNamespacer.Scales(namespace).Get(context.TODO(), gr, controllerRef.Name, metav1.GetOptions{})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		if errors.IsNotFound(err) {
 | 
			
		||||
			// The IsNotFound error can mean either that the resource does not exist,
 | 
			
		||||
			// or it exist but doesn't implement the scale subresource. We check which
 | 
			
		||||
			// situation we are facing so we can give an appropriate error message.
 | 
			
		||||
			isScale, err := dc.implementsScale(gv, controllerRef.Kind)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return nil, err
 | 
			
		||||
			}
 | 
			
		||||
			if !isScale {
 | 
			
		||||
				return nil, fmt.Errorf("%s does not implement the scale subresource", gr.String())
 | 
			
		||||
			}
 | 
			
		||||
			return nil, nil
 | 
			
		||||
		}
 | 
			
		||||
		return nil, err
 | 
			
		||||
@@ -304,6 +319,22 @@ func (dc *DisruptionController) getScaleController(controllerRef *metav1.OwnerRe
 | 
			
		||||
	return &controllerAndScale{scale.UID, scale.Spec.Replicas}, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (dc *DisruptionController) implementsScale(gv schema.GroupVersion, kind string) (bool, error) {
 | 
			
		||||
	resourceList, err := dc.discoveryClient.ServerResourcesForGroupVersion(gv.String())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return false, err
 | 
			
		||||
	}
 | 
			
		||||
	for _, resource := range resourceList.APIResources {
 | 
			
		||||
		if resource.Kind != kind {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		if strings.HasSuffix(resource.Name, "/scale") {
 | 
			
		||||
			return true, nil
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return false, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func verifyGroupKind(controllerRef *metav1.OwnerReference, expectedKind string, expectedGroups []string) (bool, error) {
 | 
			
		||||
	gv, err := schema.ParseGroupVersion(controllerRef.APIVersion)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
 
 | 
			
		||||
@@ -40,6 +40,7 @@ import (
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/intstr"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/uuid"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/wait"
 | 
			
		||||
	discoveryfake "k8s.io/client-go/discovery/fake"
 | 
			
		||||
	"k8s.io/client-go/informers"
 | 
			
		||||
	"k8s.io/client-go/kubernetes/fake"
 | 
			
		||||
	scalefake "k8s.io/client-go/scale/fake"
 | 
			
		||||
@@ -105,8 +106,9 @@ type disruptionController struct {
 | 
			
		||||
	dStore   cache.Store
 | 
			
		||||
	ssStore  cache.Store
 | 
			
		||||
 | 
			
		||||
	coreClient  *fake.Clientset
 | 
			
		||||
	scaleClient *scalefake.FakeScaleClient
 | 
			
		||||
	coreClient      *fake.Clientset
 | 
			
		||||
	scaleClient     *scalefake.FakeScaleClient
 | 
			
		||||
	discoveryClient *discoveryfake.FakeDiscovery
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var customGVK = schema.GroupVersionKind{
 | 
			
		||||
@@ -124,6 +126,9 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) {
 | 
			
		||||
	scheme := runtime.NewScheme()
 | 
			
		||||
	scheme.AddKnownTypeWithName(customGVK, &v1.Service{})
 | 
			
		||||
	fakeScaleClient := &scalefake.FakeScaleClient{}
 | 
			
		||||
	fakeDiscovery := &discoveryfake.FakeDiscovery{
 | 
			
		||||
		Fake: &core.Fake{},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	dc := NewDisruptionController(
 | 
			
		||||
		informerFactory.Core().V1().Pods(),
 | 
			
		||||
@@ -135,6 +140,7 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) {
 | 
			
		||||
		coreClient,
 | 
			
		||||
		testrestmapper.TestOnlyStaticRESTMapper(scheme),
 | 
			
		||||
		fakeScaleClient,
 | 
			
		||||
		fakeDiscovery,
 | 
			
		||||
	)
 | 
			
		||||
	dc.getUpdater = func() updater { return ps.Set }
 | 
			
		||||
	dc.podListerSynced = alwaysReady
 | 
			
		||||
@@ -157,6 +163,7 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) {
 | 
			
		||||
		informerFactory.Apps().V1().StatefulSets().Informer().GetStore(),
 | 
			
		||||
		coreClient,
 | 
			
		||||
		fakeScaleClient,
 | 
			
		||||
		fakeDiscovery,
 | 
			
		||||
	}, ps
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -571,6 +578,77 @@ func TestScaleResource(t *testing.T) {
 | 
			
		||||
	ps.VerifyPdbStatus(t, pdbName, disruptionsAllowed, pods, replicas-maxUnavailable, replicas, map[string]metav1.Time{})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestScaleFinderNoResource(t *testing.T) {
 | 
			
		||||
	resourceName := "customresources"
 | 
			
		||||
	testCases := map[string]struct {
 | 
			
		||||
		apiResources []metav1.APIResource
 | 
			
		||||
		expectError  bool
 | 
			
		||||
	}{
 | 
			
		||||
		"resource implements scale": {
 | 
			
		||||
			apiResources: []metav1.APIResource{
 | 
			
		||||
				{
 | 
			
		||||
					Kind: customGVK.Kind,
 | 
			
		||||
					Name: resourceName,
 | 
			
		||||
				},
 | 
			
		||||
				{
 | 
			
		||||
					Kind: customGVK.Kind,
 | 
			
		||||
					Name: resourceName + "/scale",
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
			expectError: false,
 | 
			
		||||
		},
 | 
			
		||||
		"resource does not implement scale": {
 | 
			
		||||
			apiResources: []metav1.APIResource{
 | 
			
		||||
				{
 | 
			
		||||
					Kind: customGVK.Kind,
 | 
			
		||||
					Name: resourceName,
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
			expectError: true,
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for tn, tc := range testCases {
 | 
			
		||||
		t.Run(tn, func(t *testing.T) {
 | 
			
		||||
			customResourceUID := uuid.NewUUID()
 | 
			
		||||
 | 
			
		||||
			dc, _ := newFakeDisruptionController()
 | 
			
		||||
 | 
			
		||||
			dc.scaleClient.AddReactor("get", resourceName, func(action core.Action) (handled bool, ret runtime.Object, err error) {
 | 
			
		||||
				gr := schema.GroupResource{
 | 
			
		||||
					Group:    customGVK.Group,
 | 
			
		||||
					Resource: resourceName,
 | 
			
		||||
				}
 | 
			
		||||
				return true, nil, errors.NewNotFound(gr, "name")
 | 
			
		||||
			})
 | 
			
		||||
			dc.discoveryClient.Resources = []*metav1.APIResourceList{
 | 
			
		||||
				{
 | 
			
		||||
					GroupVersion: customGVK.GroupVersion().String(),
 | 
			
		||||
					APIResources: tc.apiResources,
 | 
			
		||||
				},
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			trueVal := true
 | 
			
		||||
			ownerRef := &metav1.OwnerReference{
 | 
			
		||||
				Kind:       customGVK.Kind,
 | 
			
		||||
				APIVersion: customGVK.GroupVersion().String(),
 | 
			
		||||
				Controller: &trueVal,
 | 
			
		||||
				UID:        customResourceUID,
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			_, err := dc.getScaleController(ownerRef, "default")
 | 
			
		||||
 | 
			
		||||
			if tc.expectError && err == nil {
 | 
			
		||||
				t.Error("expected error, but didn't get one")
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			if !tc.expectError && err != nil {
 | 
			
		||||
				t.Errorf("did not expect error, but got %v", err)
 | 
			
		||||
			}
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Verify that multiple controllers doesn't allow the PDB to be set true.
 | 
			
		||||
func TestMultipleControllers(t *testing.T) {
 | 
			
		||||
	const podCount = 2
 | 
			
		||||
 
 | 
			
		||||
@@ -86,6 +86,7 @@ func setup(t *testing.T) (*kubeapiservertesting.TestServer, *disruption.Disrupti
 | 
			
		||||
		client,
 | 
			
		||||
		mapper,
 | 
			
		||||
		scaleClient,
 | 
			
		||||
		client.Discovery(),
 | 
			
		||||
	)
 | 
			
		||||
	return server, pdbc, informers, clientSet, apiExtensionClient, dynamicClient
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -356,6 +356,7 @@ func rmSetup(t *testing.T) (*httptest.Server, framework.CloseFunc, *disruption.D
 | 
			
		||||
		client,
 | 
			
		||||
		mapper,
 | 
			
		||||
		scaleClient,
 | 
			
		||||
		client.Discovery(),
 | 
			
		||||
	)
 | 
			
		||||
	return s, closeFn, rm, informers, clientSet
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -69,7 +69,8 @@ func initDisruptionController(t *testing.T, testCtx *testutils.TestContext) *dis
 | 
			
		||||
		informers.Apps().V1().StatefulSets(),
 | 
			
		||||
		testCtx.ClientSet,
 | 
			
		||||
		mapper,
 | 
			
		||||
		scaleClient)
 | 
			
		||||
		scaleClient,
 | 
			
		||||
		testCtx.ClientSet.Discovery())
 | 
			
		||||
 | 
			
		||||
	informers.Start(testCtx.Scheduler.StopEverything)
 | 
			
		||||
	informers.WaitForCacheSync(testCtx.Scheduler.StopEverything)
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user