DRA integration: add test case for FilterTimeout

This covers disabling the feature via the configuration, failing to schedule
because of timeouts for all nodes, and retrying after ResourceSlice changes with
partial success (timeout for one node, success for the other).

While at it, some helper code gets improved.
Patrick Ohly
2025-06-02 10:45:11 +02:00
parent 241ac018e2
commit 5cea72d564
3 changed files with 129 additions and 5 deletions
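For context, the configuration mentioned in the commit message is the filterTimeout argument of the scheduler's DynamicResources plugin; the "disabled" subtest in the diff below switches the timeout off by setting it to 0s. The call is reproduced here from the test code further down:

startSchedulerWithConfig(tCtx, `
profiles:
- schedulerName: default-scheduler
  pluginConfig:
  - name: DynamicResources
    args:
      filterTimeout: 0s
`)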


@@ -1189,6 +1189,7 @@ func MakeResourceSlice(nodeName, driverName string) *ResourceSliceWrapper {
wrapper.Name = nodeName + "-" + driverName
wrapper.Spec.NodeName = nodeName
wrapper.Spec.Pool.Name = nodeName
wrapper.Spec.Pool.ResourceSliceCount = 1
wrapper.Spec.Driver = driverName
return wrapper
}
@@ -1230,6 +1231,11 @@ func (wrapper *ResourceSliceWrapper) Device(name string, otherFields ...any) *Re
return wrapper
}
// ResourceSliceCount sets the total number of ResourceSlices in the pool.
func (wrapper *ResourceSliceWrapper) ResourceSliceCount(count int) *ResourceSliceWrapper {
wrapper.Spec.Pool.ResourceSliceCount = int64(count)
return wrapper
}
// StorageClassWrapper wraps a StorageClass inside.
type StorageClassWrapper struct{ storagev1.StorageClass }
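As a usage sketch (not part of the commit), the new ResourceSliceCount method chains with the existing wrapper helpers; "worker-0" and driverName are placeholders borrowed from the test below:

slice := st.MakeResourceSlice("worker-0", driverName).
	ResourceSliceCount(2). // pool consists of two slices instead of the default of one
	Devices("dev-0", "dev-1").
	Obj()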


@@ -1,5 +1,17 @@
rules:
  # Discourage import of k8s.io/kubernetes/test/e2e
  # because integration tests should "not depend on e2e tests" (from commit aa0b284ca12bdff7bdc9ef31fdbd69dd8091c073).
  #
  # However, some of the helper packages can be useful
  # if they are written such that they:
  # - take a kubernetes.Interface as parameter instead of a Framework
  # - return errors instead of calling framework.Fail
  #
  # At least the following packages follow that approach.
  # More may be added as needed.
  - selectorRegexp: k8s[.]io/kubernetes/test/e2e/framework/(pod|daemonset)
    allowedPrefixes:
      - ""
  - selectorRegexp: k8s[.]io/kubernetes/test/e2e
    forbiddenPrefixes:
      - ""


@@ -59,6 +59,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/apis/config"
kubeschedulerscheme "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
st "k8s.io/kubernetes/pkg/scheduler/testing"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
"k8s.io/kubernetes/test/integration/framework"
"k8s.io/kubernetes/test/integration/util"
"k8s.io/kubernetes/test/utils/format"
@@ -119,8 +120,22 @@ func createTestNamespace(tCtx ktesting.TContext, labels map[string]string) strin
return ns.Name
}
// createSlice creates the given ResourceSlice and removes it when the test is done.
func createSlice(tCtx ktesting.TContext, slice *resourceapi.ResourceSlice) *resourceapi.ResourceSlice {
tCtx.Helper()
slice, err := tCtx.Client().ResourceV1beta1().ResourceSlices().Create(tCtx, slice, metav1.CreateOptions{})
tCtx.ExpectNoError(err, "create ResourceSlice")
tCtx.CleanupCtx(func(tCtx ktesting.TContext) {
tCtx.Log("Cleaning up ResourceSlice...")
err := tCtx.Client().ResourceV1beta1().ResourceSlices().Delete(tCtx, slice.Name, metav1.DeleteOptions{})
tCtx.ExpectNoError(err, "delete ResourceSlice")
})
return slice
}
// createTestClass creates a DeviceClass whose name and driver name are derived from the test namespace and returns both.
func createTestClass(tCtx ktesting.TContext, namespace string) (*resourceapi.DeviceClass, string) {
tCtx.Helper()
driverName := namespace + ".driver"
class := class.DeepCopy()
class.Name = namespace + ".class"
@@ -143,6 +158,7 @@ func createTestClass(tCtx ktesting.TContext, namespace string) (*resourceapi.Dev
// createClaim creates a claim in the namespace.
// The class must already exist and is used for all requests.
func createClaim(tCtx ktesting.TContext, namespace string, suffix string, class *resourceapi.DeviceClass, claim *resourceapi.ResourceClaim) *resourceapi.ResourceClaim {
tCtx.Helper()
claim = claim.DeepCopy()
claim.Namespace = namespace
claim.Name += suffix
@@ -165,6 +181,7 @@ func createClaim(tCtx ktesting.TContext, namespace string, suffix string, class
// createPod creates a pod in the namespace, referencing the given claim.
func createPod(tCtx ktesting.TContext, namespace string, suffix string, claim *resourceapi.ResourceClaim, pod *v1.Pod) *v1.Pod {
tCtx.Helper()
pod = pod.DeepCopy()
pod.Name += suffix
podName := pod.Name
@@ -172,6 +189,11 @@ func createPod(tCtx ktesting.TContext, namespace string, suffix string, claim *r
pod.Spec.ResourceClaims[0].ResourceClaimName = &claim.Name
pod, err := tCtx.Client().CoreV1().Pods(namespace).Create(tCtx, pod, metav1.CreateOptions{})
tCtx.ExpectNoError(err, "create pod "+podName)
tCtx.CleanupCtx(func(tCtx ktesting.TContext) {
tCtx.Log("Cleaning up Pod...")
err := tCtx.Client().CoreV1().Pods(namespace).Delete(tCtx, pod.Name, metav1.DeleteOptions{})
tCtx.ExpectNoError(err, "delete Pod")
})
return pod
}
@@ -211,6 +233,7 @@ func TestDRA(t *testing.T) {
},
f: func(tCtx ktesting.TContext) {
tCtx.Run("AdminAccess", func(tCtx ktesting.TContext) { testAdminAccess(tCtx, false) })
tCtx.Run("FilterTimeout", testFilterTimeout)
tCtx.Run("PrioritizedList", func(tCtx ktesting.TContext) { testPrioritizedList(tCtx, false) })
tCtx.Run("Pod", func(tCtx ktesting.TContext) { testPod(tCtx, true) })
tCtx.Run("PublishResourceSlices", func(tCtx ktesting.TContext) {
@@ -387,13 +410,22 @@ func prepareScheduler(tCtx ktesting.TContext) ktesting.TContext {
return ktesting.WithValue(tCtx, schedulerKey, scheduler)
}
// startScheduler starts the scheduler with an empty config, i.e. all settings at their default.
// This may be used in parallel tests.
func startScheduler(tCtx ktesting.TContext) {
startSchedulerWithConfig(tCtx, "")
}
// startSchedulerWithConfig starts the scheduler with the given config.
// This may be used only in tests which run sequentially if the config is non-empty.
func startSchedulerWithConfig(tCtx ktesting.TContext, config string) {
tCtx.Helper()
value := tCtx.Value(schedulerKey)
if value == nil {
tCtx.Fatal("internal error: startScheduler without a prior prepareScheduler call")
}
scheduler := value.(*schedulerSingleton)
scheduler.start(tCtx, config)
}
type schedulerSingleton struct {
@@ -405,7 +437,8 @@ type schedulerSingleton struct {
cancel func(err error)
}
func (scheduler *schedulerSingleton) start(tCtx ktesting.TContext, config string) {
tCtx.Helper()
scheduler.mutex.Lock()
defer scheduler.mutex.Unlock()
@@ -423,7 +456,8 @@ func (scheduler *schedulerSingleton) start(tCtx ktesting.TContext) {
schedulerCtx.Logf("Starting the scheduler for test %s...", tCtx.Name())
ctx := klog.NewContext(schedulerCtx, klog.LoggerWithName(schedulerCtx.Logger(), "scheduler"))
ctx, scheduler.cancel = context.WithCancelCause(ctx)
cfg := newSchedulerComponentConfig(schedulerCtx, config)
_, scheduler.informerFactory = util.StartScheduler(ctx, schedulerCtx.Client(), schedulerCtx.RESTConfig(), cfg, nil)
schedulerCtx.Logf("Started the scheduler for test %s.", tCtx.Name())
}
@@ -450,10 +484,11 @@ type schedulerKeyType int
var schedulerKey schedulerKeyType
func newSchedulerComponentConfig(tCtx ktesting.TContext, cfgData string) *config.KubeSchedulerConfiguration {
tCtx.Helper()
gvk := kubeschedulerconfigv1.SchemeGroupVersion.WithKind("KubeSchedulerConfiguration")
cfg := config.KubeSchedulerConfiguration{}
_, _, err := kubeschedulerscheme.Codecs.UniversalDecoder().Decode([]byte(cfgData), &gvk, &cfg)
tCtx.ExpectNoError(err, "decode default scheduler configuration")
return &cfg
}
@@ -535,6 +570,77 @@ func testAdminAccess(tCtx ktesting.TContext, adminAccessEnabled bool) {
}
}
// testFilterTimeout covers the scheduler plugin's filter timeout configuration and behavior.
//
// It runs the scheduler with non-standard settings and thus cannot run in parallel.
func testFilterTimeout(tCtx ktesting.TContext) {
namespace := createTestNamespace(tCtx, nil)
class, driverName := createTestClass(tCtx, namespace)
// Chosen so that Filter takes a few seconds: long enough that a short
// timeout triggers, but short enough that the test doesn't run too long
// when no timeout is configured.
devicesPerSlice := 9
deviceNames := make([]string, devicesPerSlice)
for i := 0; i < devicesPerSlice; i++ {
deviceNames[i] = fmt.Sprintf("dev-%d", i)
}
slice := st.MakeResourceSlice("worker-0", driverName).Devices(deviceNames...)
createSlice(tCtx, slice.Obj())
otherSlice := st.MakeResourceSlice("worker-1", driverName).Devices(deviceNames...)
createdOtherSlice := createSlice(tCtx, otherSlice.Obj())
claim := claim.DeepCopy()
claim.Spec.Devices.Requests[0].Count = int64(devicesPerSlice + 1) // Impossible to allocate.
claim = createClaim(tCtx, namespace, "", class, claim)
tCtx.Run("disabled", func(tCtx ktesting.TContext) {
pod := createPod(tCtx, namespace, "", claim, podWithClaimName)
startSchedulerWithConfig(tCtx, `
profiles:
- schedulerName: default-scheduler
  pluginConfig:
  - name: DynamicResources
    args:
      filterTimeout: 0s
`)
expectPodUnschedulable(tCtx, pod, "cannot allocate all claims")
})
tCtx.Run("enabled", func(tCtx ktesting.TContext) {
pod := createPod(tCtx, namespace, "", claim, podWithClaimName)
startSchedulerWithConfig(tCtx, `
profiles:
- schedulerName: default-scheduler
  pluginConfig:
  - name: DynamicResources
    args:
      filterTimeout: 10ms
`)
expectPodUnschedulable(tCtx, pod, "timed out trying to allocate devices")
// Update one slice such that allocation succeeds.
// The scheduler must retry and should succeed now.
createdOtherSlice.Spec.Devices = append(createdOtherSlice.Spec.Devices, resourceapi.Device{
Name: fmt.Sprintf("dev-%d", devicesPerSlice),
Basic: &resourceapi.BasicDevice{},
})
_, err := tCtx.Client().ResourceV1beta1().ResourceSlices().Update(tCtx, createdOtherSlice, metav1.UpdateOptions{})
tCtx.ExpectNoError(err, "update worker-1's ResourceSlice")
tCtx.ExpectNoError(e2epod.WaitForPodScheduled(tCtx, tCtx.Client(), namespace, pod.Name))
})
}
func expectPodUnschedulable(tCtx ktesting.TContext, pod *v1.Pod, reason string) {
tCtx.Helper()
tCtx.ExpectNoError(e2epod.WaitForPodNameUnschedulableInNamespace(tCtx, tCtx.Client(), pod.Name, pod.Namespace), fmt.Sprintf("expected pod to be unschedulable because %q", reason))
pod, err := tCtx.Client().CoreV1().Pods(pod.Namespace).Get(tCtx, pod.Name, metav1.GetOptions{})
tCtx.ExpectNoError(err)
gomega.NewWithT(tCtx).Expect(pod).To(gomega.HaveField("Status.Conditions", gomega.ContainElement(gstruct.MatchFields(gstruct.IgnoreExtras, gstruct.Fields{
"Type": gomega.Equal(v1.PodScheduled),
"Status": gomega.Equal(v1.ConditionFalse),
"Reason": gomega.Equal(v1.PodReasonUnschedulable),
"Message": gomega.ContainSubstring(reason),
}))))
}
func testPrioritizedList(tCtx ktesting.TContext, enabled bool) {
tCtx.Parallel()
namespace := createTestNamespace(tCtx, nil)