mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 19:58:17 +00:00 
			
		
		
		
	Integration tests for KEP Pod Scheduling Readiness
- test generic integration in plugins_test.go - test integration with SchedulingGates plugin in queue_test.go
This commit is contained in:
		@@ -31,9 +31,12 @@ import (
 | 
			
		||||
	"k8s.io/apimachinery/pkg/runtime"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/types"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/wait"
 | 
			
		||||
	"k8s.io/apiserver/pkg/util/feature"
 | 
			
		||||
	clientset "k8s.io/client-go/kubernetes"
 | 
			
		||||
	listersv1 "k8s.io/client-go/listers/core/v1"
 | 
			
		||||
	featuregatetesting "k8s.io/component-base/featuregate/testing"
 | 
			
		||||
	configv1 "k8s.io/kube-scheduler/config/v1"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/features"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/scheduler"
 | 
			
		||||
	schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
 | 
			
		||||
	configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing"
 | 
			
		||||
@@ -57,9 +60,15 @@ var (
 | 
			
		||||
	podSchedulingError              = testutils.PodSchedulingError
 | 
			
		||||
	createAndWaitForNodesInCache    = testutils.CreateAndWaitForNodesInCache
 | 
			
		||||
	waitForPodUnschedulable         = testutils.WaitForPodUnschedulable
 | 
			
		||||
	waitForPodSchedulingGated       = testutils.WaitForPodSchedulingGated
 | 
			
		||||
	waitForPodToScheduleWithTimeout = testutils.WaitForPodToScheduleWithTimeout
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type PreEnqueuePlugin struct {
 | 
			
		||||
	called int32
 | 
			
		||||
	admit  bool
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type PreFilterPlugin struct {
 | 
			
		||||
	numPreFilterCalled int
 | 
			
		||||
	failPreFilter      bool
 | 
			
		||||
@@ -146,6 +155,7 @@ type PermitPlugin struct {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	enqueuePluginName            = "enqueue-plugin"
 | 
			
		||||
	prefilterPluginName          = "prefilter-plugin"
 | 
			
		||||
	postfilterPluginName         = "postfilter-plugin"
 | 
			
		||||
	scorePluginName              = "score-plugin"
 | 
			
		||||
@@ -158,6 +168,7 @@ const (
 | 
			
		||||
	permitPluginName             = "permit-plugin"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var _ framework.PreEnqueuePlugin = &PreEnqueuePlugin{}
 | 
			
		||||
var _ framework.PreFilterPlugin = &PreFilterPlugin{}
 | 
			
		||||
var _ framework.PostFilterPlugin = &PostFilterPlugin{}
 | 
			
		||||
var _ framework.ScorePlugin = &ScorePlugin{}
 | 
			
		||||
@@ -184,6 +195,18 @@ func newPlugin(plugin framework.Plugin) frameworkruntime.PluginFactory {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ep *PreEnqueuePlugin) Name() string {
 | 
			
		||||
	return enqueuePluginName
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ep *PreEnqueuePlugin) PreEnqueue(ctx context.Context, p *v1.Pod) *framework.Status {
 | 
			
		||||
	ep.called++
 | 
			
		||||
	if ep.admit {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	return framework.NewStatus(framework.UnschedulableAndUnresolvable, "not ready for scheduling")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Name returns name of the score plugin.
 | 
			
		||||
func (sp *ScorePlugin) Name() string {
 | 
			
		||||
	return scorePluginName
 | 
			
		||||
@@ -2089,6 +2112,72 @@ func TestPreScorePlugin(t *testing.T) {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// TestPreEnqueuePlugin tests invocation of enqueue plugins.
 | 
			
		||||
func TestPreEnqueuePlugin(t *testing.T) {
 | 
			
		||||
	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodSchedulingReadiness, true)()
 | 
			
		||||
 | 
			
		||||
	// Create a plugin registry for testing. Register only a filter plugin.
 | 
			
		||||
	enqueuePlugin := &PreEnqueuePlugin{}
 | 
			
		||||
	// Plumb a preFilterPlugin to verify if it's called or not.
 | 
			
		||||
	preFilterPlugin := &PreFilterPlugin{}
 | 
			
		||||
	registry, prof := initRegistryAndConfig(t, enqueuePlugin, preFilterPlugin)
 | 
			
		||||
 | 
			
		||||
	// Create the API server and the scheduler with the test plugin set.
 | 
			
		||||
	testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "enqueue-plugin", nil), 1,
 | 
			
		||||
		scheduler.WithProfiles(prof),
 | 
			
		||||
		scheduler.WithFrameworkOutOfTreeRegistry(registry))
 | 
			
		||||
	defer testutils.CleanupTest(t, testCtx)
 | 
			
		||||
 | 
			
		||||
	tests := []struct {
 | 
			
		||||
		name         string
 | 
			
		||||
		pod          *v1.Pod
 | 
			
		||||
		admitEnqueue bool
 | 
			
		||||
	}{
 | 
			
		||||
		{
 | 
			
		||||
			name:         "pod is admitted to enqueue",
 | 
			
		||||
			pod:          st.MakePod().Name("p").Namespace(testCtx.NS.Name).Container("pause").Obj(),
 | 
			
		||||
			admitEnqueue: true,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name:         "pod is not admitted to enqueue",
 | 
			
		||||
			pod:          st.MakePod().Name("p").Namespace(testCtx.NS.Name).SchedulingGates([]string{"foo"}).Container("pause").Obj(),
 | 
			
		||||
			admitEnqueue: false,
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, tt := range tests {
 | 
			
		||||
		t.Run(tt.name, func(t *testing.T) {
 | 
			
		||||
			enqueuePlugin.admit = tt.admitEnqueue
 | 
			
		||||
			// Create a best effort pod.
 | 
			
		||||
			pod, err := createPausePod(testCtx.ClientSet, tt.pod)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				t.Errorf("Error while creating a test pod: %v", err)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			if tt.admitEnqueue {
 | 
			
		||||
				if err := waitForPodToScheduleWithTimeout(testCtx.ClientSet, pod, 10*time.Second); err != nil {
 | 
			
		||||
					t.Errorf("Expected the pod to be schedulable, but got: %v", err)
 | 
			
		||||
				}
 | 
			
		||||
				// Also verify enqueuePlugin is called.
 | 
			
		||||
				if enqueuePlugin.called == 0 {
 | 
			
		||||
					t.Errorf("Expected the enqueuePlugin plugin to be called at least once, but got 0")
 | 
			
		||||
				}
 | 
			
		||||
			} else {
 | 
			
		||||
				if err := waitForPodSchedulingGated(testCtx.ClientSet, pod, 10*time.Second); err != nil {
 | 
			
		||||
					t.Errorf("Expected the pod to be scheduling waiting, but got: %v", err)
 | 
			
		||||
				}
 | 
			
		||||
				// Also verify preFilterPlugin is not called.
 | 
			
		||||
				if preFilterPlugin.numPreFilterCalled != 0 {
 | 
			
		||||
					t.Errorf("Expected the preFilter plugin not to be called, but got %v", preFilterPlugin.numPreFilterCalled)
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			preFilterPlugin.reset()
 | 
			
		||||
			testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod})
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// TestPreemptWithPermitPlugin tests preempt with permit plugins.
 | 
			
		||||
// It verifies how waitingPods behave in different scenarios:
 | 
			
		||||
// - when waitingPods get preempted
 | 
			
		||||
@@ -2450,6 +2539,8 @@ func initRegistryAndConfig(t *testing.T, plugins ...framework.Plugin) (framework
 | 
			
		||||
		plugin := configv1.Plugin{Name: p.Name()}
 | 
			
		||||
 | 
			
		||||
		switch p.(type) {
 | 
			
		||||
		case *PreEnqueuePlugin:
 | 
			
		||||
			pls.PreEnqueue.Enabled = append(pls.PreEnqueue.Enabled, plugin)
 | 
			
		||||
		case *PreFilterPlugin:
 | 
			
		||||
			pls.PreFilter.Enabled = append(pls.PreFilter.Enabled, plugin)
 | 
			
		||||
		case *FilterPlugin:
 | 
			
		||||
 
 | 
			
		||||
@@ -30,12 +30,16 @@ import (
 | 
			
		||||
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/runtime"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/runtime/schema"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/types"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/uuid"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/wait"
 | 
			
		||||
	utilfeature "k8s.io/apiserver/pkg/util/feature"
 | 
			
		||||
	"k8s.io/client-go/dynamic"
 | 
			
		||||
	"k8s.io/client-go/kubernetes"
 | 
			
		||||
	featuregatetesting "k8s.io/component-base/featuregate/testing"
 | 
			
		||||
	configv1 "k8s.io/kube-scheduler/config/v1"
 | 
			
		||||
	apiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/features"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/scheduler"
 | 
			
		||||
	configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/scheduler/framework"
 | 
			
		||||
@@ -47,6 +51,128 @@ import (
 | 
			
		||||
	"k8s.io/utils/pointer"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func TestSchedulingGates(t *testing.T) {
 | 
			
		||||
	tests := []struct {
 | 
			
		||||
		name                  string
 | 
			
		||||
		pods                  []*v1.Pod
 | 
			
		||||
		featureEnabled        bool
 | 
			
		||||
		want                  []string
 | 
			
		||||
		rmPodsSchedulingGates []int
 | 
			
		||||
		wantPostGatesRemoval  []string
 | 
			
		||||
	}{
 | 
			
		||||
		{
 | 
			
		||||
			name: "feature disabled, regular pods",
 | 
			
		||||
			pods: []*v1.Pod{
 | 
			
		||||
				st.MakePod().Name("p1").Container("pause").Obj(),
 | 
			
		||||
				st.MakePod().Name("p2").Container("pause").Obj(),
 | 
			
		||||
			},
 | 
			
		||||
			featureEnabled: false,
 | 
			
		||||
			want:           []string{"p1", "p2"},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "feature enabled, regular pods",
 | 
			
		||||
			pods: []*v1.Pod{
 | 
			
		||||
				st.MakePod().Name("p1").Container("pause").Obj(),
 | 
			
		||||
				st.MakePod().Name("p2").Container("pause").Obj(),
 | 
			
		||||
			},
 | 
			
		||||
			featureEnabled: true,
 | 
			
		||||
			want:           []string{"p1", "p2"},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "feature disabled, one pod carrying scheduling gates",
 | 
			
		||||
			pods: []*v1.Pod{
 | 
			
		||||
				st.MakePod().Name("p1").SchedulingGates([]string{"foo"}).Container("pause").Obj(),
 | 
			
		||||
				st.MakePod().Name("p2").Container("pause").Obj(),
 | 
			
		||||
			},
 | 
			
		||||
			featureEnabled: false,
 | 
			
		||||
			want:           []string{"p1", "p2"},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "feature enabled, one pod carrying scheduling gates",
 | 
			
		||||
			pods: []*v1.Pod{
 | 
			
		||||
				st.MakePod().Name("p1").SchedulingGates([]string{"foo"}).Container("pause").Obj(),
 | 
			
		||||
				st.MakePod().Name("p2").Container("pause").Obj(),
 | 
			
		||||
			},
 | 
			
		||||
			featureEnabled: true,
 | 
			
		||||
			want:           []string{"p2"},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "feature enabled, two pod carrying scheduling gates, and remove gates of one pod",
 | 
			
		||||
			pods: []*v1.Pod{
 | 
			
		||||
				st.MakePod().Name("p1").SchedulingGates([]string{"foo"}).Container("pause").Obj(),
 | 
			
		||||
				st.MakePod().Name("p2").SchedulingGates([]string{"bar"}).Container("pause").Obj(),
 | 
			
		||||
				st.MakePod().Name("p3").Container("pause").Obj(),
 | 
			
		||||
			},
 | 
			
		||||
			featureEnabled:        true,
 | 
			
		||||
			want:                  []string{"p3"},
 | 
			
		||||
			rmPodsSchedulingGates: []int{1}, // remove gates of 'p2'
 | 
			
		||||
			wantPostGatesRemoval:  []string{"p2"},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, tt := range tests {
 | 
			
		||||
		t.Run(tt.name, func(t *testing.T) {
 | 
			
		||||
			defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodSchedulingReadiness, tt.featureEnabled)()
 | 
			
		||||
 | 
			
		||||
			// Use zero backoff seconds to bypass backoffQ.
 | 
			
		||||
			// It's intended to not start the scheduler's queue, and hence to
 | 
			
		||||
			// not start any flushing logic. We will pop and schedule the Pods manually later.
 | 
			
		||||
			testCtx := testutils.InitTestSchedulerWithOptions(
 | 
			
		||||
				t,
 | 
			
		||||
				testutils.InitTestAPIServer(t, "pod-scheduling-gates", nil),
 | 
			
		||||
				0,
 | 
			
		||||
				scheduler.WithPodInitialBackoffSeconds(0),
 | 
			
		||||
				scheduler.WithPodMaxBackoffSeconds(0),
 | 
			
		||||
			)
 | 
			
		||||
			testutils.SyncInformerFactory(testCtx)
 | 
			
		||||
			defer testutils.CleanupTest(t, testCtx)
 | 
			
		||||
 | 
			
		||||
			cs, ns, ctx := testCtx.ClientSet, testCtx.NS.Name, testCtx.Ctx
 | 
			
		||||
			for _, p := range tt.pods {
 | 
			
		||||
				p.Namespace = ns
 | 
			
		||||
				if _, err := cs.CoreV1().Pods(ns).Create(ctx, p, metav1.CreateOptions{}); err != nil {
 | 
			
		||||
					t.Fatalf("Failed to create Pod %q: %v", p.Name, err)
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			// Wait for the pods to be present in the scheduling queue.
 | 
			
		||||
			if err := wait.Poll(time.Millisecond*200, wait.ForeverTestTimeout, func() (bool, error) {
 | 
			
		||||
				pendingPods, _ := testCtx.Scheduler.SchedulingQueue.PendingPods()
 | 
			
		||||
				return len(pendingPods) == len(tt.pods), nil
 | 
			
		||||
			}); err != nil {
 | 
			
		||||
				t.Fatal(err)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			// Pop the expected pods out. They should be de-queueable.
 | 
			
		||||
			for _, wantPod := range tt.want {
 | 
			
		||||
				podInfo := nextPodOrDie(t, testCtx)
 | 
			
		||||
				if got := podInfo.Pod.Name; got != wantPod {
 | 
			
		||||
					t.Errorf("Want %v to be popped out, but got %v", wantPod, got)
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			if len(tt.rmPodsSchedulingGates) == 0 {
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			// Remove scheduling gates from the pod spec.
 | 
			
		||||
			for _, idx := range tt.rmPodsSchedulingGates {
 | 
			
		||||
				patch := `{"spec": {"schedulingGates": null}}`
 | 
			
		||||
				podName := tt.pods[idx].Name
 | 
			
		||||
				if _, err := cs.CoreV1().Pods(ns).Patch(ctx, podName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}); err != nil {
 | 
			
		||||
					t.Fatalf("Failed to patch pod %v: %v", podName, err)
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
			// Pop the expected pods out. They should be de-queueable.
 | 
			
		||||
			for _, wantPod := range tt.wantPostGatesRemoval {
 | 
			
		||||
				podInfo := nextPodOrDie(t, testCtx)
 | 
			
		||||
				if got := podInfo.Pod.Name; got != wantPod {
 | 
			
		||||
					t.Errorf("Want %v to be popped out, but got %v", wantPod, got)
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// TestCoreResourceEnqueue verify Pods failed by in-tree default plugins can be
 | 
			
		||||
// moved properly upon their registered events.
 | 
			
		||||
func TestCoreResourceEnqueue(t *testing.T) {
 | 
			
		||||
 
 | 
			
		||||
@@ -776,7 +776,7 @@ func PodScheduledIn(c clientset.Interface, podNamespace, podName string, nodeNam
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// PodUnschedulable returns a condition function that returns true if the given pod
 | 
			
		||||
// gets unschedulable status.
 | 
			
		||||
// gets unschedulable status of reason 'Unschedulable'.
 | 
			
		||||
func PodUnschedulable(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
 | 
			
		||||
	return func() (bool, error) {
 | 
			
		||||
		pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
 | 
			
		||||
@@ -806,18 +806,39 @@ func PodSchedulingError(c clientset.Interface, podNamespace, podName string) wai
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// waitForPodUnscheduleWithTimeout waits for a pod to fail scheduling and returns
 | 
			
		||||
// PodSchedulingGated returns a condition function that returns true if the given pod
 | 
			
		||||
// gets unschedulable status of reason 'SchedulingGated'.
 | 
			
		||||
func PodSchedulingGated(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
 | 
			
		||||
	return func() (bool, error) {
 | 
			
		||||
		pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			// This could be a connection error so we want to retry.
 | 
			
		||||
			return false, nil
 | 
			
		||||
		}
 | 
			
		||||
		_, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled)
 | 
			
		||||
		return cond != nil && cond.Status == v1.ConditionFalse &&
 | 
			
		||||
			cond.Reason == v1.PodReasonSchedulingGated && pod.Spec.NodeName == "", nil
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// WaitForPodUnschedulableWithTimeout waits for a pod to fail scheduling and returns
 | 
			
		||||
// an error if it does not become unschedulable within the given timeout.
 | 
			
		||||
func WaitForPodUnschedulableWithTimeout(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error {
 | 
			
		||||
	return wait.Poll(100*time.Millisecond, timeout, PodUnschedulable(cs, pod.Namespace, pod.Name))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// waitForPodUnschedule waits for a pod to fail scheduling and returns
 | 
			
		||||
// WaitForPodUnschedulable waits for a pod to fail scheduling and returns
 | 
			
		||||
// an error if it does not become unschedulable within the timeout duration (30 seconds).
 | 
			
		||||
func WaitForPodUnschedulable(cs clientset.Interface, pod *v1.Pod) error {
 | 
			
		||||
	return WaitForPodUnschedulableWithTimeout(cs, pod, 30*time.Second)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// WaitForPodSchedulingGated waits for a pod to be in scheduling gated state
 | 
			
		||||
// and returns an error if it does not fall into this state within the given timeout.
 | 
			
		||||
func WaitForPodSchedulingGated(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error {
 | 
			
		||||
	return wait.Poll(100*time.Millisecond, timeout, PodSchedulingGated(cs, pod.Namespace, pod.Name))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// WaitForPDBsStable waits for PDBs to have "CurrentHealthy" status equal to
 | 
			
		||||
// the expected values.
 | 
			
		||||
func WaitForPDBsStable(testCtx *TestContext, pdbs []*policy.PodDisruptionBudget, pdbPodNum []int32) error {
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user