diff --git a/pkg/scheduler/testing/wrappers.go b/pkg/scheduler/testing/wrappers.go
index c85012e4651..16cbf50ed26 100644
--- a/pkg/scheduler/testing/wrappers.go
+++ b/pkg/scheduler/testing/wrappers.go
@@ -1189,6 +1189,7 @@ func MakeResourceSlice(nodeName, driverName string) *ResourceSliceWrapper {
 	wrapper.Name = nodeName + "-" + driverName
 	wrapper.Spec.NodeName = nodeName
 	wrapper.Spec.Pool.Name = nodeName
+	wrapper.Spec.Pool.ResourceSliceCount = 1
 	wrapper.Spec.Driver = driverName
 	return wrapper
 }
@@ -1230,6 +1231,12 @@ func (wrapper *ResourceSliceWrapper) Device(name string, otherFields ...any) *Re
 	return wrapper
 }
 
+// ResourceSliceCount sets the total number of ResourceSlices in the pool.
+func (wrapper *ResourceSliceWrapper) ResourceSliceCount(count int) *ResourceSliceWrapper {
+	wrapper.Spec.Pool.ResourceSliceCount = int64(count)
+	return wrapper
+}
+
 // StorageClassWrapper wraps a StorageClass inside.
 type StorageClassWrapper struct{ storagev1.StorageClass }
 
diff --git a/test/integration/.import-restrictions b/test/integration/.import-restrictions
index c9e8a06aebd..4d8b5843f88 100644
--- a/test/integration/.import-restrictions
+++ b/test/integration/.import-restrictions
@@ -1,5 +1,17 @@
 rules:
   # Discourage import of k8s.io/kubernetes/test/e2e
+  # because integration tests should "not depend on e2e tests" (from commit aa0b284ca12bdff7bdc9ef31fdbd69dd8091c073).
+  #
+  # However, some of the helper packages can be useful
+  # if they are written such that they:
+  # - take a kubernetes.Interface as parameter instead of a Framework
+  # - return errors instead of calling framework.Fail
+  #
+  # At least the following packages follow that approach.
+  # More may be added as needed.
+  - selectorRegexp: k8s[.]io/kubernetes/test/e2e/framework/(pod|daemonset)
+    allowedPrefixes:
+      - ""
   - selectorRegexp: k8s[.]io/kubernetes/test/e2e
     forbiddenPrefixes:
       - ""
diff --git a/test/integration/dra/dra_test.go b/test/integration/dra/dra_test.go
index 38a77a52f1b..6151ed07dcb 100644
--- a/test/integration/dra/dra_test.go
+++ b/test/integration/dra/dra_test.go
@@ -59,7 +59,8 @@ import (
 	"k8s.io/kubernetes/pkg/scheduler/apis/config"
 	kubeschedulerscheme "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
 	st "k8s.io/kubernetes/pkg/scheduler/testing"
+	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
 	"k8s.io/kubernetes/test/integration/framework"
 	"k8s.io/kubernetes/test/integration/util"
 	"k8s.io/kubernetes/test/utils/format"
 )
@@ -119,8 +120,22 @@ func createTestNamespace(tCtx ktesting.TContext, labels map[string]string) strin
 	return ns.Name
 }
 
+// createSlice creates the given ResourceSlice and removes it when the test is done.
+func createSlice(tCtx ktesting.TContext, slice *resourceapi.ResourceSlice) *resourceapi.ResourceSlice {
+	tCtx.Helper()
+	slice, err := tCtx.Client().ResourceV1beta1().ResourceSlices().Create(tCtx, slice, metav1.CreateOptions{})
+	tCtx.ExpectNoError(err, "create ResourceSlice")
+	tCtx.CleanupCtx(func(tCtx ktesting.TContext) {
+		tCtx.Log("Cleaning up ResourceSlice...")
+		err := tCtx.Client().ResourceV1beta1().ResourceSlices().Delete(tCtx, slice.Name, metav1.DeleteOptions{})
+		tCtx.ExpectNoError(err, "delete ResourceSlice")
+	})
+	return slice
+}
+
 // createTestClass creates a DeviceClass with a driver name derived from the test namespace
 func createTestClass(tCtx ktesting.TContext, namespace string) (*resourceapi.DeviceClass, string) {
+	tCtx.Helper()
 	driverName := namespace + ".driver"
 	class := class.DeepCopy()
 	class.Name = namespace + ".class"
@@ -143,6 +158,7 @@ func createTestClass(tCtx ktesting.TContext, namespace string) (*resourceapi.Dev
 // createClaim creates a claim and in the namespace.
 // The class must already exist and is used for all requests.
 func createClaim(tCtx ktesting.TContext, namespace string, suffix string, class *resourceapi.DeviceClass, claim *resourceapi.ResourceClaim) *resourceapi.ResourceClaim {
+	tCtx.Helper()
 	claim = claim.DeepCopy()
 	claim.Namespace = namespace
 	claim.Name += suffix
@@ -165,6 +181,7 @@
 
 // createPod create a pod in the namespace, referencing the given claim.
 func createPod(tCtx ktesting.TContext, namespace string, suffix string, claim *resourceapi.ResourceClaim, pod *v1.Pod) *v1.Pod {
+	tCtx.Helper()
 	pod = pod.DeepCopy()
 	pod.Name += suffix
 	podName := pod.Name
@@ -172,6 +189,11 @@ func createPod(tCtx ktesting.TContext, namespace string, suffix string, claim *r
 	pod.Spec.ResourceClaims[0].ResourceClaimName = &claim.Name
 	pod, err := tCtx.Client().CoreV1().Pods(namespace).Create(tCtx, pod, metav1.CreateOptions{})
 	tCtx.ExpectNoError(err, "create pod "+podName)
+	tCtx.CleanupCtx(func(tCtx ktesting.TContext) {
+		tCtx.Log("Cleaning up Pod...")
+		err := tCtx.Client().CoreV1().Pods(namespace).Delete(tCtx, pod.Name, metav1.DeleteOptions{})
+		tCtx.ExpectNoError(err, "delete Pod")
+	})
 	return pod
 }
 
@@ -211,6 +233,7 @@ func TestDRA(t *testing.T) {
 		},
 		f: func(tCtx ktesting.TContext) {
 			tCtx.Run("AdminAccess", func(tCtx ktesting.TContext) { testAdminAccess(tCtx, false) })
+			tCtx.Run("FilterTimeout", testFilterTimeout)
 			tCtx.Run("PrioritizedList", func(tCtx ktesting.TContext) { testPrioritizedList(tCtx, false) })
 			tCtx.Run("Pod", func(tCtx ktesting.TContext) { testPod(tCtx, true) })
 			tCtx.Run("PublishResourceSlices", func(tCtx ktesting.TContext) {
@@ -387,13 +410,22 @@
 	return ktesting.WithValue(tCtx, schedulerKey, scheduler)
 }
 
+// startScheduler starts the scheduler with an empty config, i.e. all settings at their default.
+// This may be used in parallel tests.
 func startScheduler(tCtx ktesting.TContext) {
+	startSchedulerWithConfig(tCtx, "")
+}
+
+// startSchedulerWithConfig starts the scheduler with the given config.
+// If the config is non-empty, this may only be used in tests which run sequentially.
+func startSchedulerWithConfig(tCtx ktesting.TContext, config string) {
+	tCtx.Helper()
 	value := tCtx.Value(schedulerKey)
 	if value == nil {
 		tCtx.Fatal("internal error: startScheduler without a prior prepareScheduler call")
 	}
 	scheduler := value.(*schedulerSingleton)
-	scheduler.start(tCtx)
+	scheduler.start(tCtx, config)
 }
 
 type schedulerSingleton struct {
@@ -405,7 +437,8 @@ type schedulerSingleton struct {
 	cancel func(err error)
 }
 
-func (scheduler *schedulerSingleton) start(tCtx ktesting.TContext) {
+func (scheduler *schedulerSingleton) start(tCtx ktesting.TContext, config string) {
+	tCtx.Helper()
 	scheduler.mutex.Lock()
 	defer scheduler.mutex.Unlock()
 
@@ -423,7 +456,8 @@
 	schedulerCtx.Logf("Starting the scheduler for test %s...", tCtx.Name())
 	ctx := klog.NewContext(schedulerCtx, klog.LoggerWithName(schedulerCtx.Logger(), "scheduler"))
 	ctx, scheduler.cancel = context.WithCancelCause(ctx)
-	_, scheduler.informerFactory = util.StartScheduler(ctx, schedulerCtx.Client(), schedulerCtx.RESTConfig(), newDefaultSchedulerComponentConfig(schedulerCtx), nil)
+	cfg := newSchedulerComponentConfig(schedulerCtx, config)
+	_, scheduler.informerFactory = util.StartScheduler(ctx, schedulerCtx.Client(), schedulerCtx.RESTConfig(), cfg, nil)
 	schedulerCtx.Logf("Started the scheduler for test %s.", tCtx.Name())
 }
 
@@ -450,10 +484,11 @@ type schedulerKeyType int
 
 var schedulerKey schedulerKeyType
 
-func newDefaultSchedulerComponentConfig(tCtx ktesting.TContext) *config.KubeSchedulerConfiguration {
+func newSchedulerComponentConfig(tCtx ktesting.TContext, cfgData string) *config.KubeSchedulerConfiguration {
+	tCtx.Helper()
 	gvk := kubeschedulerconfigv1.SchemeGroupVersion.WithKind("KubeSchedulerConfiguration")
 	cfg := config.KubeSchedulerConfiguration{}
-	_, _, err := kubeschedulerscheme.Codecs.UniversalDecoder().Decode(nil, &gvk, &cfg)
+	_, _, err := kubeschedulerscheme.Codecs.UniversalDecoder().Decode([]byte(cfgData), &gvk, &cfg)
 	tCtx.ExpectNoError(err, "decode default scheduler configuration")
 	return &cfg
 }
@@ -535,6 +570,78 @@
 	}
 }
 
+// testFilterTimeout covers the scheduler plugin's filter timeout configuration and behavior.
+//
+// It runs the scheduler with non-standard settings and thus cannot run in parallel.
+func testFilterTimeout(tCtx ktesting.TContext) {
+	namespace := createTestNamespace(tCtx, nil)
+	class, driverName := createTestClass(tCtx, namespace)
+	// Chosen so that Filter takes a few seconds:
+	// short enough that the test doesn't run too long without a timeout,
+	// long enough that a short timeout triggers.
+	devicesPerSlice := 9
+	deviceNames := make([]string, devicesPerSlice)
+	for i := 0; i < devicesPerSlice; i++ {
+		deviceNames[i] = fmt.Sprintf("dev-%d", i)
+	}
+	slice := st.MakeResourceSlice("worker-0", driverName).Devices(deviceNames...)
+	createSlice(tCtx, slice.Obj())
+	otherSlice := st.MakeResourceSlice("worker-1", driverName).Devices(deviceNames...)
+	createdOtherSlice := createSlice(tCtx, otherSlice.Obj())
+	claim := claim.DeepCopy()
+	claim.Spec.Devices.Requests[0].Count = int64(devicesPerSlice + 1) // Impossible to allocate.
+	claim = createClaim(tCtx, namespace, "", class, claim)
+
+	tCtx.Run("disabled", func(tCtx ktesting.TContext) {
+		pod := createPod(tCtx, namespace, "", claim, podWithClaimName)
+		startSchedulerWithConfig(tCtx, `
+profiles:
+- schedulerName: default-scheduler
+  pluginConfig:
+  - name: DynamicResources
+    args:
+      filterTimeout: 0s
+`)
+		expectPodUnschedulable(tCtx, pod, "cannot allocate all claims")
+	})
+
+	tCtx.Run("enabled", func(tCtx ktesting.TContext) {
+		pod := createPod(tCtx, namespace, "", claim, podWithClaimName)
+		startSchedulerWithConfig(tCtx, `
+profiles:
+- schedulerName: default-scheduler
+  pluginConfig:
+  - name: DynamicResources
+    args:
+      filterTimeout: 10ms
+`)
+		expectPodUnschedulable(tCtx, pod, "timed out trying to allocate devices")
+
+		// Update one slice such that allocation succeeds.
+		// The scheduler must retry and should succeed now.
+		createdOtherSlice.Spec.Devices = append(createdOtherSlice.Spec.Devices, resourceapi.Device{
+			Name:  fmt.Sprintf("dev-%d", devicesPerSlice),
+			Basic: &resourceapi.BasicDevice{},
+		})
+		_, err := tCtx.Client().ResourceV1beta1().ResourceSlices().Update(tCtx, createdOtherSlice, metav1.UpdateOptions{})
+		tCtx.ExpectNoError(err, "update worker-1's ResourceSlice")
+		tCtx.ExpectNoError(e2epod.WaitForPodScheduled(tCtx, tCtx.Client(), namespace, pod.Name))
+	})
+}
+
+func expectPodUnschedulable(tCtx ktesting.TContext, pod *v1.Pod, reason string) {
+	tCtx.Helper()
+	tCtx.ExpectNoError(e2epod.WaitForPodNameUnschedulableInNamespace(tCtx, tCtx.Client(), pod.Name, pod.Namespace), fmt.Sprintf("expected pod to be unschedulable because %q", reason))
+	pod, err := tCtx.Client().CoreV1().Pods(pod.Namespace).Get(tCtx, pod.Name, metav1.GetOptions{})
+	tCtx.ExpectNoError(err)
+	gomega.NewWithT(tCtx).Expect(pod).To(gomega.HaveField("Status.Conditions", gomega.ContainElement(gstruct.MatchFields(gstruct.IgnoreExtras, gstruct.Fields{
+		"Type":    gomega.Equal(v1.PodScheduled),
+		"Status":  gomega.Equal(v1.ConditionFalse),
+		"Reason":  gomega.Equal(v1.PodReasonUnschedulable),
+		"Message": gomega.ContainSubstring(reason),
+	}))))
+}
+
 func testPrioritizedList(tCtx ktesting.TContext, enabled bool) {
 	tCtx.Parallel()
 	namespace := createTestNamespace(tCtx, nil)
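
Note on the config plumbing: `newSchedulerComponentConfig` relies on the kube-scheduler codec to turn a partial YAML snippet into a complete `KubeSchedulerConfiguration`, defaulting every field the snippet does not set, which is why `startScheduler` can delegate with an empty string. Below is a minimal standalone sketch of that decode step, not part of the patch; the `kubeschedulerconfigv1` import path is assumed to be `k8s.io/kube-scheduler/config/v1`, matching the alias used in dra_test.go.

```go
package main

import (
	"fmt"

	kubeschedulerconfigv1 "k8s.io/kube-scheduler/config/v1"
	"k8s.io/kubernetes/pkg/scheduler/apis/config"
	kubeschedulerscheme "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
)

func main() {
	// The same kind of snippet that testFilterTimeout passes to
	// startSchedulerWithConfig; everything omitted here is filled in
	// with defaults by the scheme while decoding.
	cfgData := `
profiles:
- schedulerName: default-scheduler
  pluginConfig:
  - name: DynamicResources
    args:
      filterTimeout: 10ms
`
	gvk := kubeschedulerconfigv1.SchemeGroupVersion.WithKind("KubeSchedulerConfiguration")
	cfg := config.KubeSchedulerConfiguration{}
	// Decoding an empty string instead yields the all-defaults configuration.
	if _, _, err := kubeschedulerscheme.Codecs.UniversalDecoder().Decode([]byte(cfgData), &gvk, &cfg); err != nil {
		panic(err)
	}
	for _, pc := range cfg.Profiles[0].PluginConfig {
		if pc.Name == "DynamicResources" {
			fmt.Printf("DynamicResources args: %+v\n", pc.Args)
		}
	}
}
```

This is the mechanism that lets the two subtests differ only in their snippet: `filterTimeout: 0s` disables the timeout, so the deliberately expensive Filter runs to completion and reports "cannot allocate all claims", while `filterTimeout: 10ms` makes the same Filter call time out.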