From ac9fad60301d49a24ef5ac556e078b298d8da43e Mon Sep 17 00:00:00 2001
From: Kensei Nakada
Date: Thu, 17 Jul 2025 00:30:10 -0700
Subject: [PATCH] feat: trigger PreBindPreFlight in the binding cycle

---
 .../backend/api_dispatcher/call_queue.go      |   4 +-
 pkg/scheduler/framework/cycle_state.go        |  11 +
 pkg/scheduler/framework/interface.go          |  11 +-
 pkg/scheduler/framework/runtime/framework.go  |  62 +++
 .../framework/runtime/framework_test.go       | 113 +++-
 pkg/scheduler/metrics/metrics.go              |   1 +
 pkg/scheduler/schedule_one.go                 |  33 +-
 pkg/scheduler/schedule_one_test.go            | 516 ++++++++++++------
 pkg/scheduler/scheduler.go                    |  39 +-
 .../testing/framework/fake_plugins.go         |  18 +-
 .../kube-scheduler/framework/cycle_state.go   |   6 +
 .../nominated_node_name/main_test.go          |  27 +
 .../nominated_node_name_test.go               | 169 ++++++
 13 files changed, 800 insertions(+), 210 deletions(-)
 create mode 100644 test/integration/scheduler/nominated_node_name/main_test.go
 create mode 100644 test/integration/scheduler/nominated_node_name/nominated_node_name_test.go

diff --git a/pkg/scheduler/backend/api_dispatcher/call_queue.go b/pkg/scheduler/backend/api_dispatcher/call_queue.go
index 22e04b81e8a..14c1719234c 100644
--- a/pkg/scheduler/backend/api_dispatcher/call_queue.go
+++ b/pkg/scheduler/backend/api_dispatcher/call_queue.go
@@ -132,8 +132,8 @@ func (cq *callQueue) merge(oldAPICall, apiCall *queuedAPICall) error {
 		return nil
 	}
 
-	// Merge API calls if they are of the same type for the same object.
-	err := apiCall.Merge(oldAPICall)
+	// Send a concrete APICall object to allow for a type assertion.
+	err := apiCall.Merge(oldAPICall.APICall)
 	if err != nil {
 		err := fmt.Errorf("failed to merge API calls: %w", err)
 		apiCall.sendOnFinish(err)
diff --git a/pkg/scheduler/framework/cycle_state.go b/pkg/scheduler/framework/cycle_state.go
index a5a91783741..690bce50d21 100644
--- a/pkg/scheduler/framework/cycle_state.go
+++ b/pkg/scheduler/framework/cycle_state.go
@@ -34,6 +34,8 @@ type CycleState struct {
 	skipFilterPlugins sets.Set[string]
 	// skipScorePlugins are plugins that will be skipped in the Score extension point.
 	skipScorePlugins sets.Set[string]
+	// skipPreBindPlugins are plugins that will be skipped in the PreBind extension point.
+	skipPreBindPlugins sets.Set[string]
 }
 
 // NewCycleState initializes a new CycleState and returns its pointer.
@@ -73,6 +75,14 @@ func (c *CycleState) GetSkipScorePlugins() sets.Set[string] {
 	return c.skipScorePlugins
 }
 
+func (c *CycleState) SetSkipPreBindPlugins(plugins sets.Set[string]) {
+	c.skipPreBindPlugins = plugins
+}
+
+func (c *CycleState) GetSkipPreBindPlugins() sets.Set[string] {
+	return c.skipPreBindPlugins
+}
+
 // Clone creates a copy of CycleState and returns its pointer. Clone returns
 // nil if the context being cloned is nil.
 func (c *CycleState) Clone() fwk.CycleState {
@@ -89,6 +99,7 @@ func (c *CycleState) Clone() fwk.CycleState {
 	copy.recordPluginMetrics = c.recordPluginMetrics
 	copy.skipFilterPlugins = c.skipFilterPlugins
 	copy.skipScorePlugins = c.skipScorePlugins
+	copy.skipPreBindPlugins = c.skipPreBindPlugins
 	return copy
 }
diff --git a/pkg/scheduler/framework/interface.go b/pkg/scheduler/framework/interface.go
index 6a8694eb59c..e167789d7e3 100644
--- a/pkg/scheduler/framework/interface.go
+++ b/pkg/scheduler/framework/interface.go
@@ -440,12 +440,12 @@ type ReservePlugin interface {
 
 // These plugins are called before a pod being scheduled.
 type PreBindPlugin interface {
 	Plugin
-	// PreBindPreFlight is called before PreBind, and the plugin is supposed to return Success, Skip, or Error status.
+	// PreBindPreFlight is called before PreBind, and the plugin is supposed to return Success, Skip, or Error status
+	// to tell the scheduler whether the plugin will do something in PreBind or not.
 	// If it returns Success, it means this PreBind plugin will handle this pod.
 	// If it returns Skip, it means this PreBind plugin has nothing to do with the pod, and PreBind will be skipped.
 	// This function should be lightweight, and shouldn't do any actual operation, e.g., creating a volume etc.
 	PreBindPreFlight(ctx context.Context, state fwk.CycleState, p *v1.Pod, nodeName string) *fwk.Status
-
 	// PreBind is called before binding a pod. All prebind plugins must return
 	// success or the pod will be rejected and won't be sent for binding.
 	PreBind(ctx context.Context, state fwk.CycleState, p *v1.Pod, nodeName string) *fwk.Status
@@ -526,6 +526,10 @@
 	// internal error. In either case the pod is not going to be bound.
 	RunPreBindPlugins(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) *fwk.Status
 
+	// RunPreBindPreFlights runs the set of configured PreBindPreFlight functions from PreBind plugins.
+	// It returns Success if any plugin returns Success, Skip if all plugins return Skip, and returns an error status immediately if any plugin fails.
+	RunPreBindPreFlights(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) *fwk.Status
+
 	// RunPostBindPlugins runs the set of configured PostBind plugins.
 	RunPostBindPlugins(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string)
 
@@ -547,6 +551,9 @@ type Framework interface {
 	// Pod will remain waiting pod for the minimum duration returned by the Permit plugins.
 	RunPermitPlugins(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) *fwk.Status
 
+	// WillWaitOnPermit returns whether this pod will wait on permit by checking if the pod is a waiting pod.
+	WillWaitOnPermit(ctx context.Context, pod *v1.Pod) bool
+
 	// WaitOnPermit will block, if the pod is a waiting pod, until the waiting pod is rejected or allowed.
 	WaitOnPermit(ctx context.Context, pod *v1.Pod) *fwk.Status
 
diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go
index 56a6b8303eb..dad07a12abe 100644
--- a/pkg/scheduler/framework/runtime/framework.go
+++ b/pkg/scheduler/framework/runtime/framework.go
@@ -1278,6 +1278,10 @@ func (f *frameworkImpl) RunPreBindPlugins(ctx context.Context, state fwk.CycleSt
 		logger = klog.LoggerWithValues(logger, "node", klog.ObjectRef{Name: nodeName})
 	}
 	for _, pl := range f.preBindPlugins {
+		if state.GetSkipPreBindPlugins().Has(pl.Name()) {
+			continue
+		}
+
 		ctx := ctx
 		if verboseLogs {
 			logger := klog.LoggerWithName(logger, pl.Name())
@@ -1308,6 +1312,60 @@ func (f *frameworkImpl) runPreBindPlugin(ctx context.Context, pl framework.PreBi
 	return status
 }
 
+// RunPreBindPreFlights runs the set of configured PreBindPreFlight functions from PreBind plugins.
+// The returned value is:
+// - Success: one or more plugins return Success, meaning some PreBind plugins will work for this pod.
+// - Skip: all plugins return Skip.
+// - Error: any plugin returns an error.
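+// Note that Success from one plugin doesn't short-circuit the iteration: every plugin's
+// PreBindPreFlight is still called so that the complete set of plugins to skip is recorded in the cycle state.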
+func (f *frameworkImpl) RunPreBindPreFlights(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) (status *fwk.Status) { + startTime := time.Now() + defer func() { + metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PreBindPreFlight, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime)) + }() + logger := klog.FromContext(ctx) + verboseLogs := logger.V(4).Enabled() + if verboseLogs { + logger = klog.LoggerWithName(logger, "PreBindPreFlight") + logger = klog.LoggerWithValues(logger, "node", klog.ObjectRef{Name: nodeName}) + } + skipPlugins := sets.New[string]() + returningStatus := fwk.NewStatus(fwk.Skip) + for _, pl := range f.preBindPlugins { + ctx := ctx + if verboseLogs { + logger := klog.LoggerWithName(logger, pl.Name()) + ctx = klog.NewContext(ctx, logger) + } + status = f.runPreBindPreFlight(ctx, pl, state, pod, nodeName) + switch { + case status.Code() == fwk.Error: + err := status.AsError() + logger.Error(err, "Plugin failed", "plugin", pl.Name(), "pod", klog.KObj(pod), "node", nodeName) + return fwk.AsStatus(fmt.Errorf("running PreBindPreFlight %q: %w", pl.Name(), err)) + case status.IsSuccess(): + // We return success when one or more plugins return success. + returningStatus = nil + case status.IsSkip(): + skipPlugins.Insert(pl.Name()) + default: + // Other statuses are unexpected + return fwk.AsStatus(fmt.Errorf("PreBindPreFlight %s returned %q, which is unsupported. It is supposed to return Success, Skip, or Error status", pl.Name(), status.Code())) + } + } + state.SetSkipPreBindPlugins(skipPlugins) + return returningStatus +} + +func (f *frameworkImpl) runPreBindPreFlight(ctx context.Context, pl framework.PreBindPlugin, state fwk.CycleState, pod *v1.Pod, nodeName string) *fwk.Status { + if !state.ShouldRecordPluginMetrics() { + return pl.PreBindPreFlight(ctx, state, pod, nodeName) + } + startTime := time.Now() + status := pl.PreBindPreFlight(ctx, state, pod, nodeName) + f.metricsRecorder.ObservePluginDurationAsync(metrics.PreBindPreFlight, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime)) + return status +} + // RunBindPlugins runs the set of configured bind plugins until one returns a non `Skip` status. func (f *frameworkImpl) RunBindPlugins(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) (status *fwk.Status) { startTime := time.Now() @@ -1536,6 +1594,10 @@ func (f *frameworkImpl) runPermitPlugin(ctx context.Context, pl framework.Permit return status, timeout } +func (f *frameworkImpl) WillWaitOnPermit(ctx context.Context, pod *v1.Pod) bool { + return f.waitingPods.get(pod.UID) != nil +} + // WaitOnPermit will block, if the pod is a waiting pod, until the waiting pod is rejected or allowed. 
func (f *frameworkImpl) WaitOnPermit(ctx context.Context, pod *v1.Pod) *fwk.Status { waitingPod := f.waitingPods.get(pod.UID) diff --git a/pkg/scheduler/framework/runtime/framework_test.go b/pkg/scheduler/framework/runtime/framework_test.go index d3a261bef17..ecef4453bc2 100644 --- a/pkg/scheduler/framework/runtime/framework_test.go +++ b/pkg/scheduler/framework/runtime/framework_test.go @@ -235,12 +235,12 @@ func (pl *TestPlugin) Reserve(ctx context.Context, state fwk.CycleState, p *v1.P func (pl *TestPlugin) Unreserve(ctx context.Context, state fwk.CycleState, p *v1.Pod, nodeName string) { } -func (pl *TestPlugin) PreBind(ctx context.Context, state fwk.CycleState, p *v1.Pod, nodeName string) *fwk.Status { - return fwk.NewStatus(fwk.Code(pl.inj.PreBindStatus), injectReason) +func (pl *TestPlugin) PreBindPreFlight(ctx context.Context, state fwk.CycleState, p *v1.Pod, nodeName string) *fwk.Status { + return fwk.NewStatus(fwk.Code(pl.inj.PreBindPreFlightStatus), injectReason) } -func (pl *TestPlugin) PreBindPreFlight(ctx context.Context, state fwk.CycleState, p *v1.Pod, nodeName string) *fwk.Status { - return nil +func (pl *TestPlugin) PreBind(ctx context.Context, state fwk.CycleState, p *v1.Pod, nodeName string) *fwk.Status { + return fwk.NewStatus(fwk.Code(pl.inj.PreBindStatus), injectReason) } func (pl *TestPlugin) PostBind(ctx context.Context, state fwk.CycleState, p *v1.Pod, nodeName string) { @@ -2613,6 +2613,110 @@ func TestPreBindPlugins(t *testing.T) { } } +func TestPreBindPreFlightPlugins(t *testing.T) { + tests := []struct { + name string + plugins []*TestPlugin + wantStatus *fwk.Status + }{ + { + name: "Skip when there's no PreBind Plugin", + plugins: []*TestPlugin{}, + wantStatus: fwk.NewStatus(fwk.Skip), + }, + { + name: "Success when PreBindPreFlight returns Success", + plugins: []*TestPlugin{ + { + name: "TestPlugin1", + inj: injectedResult{PreBindPreFlightStatus: int(fwk.Skip)}, + }, + { + name: "TestPlugin2", + inj: injectedResult{PreBindPreFlightStatus: int(fwk.Success)}, + }, + }, + wantStatus: nil, + }, + { + name: "Skip when all PreBindPreFlight returns Skip", + plugins: []*TestPlugin{ + { + name: "TestPlugin1", + inj: injectedResult{PreBindPreFlightStatus: int(fwk.Skip)}, + }, + { + name: "TestPlugin2", + inj: injectedResult{PreBindPreFlightStatus: int(fwk.Skip)}, + }, + }, + wantStatus: fwk.NewStatus(fwk.Skip), + }, + { + name: "Error when PreBindPreFlight returns Error", + plugins: []*TestPlugin{ + { + name: "TestPlugin1", + inj: injectedResult{PreBindPreFlightStatus: int(fwk.Skip)}, + }, + { + name: "TestPlugin2", + inj: injectedResult{PreBindPreFlightStatus: int(fwk.Error)}, + }, + }, + wantStatus: fwk.AsStatus(fmt.Errorf(`running PreBindPreFlight "TestPlugin2": %w`, errInjectedStatus)), + }, + { + name: "Error when PreBindPreFlight returns Unschedulable", + plugins: []*TestPlugin{ + { + name: "TestPlugin", + inj: injectedResult{PreBindPreFlightStatus: int(fwk.Unschedulable)}, + }, + }, + wantStatus: fwk.NewStatus(fwk.Error, "PreBindPreFlight TestPlugin returned \"Unschedulable\", which is unsupported. 
It is supposed to return Success, Skip, or Error status"),
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			_, ctx := ktesting.NewTestContext(t)
+			registry := Registry{}
+			configPlugins := &config.Plugins{}
+
+			for _, pl := range tt.plugins {
+				tmpPl := pl
+				if err := registry.Register(pl.name, func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
+					return tmpPl, nil
+				}); err != nil {
+					t.Fatalf("Unable to register pre bind plugins: %s", pl.name)
+				}
+
+				configPlugins.PreBind.Enabled = append(
+					configPlugins.PreBind.Enabled,
+					config.Plugin{Name: pl.name},
+				)
+			}
+			profile := config.KubeSchedulerProfile{Plugins: configPlugins}
+			ctx, cancel := context.WithCancel(ctx)
+			defer cancel()
+			f, err := newFrameworkWithQueueSortAndBind(ctx, registry, profile)
+			if err != nil {
+				t.Fatalf("failed to create framework: %s", err)
+			}
+			defer func() {
+				_ = f.Close()
+			}()
+
+			status := f.RunPreBindPreFlights(ctx, state, pod, "")
+
+			if diff := cmp.Diff(tt.wantStatus, status, statusCmpOpts...); diff != "" {
+				t.Errorf("Wrong status code (-want,+got):\n%s", diff)
+			}
+		})
+	}
+}
+
 func TestReservePlugins(t *testing.T) {
 	tests := []struct {
 		name string
@@ -3458,6 +3562,7 @@ type injectedResult struct {
 	PostFilterStatus       int `json:"postFilterStatus,omitempty"`
 	PreScoreStatus         int `json:"preScoreStatus,omitempty"`
 	ReserveStatus          int `json:"reserveStatus,omitempty"`
+	PreBindPreFlightStatus int `json:"preBindPreFlightStatus,omitempty"`
 	PreBindStatus          int `json:"preBindStatus,omitempty"`
 	BindStatus             int `json:"bindStatus,omitempty"`
 	PermitStatus           int `json:"permitStatus,omitempty"`
diff --git a/pkg/scheduler/metrics/metrics.go b/pkg/scheduler/metrics/metrics.go
index c383c06d6ff..9ff706cfc6d 100644
--- a/pkg/scheduler/metrics/metrics.go
+++ b/pkg/scheduler/metrics/metrics.go
@@ -73,6 +73,7 @@ const (
 	Score                   = "Score"
 	ScoreExtensionNormalize = "ScoreExtensionNormalize"
 	PreBind                 = "PreBind"
+	PreBindPreFlight        = "PreBindPreFlight"
 	Bind                    = "Bind"
 	PostBind                = "PostBind"
 	Reserve                 = "Reserve"
diff --git a/pkg/scheduler/schedule_one.go b/pkg/scheduler/schedule_one.go
index b094ba5f5f6..1ea22206cab 100644
--- a/pkg/scheduler/schedule_one.go
+++ b/pkg/scheduler/schedule_one.go
@@ -279,6 +279,26 @@ func (sched *Scheduler) bindingCycle(
 
 	assumedPod := assumedPodInfo.Pod
 
+	if sched.nominatedNodeNameForExpectationEnabled {
+		preFlightStatus := schedFramework.RunPreBindPreFlights(ctx, state, assumedPod, scheduleResult.SuggestedHost)
+		if preFlightStatus.Code() == fwk.Error ||
+			// Unschedulable status is not supported in PreBindPreFlight and hence we regard it as an error.
+			preFlightStatus.IsRejected() {
+			return preFlightStatus
+		}
+		if preFlightStatus.IsSuccess() || schedFramework.WillWaitOnPermit(ctx, assumedPod) {
+			// Add NominatedNodeName to tell the external components (e.g., the cluster autoscaler) that the pod is about to be bound to the node.
+			// We only do this when WaitOnPermit or PreBind will actually do something, because otherwise the pod will be bound soon anyway.
+			if err := updatePod(ctx, sched.client, schedFramework.APICacher(), assumedPod, nil, &framework.NominatingInfo{
+				NominatedNodeName: scheduleResult.SuggestedHost,
+				NominatingMode:    framework.ModeOverride,
+			}); err != nil {
+				logger.Error(err, "Failed to update the nominated node name in the binding cycle", "pod", klog.KObj(assumedPod), "nominatedNodeName", scheduleResult.SuggestedHost)
+				// We continue processing because this failure is not critical enough to abort the binding cycle.
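+				// Binding still proceeds as usual; external components just might not learn the expected node in advance.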
+ } + } + } + // Run "permit" plugins. if status := schedFramework.WaitOnPermit(ctx, assumedPod); !status.IsSuccess() { if status.IsRejected() { @@ -1118,12 +1138,21 @@ func updatePod(ctx context.Context, client clientset.Interface, apiCacher framew return err } logger := klog.FromContext(ctx) - logger.V(3).Info("Updating pod condition", "pod", klog.KObj(pod), "conditionType", condition.Type, "conditionStatus", condition.Status, "conditionReason", condition.Reason) + logValues := []any{"pod", klog.KObj(pod)} + if condition != nil { + logValues = append(logValues, "conditionType", condition.Type, "conditionStatus", condition.Status, "conditionReason", condition.Reason) + } + if nominatingInfo != nil { + logValues = append(logValues, "nominatedNodeName", nominatingInfo.NominatedNodeName, "nominatingMode", nominatingInfo.Mode()) + } + logger.V(3).Info("Updating pod condition and nominated node name", logValues...) + podStatusCopy := pod.Status.DeepCopy() // NominatedNodeName is updated only if we are trying to set it, and the value is // different from the existing one. nnnNeedsUpdate := nominatingInfo.Mode() == framework.ModeOverride && pod.Status.NominatedNodeName != nominatingInfo.NominatedNodeName - if !podutil.UpdatePodCondition(podStatusCopy, condition) && !nnnNeedsUpdate { + podConditionNeedsUpdate := condition != nil && podutil.UpdatePodCondition(podStatusCopy, condition) + if !podConditionNeedsUpdate && !nnnNeedsUpdate { return nil } if nnnNeedsUpdate { diff --git a/pkg/scheduler/schedule_one_test.go b/pkg/scheduler/schedule_one_test.go index 21a513621d5..cbb4f6a077d 100644 --- a/pkg/scheduler/schedule_one_test.go +++ b/pkg/scheduler/schedule_one_test.go @@ -18,6 +18,7 @@ package scheduler import ( "context" + "encoding/json" "errors" "fmt" "math" @@ -39,6 +40,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/util/version" "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" @@ -57,13 +59,13 @@ import ( fwk "k8s.io/kube-scheduler/framework" "k8s.io/kubernetes/pkg/features" schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config" - "k8s.io/kubernetes/pkg/scheduler/backend/api_cache" - "k8s.io/kubernetes/pkg/scheduler/backend/api_dispatcher" + apicache "k8s.io/kubernetes/pkg/scheduler/backend/api_cache" + apidispatcher "k8s.io/kubernetes/pkg/scheduler/backend/api_dispatcher" internalcache "k8s.io/kubernetes/pkg/scheduler/backend/cache" fakecache "k8s.io/kubernetes/pkg/scheduler/backend/cache/fake" internalqueue "k8s.io/kubernetes/pkg/scheduler/backend/queue" "k8s.io/kubernetes/pkg/scheduler/framework" - "k8s.io/kubernetes/pkg/scheduler/framework/api_calls" + apicalls "k8s.io/kubernetes/pkg/scheduler/framework/api_calls" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/imagelocality" @@ -680,17 +682,24 @@ func TestSchedulerScheduleOne(t *testing.T) { schedulingErr := errors.New("scheduler") permitErr := errors.New("permit error") preBindErr := errors.New("on PreBind") + preBindPreFlightErr := errors.New("on PreBindPreFlight") bindingErr := errors.New("binder") testPod := podWithID("foo", "") assignedTestPod := podWithID("foo", testNode.Name) + type podToAdmit struct { + pluginName string + pod types.UID + } + table := []struct { name string sendPod *v1.Pod registerPluginFuncs 
[]tf.RegisterPluginFunc injectBindError error injectSchedulingError error + podToAdmit *podToAdmit mockScheduleResult ScheduleResult expectErrorPod *v1.Pod expectForgetPod *v1.Pod @@ -698,8 +707,13 @@ func TestSchedulerScheduleOne(t *testing.T) { expectPodInBackoffQ *v1.Pod expectPodInUnschedulable *v1.Pod expectError error + expectNominatedNodeName string expectBind *v1.Binding eventReason string + // If nil, the test case is run with both enabled and disabled + nominatedNodeNameForExpectationEnabled *bool + // If nil, the test case is run with both enabled and disabled + asyncAPICallsEnabled *bool }{ { name: "schedule pod failed", @@ -768,32 +782,112 @@ func TestSchedulerScheduleOne(t *testing.T) { eventReason: "FailedScheduling", }, { - name: "prebind failed with status code rejected", + name: "nominated node name is not set, permit plugin is working, but the feature gate NominatedNodeNameForExpectation is disabled", sendPod: testPod, registerPluginFuncs: []tf.RegisterPluginFunc{ - tf.RegisterPreBindPlugin("FakePreBind", tf.NewFakePreBindPlugin(fwk.NewStatus(fwk.Unschedulable, "rejected on prebind"))), + tf.RegisterPermitPlugin("FakePermit", tf.NewFakePermitPlugin(fwk.NewStatus(fwk.Wait, "rejected on permit"), time.Minute)), }, - mockScheduleResult: scheduleResultOk, - expectErrorPod: assignedTestPod, - expectForgetPod: assignedTestPod, - expectAssumedPod: assignedTestPod, - expectPodInBackoffQ: testPod, - expectError: fmt.Errorf("rejected on prebind"), - eventReason: "FailedScheduling", + podToAdmit: &podToAdmit{pluginName: "FakePermit", pod: testPod.UID}, + mockScheduleResult: scheduleResultOk, + nominatedNodeNameForExpectationEnabled: ptr.To(false), + expectAssumedPod: assignedTestPod, + expectBind: fakeBinding, + eventReason: "Scheduled", + }, + { + name: "nominated node name is set, permit plugin is working in wait on permit phase", + sendPod: testPod, + registerPluginFuncs: []tf.RegisterPluginFunc{ + tf.RegisterPermitPlugin("FakePermit", tf.NewFakePermitPlugin(fwk.NewStatus(fwk.Wait, "rejected on permit"), time.Minute)), + }, + podToAdmit: &podToAdmit{pluginName: "FakePermit", pod: testPod.UID}, + mockScheduleResult: scheduleResultOk, + expectNominatedNodeName: testNode.Name, + // Depending on the timing, if asyncAPICallsEnabled, we might miss NNN update because the nnn update is overwritten by the binding. + // So, it's safe to run this test with asyncAPICallsEnabled = false only. 
+ asyncAPICallsEnabled: ptr.To(false), + nominatedNodeNameForExpectationEnabled: ptr.To(true), + expectAssumedPod: assignedTestPod, + expectBind: fakeBinding, + eventReason: "Scheduled", + }, + { + name: "prebindpreflight failed with status code error", + sendPod: testPod, + registerPluginFuncs: []tf.RegisterPluginFunc{ + tf.RegisterPreBindPlugin("FakePreBind", tf.NewFakePreBindPlugin(fwk.AsStatus(preBindPreFlightErr), nil)), + }, + mockScheduleResult: scheduleResultOk, + expectErrorPod: assignedTestPod, + expectForgetPod: assignedTestPod, + nominatedNodeNameForExpectationEnabled: ptr.To(true), + expectAssumedPod: assignedTestPod, + expectPodInBackoffQ: testPod, + expectError: fmt.Errorf(`running PreBindPreFlight "FakePreBind": %w`, preBindPreFlightErr), + eventReason: "FailedScheduling", + }, + { + name: "prebindpreflight failed with status code unschedulable", + sendPod: testPod, + registerPluginFuncs: []tf.RegisterPluginFunc{ + tf.RegisterPreBindPlugin("FakePreBind", tf.NewFakePreBindPlugin(fwk.NewStatus(fwk.Unschedulable, "rejected on prebindpreflight"), nil)), + }, + mockScheduleResult: scheduleResultOk, + expectErrorPod: assignedTestPod, + expectForgetPod: assignedTestPod, + nominatedNodeNameForExpectationEnabled: ptr.To(true), + expectAssumedPod: assignedTestPod, + expectPodInBackoffQ: testPod, + expectError: fmt.Errorf("PreBindPreFlight FakePreBind returned \"Unschedulable\", which is unsupported. It is supposed to return Success, Skip, or Error status"), + eventReason: "FailedScheduling", + }, + { + name: "prebind isn't called and nominated node name isn't set, if preflight returns skip", + sendPod: testPod, + registerPluginFuncs: []tf.RegisterPluginFunc{ + // Configure it to return error on prebind to make sure it's not called. + tf.RegisterPreBindPlugin("FakePreBind", tf.NewFakePreBindPlugin(fwk.NewStatus(fwk.Skip), fwk.NewStatus(fwk.Error, "rejected on prebind"))), + }, + mockScheduleResult: scheduleResultOk, + expectAssumedPod: assignedTestPod, + nominatedNodeNameForExpectationEnabled: ptr.To(true), + expectBind: fakeBinding, + eventReason: "Scheduled", + }, + { + name: "prebind is called and nominated node name is set, if preflight returns success", + sendPod: testPod, + registerPluginFuncs: []tf.RegisterPluginFunc{ + tf.RegisterPreBindPlugin("FakePreBind", tf.NewFakePreBindPlugin(nil, nil)), + }, + mockScheduleResult: scheduleResultOk, + nominatedNodeNameForExpectationEnabled: ptr.To(true), + // Depending on the timing, if asyncAPICallsEnabled, we might miss NNN update because the nnn update is overwritten by the binding. + // So, it's safe to run this test with asyncAPICallsEnabled = false only. 
+			asyncAPICallsEnabled: ptr.To(false),
+			expectNominatedNodeName: testNode.Name,
+			expectAssumedPod: assignedTestPod,
+			expectBind: fakeBinding,
+			eventReason: "Scheduled",
		},
		{
			name: "prebind failed with status code error",
			sendPod: testPod,
			registerPluginFuncs: []tf.RegisterPluginFunc{
-				tf.RegisterPreBindPlugin("FakePreBind", tf.NewFakePreBindPlugin(fwk.AsStatus(preBindErr))),
+				tf.RegisterPreBindPlugin("FakePreBind", tf.NewFakePreBindPlugin(nil, fwk.AsStatus(preBindErr))),
			},
-			mockScheduleResult: scheduleResultOk,
-			expectErrorPod: assignedTestPod,
-			expectForgetPod: assignedTestPod,
-			expectAssumedPod: assignedTestPod,
-			expectPodInBackoffQ: testPod,
-			expectError: fmt.Errorf(`running PreBind plugin "FakePreBind": %w`, preBindErr),
-			eventReason: "FailedScheduling",
+			mockScheduleResult: scheduleResultOk,
+			expectErrorPod: assignedTestPod,
+			expectForgetPod: assignedTestPod,
+			nominatedNodeNameForExpectationEnabled: ptr.To(true),
+			// Depending on the timing, if asyncAPICallsEnabled, we might miss NNN update because the nnn update is overwritten by the binding.
+			// So, it's safe to run this test with asyncAPICallsEnabled = false only.
+			asyncAPICallsEnabled: ptr.To(false),
+			expectNominatedNodeName: testNode.Name,
+			expectAssumedPod: assignedTestPod,
+			expectPodInBackoffQ: testPod,
+			expectError: fmt.Errorf(`running PreBind plugin "FakePreBind": %w`, preBindErr),
+			eventReason: "FailedScheduling",
		},
		{
			name: "binding failed",
@@ -825,158 +919,221 @@ func TestSchedulerScheduleOne(t *testing.T) {
 	}
 
 	for _, qHintEnabled := range []bool{true, false} {
-		for _, asyncAPICallsEnabled := range []bool{true, false} {
-			for _, item := range table {
-				t.Run(fmt.Sprintf("%s (Queueing hints enabled: %v, Async API calls enabled: %v)", item.name, qHintEnabled, asyncAPICallsEnabled), func(t *testing.T) {
-					if !qHintEnabled {
-						featuregatetesting.SetFeatureGateEmulationVersionDuringTest(t, utilfeature.DefaultFeatureGate, version.MustParse("1.33"))
-						featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, false)
+		for _, item := range table {
+			asyncAPICallsEnabled := []bool{true, false}
+			if item.asyncAPICallsEnabled != nil {
+				asyncAPICallsEnabled = []bool{*item.asyncAPICallsEnabled}
+			}
+			for _, asyncAPICallsEnabled := range asyncAPICallsEnabled {
+				nominatedNodeNameForExpectationEnabled := []bool{true, false}
+				if item.nominatedNodeNameForExpectationEnabled != nil {
+					nominatedNodeNameForExpectationEnabled = []bool{*item.nominatedNodeNameForExpectationEnabled}
+				}
+				for _, nominatedNodeNameForExpectationEnabled := range nominatedNodeNameForExpectationEnabled {
+					if nominatedNodeNameForExpectationEnabled && !qHintEnabled {
+						// If the QHint feature gate is disabled, NominatedNodeNameForExpectation cannot be enabled
+						// because that means users set the emulation version to 1.33 or earlier.
+ continue } - logger, ctx := ktesting.NewTestContext(t) - var gotError error - var gotPod *v1.Pod - var gotForgetPod *v1.Pod - var gotAssumedPod *v1.Pod - var gotBinding *v1.Binding - - client := clientsetfake.NewClientset(item.sendPod) - client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) { - if action.GetSubresource() != "binding" { - return false, nil, nil + t.Run(fmt.Sprintf("%s (Queueing hints enabled: %v, Async API calls enabled: %v, NominatedNodeNameForExpectation enabled: %v)", item.name, qHintEnabled, asyncAPICallsEnabled, nominatedNodeNameForExpectationEnabled), func(t *testing.T) { + if !qHintEnabled { + featuregatetesting.SetFeatureGateEmulationVersionDuringTest(t, utilfeature.DefaultFeatureGate, version.MustParse("1.33")) + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, false) } - gotBinding = action.(clienttesting.CreateAction).GetObject().(*v1.Binding) - return true, gotBinding, item.injectBindError - }) - - var apiDispatcher *apidispatcher.APIDispatcher - if asyncAPICallsEnabled { - apiDispatcher = apidispatcher.New(client, 16, apicalls.Relevances) - apiDispatcher.Run(logger) - defer apiDispatcher.Close() - } - - internalCache := internalcache.New(ctx, 30*time.Second, apiDispatcher) - cache := &fakecache.Cache{ - Cache: internalCache, - ForgetFunc: func(pod *v1.Pod) { - gotForgetPod = pod - }, - AssumeFunc: func(pod *v1.Pod) { - gotAssumedPod = pod - }, - IsAssumedPodFunc: func(pod *v1.Pod) bool { - if pod == nil || gotAssumedPod == nil { - return false + logger, ctx := ktesting.NewTestContext(t) + var gotError error + var gotPod *v1.Pod + var gotForgetPod *v1.Pod + var gotAssumedPod *v1.Pod + var gotBinding *v1.Binding + client := clientsetfake.NewClientset(item.sendPod) + informerFactory := informers.NewSharedInformerFactory(client, 0) + client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) { + if action.GetSubresource() != "binding" { + return false, nil, nil } - return pod.UID == gotAssumedPod.UID - }, - } - informerFactory := informers.NewSharedInformerFactory(client, 0) + gotBinding = action.(clienttesting.CreateAction).GetObject().(*v1.Binding) + return true, gotBinding, item.injectBindError + }) - schedFramework, err := tf.NewFramework(ctx, - append(item.registerPluginFuncs, - tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), - tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), - ), - testSchedulerName, - frameworkruntime.WithClientSet(client), - frameworkruntime.WithAPIDispatcher(apiDispatcher), - frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)), - frameworkruntime.WithWaitingPods(frameworkruntime.NewWaitingPodsMap()), - frameworkruntime.WithInformerFactory(informerFactory), - ) - if err != nil { - t.Fatal(err) - } - - ar := metrics.NewMetricsAsyncRecorder(10, 1*time.Second, ctx.Done()) - queue := internalqueue.NewSchedulingQueue(nil, informerFactory, internalqueue.WithMetricsRecorder(*ar), internalqueue.WithAPIDispatcher(apiDispatcher)) - if asyncAPICallsEnabled { - schedFramework.SetAPICacher(apicache.New(queue, cache)) - } - - sched := &Scheduler{ - Cache: cache, - client: client, - NextPod: queue.Pop, - SchedulingQueue: queue, - Profiles: profile.Map{testSchedulerName: schedFramework}, - APIDispatcher: apiDispatcher, - } - queue.Add(logger, item.sendPod) - - sched.SchedulePod = func(ctx context.Context, fwk framework.Framework, 
state fwk.CycleState, pod *v1.Pod) (ScheduleResult, error) { - return item.mockScheduleResult, item.injectSchedulingError - } - sched.FailureHandler = func(ctx context.Context, fwk framework.Framework, p *framework.QueuedPodInfo, status *fwk.Status, ni *framework.NominatingInfo, start time.Time) { - gotPod = p.Pod - gotError = status.AsError() - - sched.handleSchedulingFailure(ctx, fwk, p, status, ni, start) - } - called := make(chan struct{}) - stopFunc, err := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) { - e, _ := obj.(*eventsv1.Event) - if e.Reason != item.eventReason { - t.Errorf("got event %v, want %v", e.Reason, item.eventReason) + var apiDispatcher *apidispatcher.APIDispatcher + if asyncAPICallsEnabled { + apiDispatcher = apidispatcher.New(client, 16, apicalls.Relevances) + apiDispatcher.Run(logger) + defer apiDispatcher.Close() } - close(called) + + internalCache := internalcache.New(ctx, 30*time.Second, apiDispatcher) + cache := &fakecache.Cache{ + Cache: internalCache, + ForgetFunc: func(pod *v1.Pod) { + gotForgetPod = pod + }, + AssumeFunc: func(pod *v1.Pod) { + gotAssumedPod = pod + }, + IsAssumedPodFunc: func(pod *v1.Pod) bool { + if pod == nil || gotAssumedPod == nil { + return false + } + return pod.UID == gotAssumedPod.UID + }, + } + mu := &sync.Mutex{} + updatedNominatedNodeName := "" + client.PrependReactor("patch", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) { + if action.GetSubresource() != "status" { + return false, nil, nil + } + patchAction := action.(clienttesting.PatchAction) + podName := patchAction.GetName() + namespace := patchAction.GetNamespace() + patch := patchAction.GetPatch() + pod, err := informerFactory.Core().V1().Pods().Lister().Pods(namespace).Get(podName) + if err != nil { + t.Fatalf("Failed to get the original pod %s/%s before patching: %v\n", namespace, podName, err) + } + marshalledPod, err := json.Marshal(pod) + if err != nil { + t.Fatalf("Failed to marshal the original pod %s/%s: %v", namespace, podName, err) + } + updated, err := strategicpatch.StrategicMergePatch(marshalledPod, patch, v1.Pod{}) + if err != nil { + t.Fatalf("Failed to apply strategic merge patch %q on pod %#v: %v", patch, marshalledPod, err) + } + updatedPod := &v1.Pod{} + if err := json.Unmarshal(updated, updatedPod); err != nil { + t.Fatalf("Failed to unmarshal updated pod %q: %v", updated, err) + } + mu.Lock() + updatedNominatedNodeName = updatedPod.Status.NominatedNodeName + mu.Unlock() + return true, nil, nil + }) + + schedFramework, err := tf.NewFramework(ctx, + append(item.registerPluginFuncs, + tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), + ), + testSchedulerName, + frameworkruntime.WithClientSet(client), + frameworkruntime.WithAPIDispatcher(apiDispatcher), + frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)), + frameworkruntime.WithWaitingPods(frameworkruntime.NewWaitingPodsMap()), + frameworkruntime.WithInformerFactory(informerFactory), + ) + if err != nil { + t.Fatal(err) + } + + ar := metrics.NewMetricsAsyncRecorder(10, 1*time.Second, ctx.Done()) + queue := internalqueue.NewSchedulingQueue(nil, informerFactory, internalqueue.WithMetricsRecorder(*ar), internalqueue.WithAPIDispatcher(apiDispatcher)) + if asyncAPICallsEnabled { + schedFramework.SetAPICacher(apicache.New(queue, cache)) + } + + sched := &Scheduler{ + Cache: cache, + client: client, + NextPod: queue.Pop, + SchedulingQueue: queue, + 
Profiles: profile.Map{testSchedulerName: schedFramework}, + APIDispatcher: apiDispatcher, + nominatedNodeNameForExpectationEnabled: nominatedNodeNameForExpectationEnabled, + } + queue.Add(logger, item.sendPod) + + sched.SchedulePod = func(ctx context.Context, fwk framework.Framework, state fwk.CycleState, pod *v1.Pod) (ScheduleResult, error) { + return item.mockScheduleResult, item.injectSchedulingError + } + sched.FailureHandler = func(ctx context.Context, fwk framework.Framework, p *framework.QueuedPodInfo, status *fwk.Status, ni *framework.NominatingInfo, start time.Time) { + gotPod = p.Pod + gotError = status.AsError() + + sched.handleSchedulingFailure(ctx, fwk, p, status, ni, start) + } + called := make(chan struct{}) + stopFunc, err := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) { + e, _ := obj.(*eventsv1.Event) + if e.Reason != item.eventReason { + t.Errorf("got event %v, want %v", e.Reason, item.eventReason) + } + close(called) + }) + if err != nil { + t.Fatal(err) + } + informerFactory.Start(ctx.Done()) + informerFactory.WaitForCacheSync(ctx.Done()) + sched.ScheduleOne(ctx) + + if item.podToAdmit != nil { + for { + if waitingPod := sched.Profiles[testSchedulerName].GetWaitingPod(item.podToAdmit.pod); waitingPod != nil { + waitingPod.Allow(item.podToAdmit.pluginName) + break + } + } + } + + <-called + + mu.Lock() + if item.expectNominatedNodeName != updatedNominatedNodeName { + t.Errorf("Expected nominated node name %q, got %q", item.expectNominatedNodeName, updatedNominatedNodeName) + } + mu.Unlock() + + if diff := cmp.Diff(item.expectAssumedPod, gotAssumedPod); diff != "" { + t.Errorf("Unexpected assumed pod (-want,+got):\n%s", diff) + } + if diff := cmp.Diff(item.expectErrorPod, gotPod); diff != "" { + t.Errorf("Unexpected error pod (-want,+got):\n%s", diff) + } + if diff := cmp.Diff(item.expectForgetPod, gotForgetPod); diff != "" { + t.Errorf("Unexpected forget pod (-want,+got):\n%s", diff) + } + if item.expectError == nil || gotError == nil { + if !errors.Is(gotError, item.expectError) { + t.Errorf("Unexpected error. Wanted %v, got %v", item.expectError, gotError) + } + } else if item.expectError.Error() != gotError.Error() { + t.Errorf("Unexpected error. Wanted %v, got %v", item.expectError.Error(), gotError.Error()) + } + if diff := cmp.Diff(item.expectBind, gotBinding); diff != "" { + t.Errorf("Unexpected binding (-want,+got):\n%s", diff) + } + // We have to use wait here because the Pod goes to the binding cycle in some test cases + // and the inflight pods might not be empty immediately at this point in such case. + if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) { + return len(queue.InFlightPods()) == 0, nil + }); err != nil { + t.Errorf("in-flight pods should be always empty after SchedulingOne. 
It has %v Pods", len(queue.InFlightPods())) + } + podsInBackoffQ := queue.PodsInBackoffQ() + if item.expectPodInBackoffQ != nil { + if !podListContainsPod(podsInBackoffQ, item.expectPodInBackoffQ) { + t.Errorf("Expected to find pod in backoffQ, but it's not there.\nWant: %v,\ngot: %v", item.expectPodInBackoffQ, podsInBackoffQ) + } + } else { + if len(podsInBackoffQ) > 0 { + t.Errorf("Expected backoffQ to be empty, but it's not.\nGot: %v", podsInBackoffQ) + } + } + unschedulablePods := queue.UnschedulablePods() + if item.expectPodInUnschedulable != nil { + if !podListContainsPod(unschedulablePods, item.expectPodInUnschedulable) { + t.Errorf("Expected to find pod in unschedulable, but it's not there.\nWant: %v,\ngot: %v", item.expectPodInUnschedulable, unschedulablePods) + } + } else { + if len(unschedulablePods) > 0 { + t.Errorf("Expected unschedulable pods to be empty, but it's not.\nGot: %v", unschedulablePods) + } + } + stopFunc() }) - if err != nil { - t.Fatal(err) - } - informerFactory.Start(ctx.Done()) - informerFactory.WaitForCacheSync(ctx.Done()) - sched.ScheduleOne(ctx) - <-called - if diff := cmp.Diff(item.expectAssumedPod, gotAssumedPod); diff != "" { - t.Errorf("Unexpected assumed pod (-want,+got):\n%s", diff) - } - if diff := cmp.Diff(item.expectErrorPod, gotPod); diff != "" { - t.Errorf("Unexpected error pod (-want,+got):\n%s", diff) - } - if diff := cmp.Diff(item.expectForgetPod, gotForgetPod); diff != "" { - t.Errorf("Unexpected forget pod (-want,+got):\n%s", diff) - } - if item.expectError == nil || gotError == nil { - if !errors.Is(gotError, item.expectError) { - t.Errorf("Unexpected error. Wanted %v, got %v", item.expectError, gotError) - } - } else if item.expectError.Error() != gotError.Error() { - t.Errorf("Unexpected error. Wanted %v, got %v", item.expectError.Error(), gotError.Error()) - } - if diff := cmp.Diff(item.expectBind, gotBinding); diff != "" { - t.Errorf("Unexpected binding (-want,+got):\n%s", diff) - } - // We have to use wait here because the Pod goes to the binding cycle in some test cases - // and the inflight pods might not be empty immediately at this point in such case. - if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) { - return len(queue.InFlightPods()) == 0, nil - }); err != nil { - t.Errorf("in-flight pods should be always empty after SchedulingOne. 
It has %v Pods", len(queue.InFlightPods())) - } - podsInBackoffQ := queue.PodsInBackoffQ() - if item.expectPodInBackoffQ != nil { - if !podListContainsPod(podsInBackoffQ, item.expectPodInBackoffQ) { - t.Errorf("Expected to find pod in backoffQ, but it's not there.\nWant: %v,\ngot: %v", item.expectPodInBackoffQ, podsInBackoffQ) - } - } else { - if len(podsInBackoffQ) > 0 { - t.Errorf("Expected backoffQ to be empty, but it's not.\nGot: %v", podsInBackoffQ) - } - } - unschedulablePods := queue.UnschedulablePods() - if item.expectPodInUnschedulable != nil { - if !podListContainsPod(unschedulablePods, item.expectPodInUnschedulable) { - t.Errorf("Expected to find pod in unschedulable, but it's not there.\nWant: %v,\ngot: %v", item.expectPodInUnschedulable, unschedulablePods) - } - } else { - if len(unschedulablePods) > 0 { - t.Errorf("Expected unschedulable pods to be empty, but it's not.\nGot: %v", unschedulablePods) - } - } - stopFunc() - }) + } } } } @@ -1079,7 +1236,7 @@ func TestScheduleOneMarksPodAsProcessedBeforePreBind(t *testing.T) { sendPod: testPod, mockScheduleResult: scheduleResultOk, registerPluginFuncs: []tf.RegisterPluginFunc{ - tf.RegisterPreBindPlugin("FakePreBind", tf.NewFakePreBindPlugin(fwk.NewStatus(fwk.Unschedulable))), + tf.RegisterPreBindPlugin("FakePreBind", tf.NewFakePreBindPlugin(nil, fwk.NewStatus(fwk.Unschedulable))), }, mockWaitOnPermitResult: fwk.NewStatus(fwk.Success), mockRunPreBindPluginsResult: fwk.NewStatus(fwk.Unschedulable, preBindErr.Error()), @@ -1615,9 +1772,6 @@ func TestSchedulerWithVolumeBinding(t *testing.T) { findErr := fmt.Errorf("find err") assumeErr := fmt.Errorf("assume err") bindErr := fmt.Errorf("bind err") - client := clientsetfake.NewClientset() - - eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()}) // This can be small because we wait for pod to finish scheduling first chanTimeout := 2 * time.Second @@ -1709,7 +1863,9 @@ func TestSchedulerWithVolumeBinding(t *testing.T) { ctx, cancel := context.WithCancel(ctx) defer cancel() fakeVolumeBinder := volumebinding.NewFakeVolumeBinder(item.volumeBinderConfig) - s, bindingChan, errChan := setupTestSchedulerWithVolumeBinding(ctx, t, fakeVolumeBinder, eventBroadcaster, asyncAPICallsEnabled) + client := clientsetfake.NewClientset() + eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()}) + s, bindingChan, errChan := setupTestSchedulerWithVolumeBinding(ctx, t, client, fakeVolumeBinder, eventBroadcaster, asyncAPICallsEnabled) eventChan := make(chan struct{}) stopFunc, err := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) { e, _ := obj.(*eventsv1.Event) @@ -4361,7 +4517,10 @@ func setupTestScheduler(ctx context.Context, t *testing.T, client clientset.Inte return sched, errChan } -func setupTestSchedulerWithVolumeBinding(ctx context.Context, t *testing.T, volumeBinder volumebinding.SchedulerVolumeBinder, broadcaster events.EventBroadcaster, asyncAPICallsEnabled bool) (*Scheduler, chan *v1.Binding, chan error) { +func setupTestSchedulerWithVolumeBinding(ctx context.Context, t *testing.T, client *clientsetfake.Clientset, volumeBinder volumebinding.SchedulerVolumeBinder, broadcaster events.EventBroadcaster, asyncAPICallsEnabled bool) (*Scheduler, chan *v1.Binding, chan error) { + if client == nil { + client = clientsetfake.NewClientset() + } logger := klog.FromContext(ctx) testNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1", UID: types.UID("node1")}} queuedPodStore := 
clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc) @@ -4374,8 +4533,6 @@ func setupTestSchedulerWithVolumeBinding(ctx context.Context, t *testing.T, volu } testPVC := v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{Name: "testPVC", Namespace: pod.Namespace, UID: types.UID("testPVC")}} - client := clientsetfake.NewClientset(&testNode, &testPVC) - bindingChan := interruptOnBind(client) var apiDispatcher *apidispatcher.APIDispatcher if asyncAPICallsEnabled { @@ -4389,7 +4546,16 @@ func setupTestSchedulerWithVolumeBinding(ctx context.Context, t *testing.T, volu informerFactory := informers.NewSharedInformerFactory(client, 0) pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims() pvcInformer.Informer().GetStore().Add(&testPVC) - + if _, err := client.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{}); err != nil { + t.Fatalf("failed to create pod: %v", err) + } + if _, err := client.CoreV1().PersistentVolumeClaims(pod.Namespace).Create(ctx, &testPVC, metav1.CreateOptions{}); err != nil { + t.Fatalf("failed to create PVC: %v", err) + } + if _, err := client.CoreV1().Nodes().Create(ctx, &testNode, metav1.CreateOptions{}); err != nil { + t.Fatalf("failed to create node: %v", err) + } + bindingChan := interruptOnBind(client) fns := []tf.RegisterPluginFunc{ tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index bd7f22f706c..32ed81df716 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -26,7 +26,7 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" - utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" @@ -40,13 +40,13 @@ import ( "k8s.io/kubernetes/pkg/features" schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme" - "k8s.io/kubernetes/pkg/scheduler/backend/api_cache" - "k8s.io/kubernetes/pkg/scheduler/backend/api_dispatcher" + apicache "k8s.io/kubernetes/pkg/scheduler/backend/api_cache" + apidispatcher "k8s.io/kubernetes/pkg/scheduler/backend/api_dispatcher" internalcache "k8s.io/kubernetes/pkg/scheduler/backend/cache" cachedebugger "k8s.io/kubernetes/pkg/scheduler/backend/cache/debugger" internalqueue "k8s.io/kubernetes/pkg/scheduler/backend/queue" "k8s.io/kubernetes/pkg/scheduler/framework" - "k8s.io/kubernetes/pkg/scheduler/framework/api_calls" + apicalls "k8s.io/kubernetes/pkg/scheduler/framework/api_calls" "k8s.io/kubernetes/pkg/scheduler/framework/parallelize" frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/dynamicresources" @@ -120,6 +120,8 @@ type Scheduler struct { // registeredHandlers contains the registrations of all handlers. It's used to check if all handlers have finished syncing before the scheduling cycles start. 
registeredHandlers []cache.ResourceEventHandlerRegistration + + nominatedNodeNameForExpectationEnabled bool } func (sched *Scheduler) applyDefaultHandlers() { @@ -320,11 +322,11 @@ func New(ctx context.Context, var resourceClaimCache *assumecache.AssumeCache var resourceSliceTracker *resourceslicetracker.Tracker var draManager framework.SharedDRAManager - if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) { + if feature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) { resourceClaimInformer := informerFactory.Resource().V1().ResourceClaims().Informer() resourceClaimCache = assumecache.NewAssumeCache(logger, resourceClaimInformer, "ResourceClaim", "", nil) resourceSliceTrackerOpts := resourceslicetracker.Options{ - EnableDeviceTaints: utilfeature.DefaultFeatureGate.Enabled(features.DRADeviceTaints), + EnableDeviceTaints: feature.DefaultFeatureGate.Enabled(features.DRADeviceTaints), SliceInformer: informerFactory.Resource().V1().ResourceSlices(), KubeClient: client, } @@ -341,7 +343,7 @@ func New(ctx context.Context, draManager = dynamicresources.NewDRAManager(ctx, resourceClaimCache, resourceSliceTracker, informerFactory) } var apiDispatcher *apidispatcher.APIDispatcher - if utilfeature.DefaultFeatureGate.Enabled(features.SchedulerAsyncAPICalls) { + if feature.DefaultFeatureGate.Enabled(features.SchedulerAsyncAPICalls) { apiDispatcher = apidispatcher.New(client, int(options.parallelism), apicalls.Relevances) } @@ -420,16 +422,17 @@ func New(ctx context.Context, debugger.ListenForSignal(ctx) sched := &Scheduler{ - Cache: schedulerCache, - client: client, - nodeInfoSnapshot: snapshot, - percentageOfNodesToScore: options.percentageOfNodesToScore, - Extenders: extenders, - StopEverything: stopEverything, - SchedulingQueue: podQueue, - Profiles: profiles, - logger: logger, - APIDispatcher: apiDispatcher, + Cache: schedulerCache, + client: client, + nodeInfoSnapshot: snapshot, + percentageOfNodesToScore: options.percentageOfNodesToScore, + Extenders: extenders, + StopEverything: stopEverything, + SchedulingQueue: podQueue, + Profiles: profiles, + logger: logger, + APIDispatcher: apiDispatcher, + nominatedNodeNameForExpectationEnabled: feature.DefaultFeatureGate.Enabled(features.NominatedNodeNameForExpectation), } sched.NextPod = podQueue.Pop sched.applyDefaultHandlers() @@ -472,7 +475,7 @@ func buildQueueingHintMap(ctx context.Context, es []framework.EnqueueExtensions) registerNodeTaintUpdated := false for _, event := range events { fn := event.QueueingHintFn - if fn == nil || !utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints) { + if fn == nil || !feature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints) { fn = defaultQueueingHintFn } diff --git a/pkg/scheduler/testing/framework/fake_plugins.go b/pkg/scheduler/testing/framework/fake_plugins.go index 065a0e861f3..607fbdb39f4 100644 --- a/pkg/scheduler/testing/framework/fake_plugins.go +++ b/pkg/scheduler/testing/framework/fake_plugins.go @@ -199,7 +199,8 @@ func NewFakeReservePlugin(status *fwk.Status) frameworkruntime.PluginFactory { // FakePreBindPlugin is a test prebind plugin. type FakePreBindPlugin struct { - Status *fwk.Status + PreBindPreFlightStatus *fwk.Status + PreBindStatus *fwk.Status } // Name returns name of the plugin. @@ -209,25 +210,27 @@ func (pl *FakePreBindPlugin) Name() string { // PreBindPreFlight invoked at the PreBind extension point. 
 func (pl *FakePreBindPlugin) PreBindPreFlight(_ context.Context, _ fwk.CycleState, _ *v1.Pod, _ string) *fwk.Status {
-	return pl.Status
+	return pl.PreBindPreFlightStatus
 }
 
 // PreBind invoked at the PreBind extension point.
 func (pl *FakePreBindPlugin) PreBind(_ context.Context, _ fwk.CycleState, _ *v1.Pod, _ string) *fwk.Status {
-	return pl.Status
+	return pl.PreBindStatus
 }
 
 // NewFakePreBindPlugin initializes a fakePreBindPlugin and returns it.
-func NewFakePreBindPlugin(status *fwk.Status) frameworkruntime.PluginFactory {
+func NewFakePreBindPlugin(preBindPreFlightStatus, preBindStatus *fwk.Status) frameworkruntime.PluginFactory {
 	return func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
 		return &FakePreBindPlugin{
-			Status: status,
+			PreBindPreFlightStatus: preBindPreFlightStatus,
+			PreBindStatus:          preBindStatus,
 		}, nil
 	}
 }
 
 // FakePermitPlugin is a test permit plugin.
 type FakePermitPlugin struct {
+	Handle  framework.Handle
 	Status  *fwk.Status
 	Timeout time.Duration
 }
@@ -238,16 +241,17 @@ func (pl *FakePermitPlugin) Name() string {
 }
 
 // Permit invoked at the Permit extension point.
-func (pl *FakePermitPlugin) Permit(_ context.Context, _ fwk.CycleState, _ *v1.Pod, _ string) (*fwk.Status, time.Duration) {
+func (pl *FakePermitPlugin) Permit(_ context.Context, _ fwk.CycleState, p *v1.Pod, _ string) (*fwk.Status, time.Duration) {
 	return pl.Status, pl.Timeout
 }
 
 // NewFakePermitPlugin initializes a fakePermitPlugin and returns it.
 func NewFakePermitPlugin(status *fwk.Status, timeout time.Duration) frameworkruntime.PluginFactory {
-	return func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
+	return func(_ context.Context, _ runtime.Object, h framework.Handle) (framework.Plugin, error) {
 		return &FakePermitPlugin{
 			Status:  status,
 			Timeout: timeout,
+			Handle:  h,
 		}, nil
 	}
 }
diff --git a/staging/src/k8s.io/kube-scheduler/framework/cycle_state.go b/staging/src/k8s.io/kube-scheduler/framework/cycle_state.go
index df5e7ac8ee6..49c8ea58bb7 100644
--- a/staging/src/k8s.io/kube-scheduler/framework/cycle_state.go
+++ b/staging/src/k8s.io/kube-scheduler/framework/cycle_state.go
@@ -59,6 +59,12 @@ type CycleState interface {
 	// SetSkipScorePlugins sets plugins that should be skipped in the Score extension point.
 	// This function is mostly for the scheduling framework runtime, plugins usually don't have to use it.
 	SetSkipScorePlugins(plugins sets.Set[string])
+	// GetSkipPreBindPlugins returns plugins that will be skipped in the PreBind extension point.
+	// This function is mostly for the scheduling framework runtime, plugins usually don't have to use it.
+	GetSkipPreBindPlugins() sets.Set[string]
+	// SetSkipPreBindPlugins sets plugins that should be skipped in the PreBind extension point.
+	// This function is mostly for the scheduling framework runtime, plugins usually don't have to use it.
+	SetSkipPreBindPlugins(plugins sets.Set[string])
 	// Read retrieves data with the given "key" from CycleState. If the key is not
 	// present, ErrNotFound is returned.
 	//
diff --git a/test/integration/scheduler/nominated_node_name/main_test.go b/test/integration/scheduler/nominated_node_name/main_test.go
new file mode 100644
index 00000000000..dc7b338611b
--- /dev/null
+++ b/test/integration/scheduler/nominated_node_name/main_test.go
@@ -0,0 +1,27 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package nominatednodename
+
+import (
+	"testing"
+
+	"k8s.io/kubernetes/test/integration/framework"
+)
+
+func TestMain(m *testing.M) {
+	framework.EtcdMain(m.Run)
+}
diff --git a/test/integration/scheduler/nominated_node_name/nominated_node_name_test.go b/test/integration/scheduler/nominated_node_name/nominated_node_name_test.go
new file mode 100644
index 00000000000..a3517b8d36a
--- /dev/null
+++ b/test/integration/scheduler/nominated_node_name/nominated_node_name_test.go
@@ -0,0 +1,169 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package nominatednodename
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	fwk "k8s.io/kube-scheduler/framework"
+	"k8s.io/kubernetes/pkg/scheduler"
+	"k8s.io/kubernetes/pkg/scheduler/framework"
+	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
+	schedulerutils "k8s.io/kubernetes/test/integration/scheduler"
+	testutils "k8s.io/kubernetes/test/integration/util"
+)
+
+type FakePermitPlugin struct {
+	code fwk.Code
+}
+
+type RunForeverPreBindPlugin struct {
+	cancel <-chan struct{}
+}
+
+type NoNNNPostBindPlugin struct {
+	t      *testing.T
+	cancel <-chan struct{}
+}
+
+func (bp *NoNNNPostBindPlugin) Name() string {
+	return "NoNNNPostBindPlugin"
+}
+
+func (bp *NoNNNPostBindPlugin) PostBind(ctx context.Context, state fwk.CycleState, p *v1.Pod, nodeName string) {
+	if p.Status.NominatedNodeName != "" {
+		// Errorf (not Fatalf) because PostBind runs on a scheduler goroutine, not the test goroutine.
+		bp.t.Errorf("PostBind should not set .status.nominatedNodeName for pod %v/%v, but it was set to %v", p.Namespace, p.Name, p.Status.NominatedNodeName)
+	}
+}
+
+// Name returns name of the plugin.
+func (pp *FakePermitPlugin) Name() string {
+	return "FakePermitPlugin"
+}
+
+// Permit implements the permit test plugin.
+func (pp *FakePermitPlugin) Permit(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) (*fwk.Status, time.Duration) {
+	if pp.code == fwk.Wait {
+		return fwk.NewStatus(pp.code, ""), 10 * time.Minute
+	}
+	return fwk.NewStatus(pp.code, ""), 0
+}
+
+// Name returns name of the plugin.
+func (pp *RunForeverPreBindPlugin) Name() string {
+	return "RunForeverPreBindPlugin"
+}
+
+// PreBindPreFlight is a test function that returns nil (Success) for testing.
+func (pp *RunForeverPreBindPlugin) PreBindPreFlight(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) *fwk.Status {
+	return nil
+}
+
+// PreBind blocks until the test cancels it or the context is done, and then returns an error status.
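+// While PreBind blocks here, the pod stays unbound with its NominatedNodeName set, which is what the test below observes.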
+func (pp *RunForeverPreBindPlugin) PreBind(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) *fwk.Status {
+	select {
+	case <-ctx.Done():
+		return fwk.NewStatus(fwk.Error, "context cancelled")
+	case <-pp.cancel:
+		return fwk.NewStatus(fwk.Error, "pre-bind cancelled")
+	}
+}
+
+// Test_PutNominatedNodeNameInBindingCycle makes sure that nominatedNodeName is set in the binding cycle
+// when the PreBind or Permit plugin (WaitOnPermit) is going to work.
+func Test_PutNominatedNodeNameInBindingCycle(t *testing.T) {
+	cancel := make(chan struct{})
+	tests := []struct {
+		name                    string
+		plugin                  framework.Plugin
+		expectNominatedNodeName bool
+		cleanup                 func()
+	}{
+		{
+			name:                    "NominatedNodeName is set if PreBindPlugin will run",
+			plugin:                  &RunForeverPreBindPlugin{cancel: cancel},
+			expectNominatedNodeName: true,
+			cleanup: func() {
+				close(cancel)
+			},
+		},
+		{
+			name:                    "NominatedNodeName is set if PermitPlugin will run at WaitOnPermit",
+			expectNominatedNodeName: true,
+			plugin: &FakePermitPlugin{
+				code: fwk.Wait,
+			},
+		},
+		{
+			name: "NominatedNodeName is not set if PermitPlugin won't run at WaitOnPermit",
+			plugin: &FakePermitPlugin{
+				code: fwk.Success,
+			},
+		},
+		{
+			name:   "NominatedNodeName is not set if neither PermitPlugin nor PreBindPlugin will run",
+			plugin: nil,
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			testContext := testutils.InitTestAPIServer(t, "nnn-test", nil)
+			if test.cleanup != nil {
+				defer test.cleanup()
+			}
+
+			pf := func(plugin framework.Plugin) frameworkruntime.PluginFactory {
+				return func(_ context.Context, _ runtime.Object, fh framework.Handle) (framework.Plugin, error) {
+					return plugin, nil
+				}
+			}
+
+			plugins := []framework.Plugin{&NoNNNPostBindPlugin{cancel: testContext.Ctx.Done(), t: t}}
+			if test.plugin != nil {
+				plugins = append(plugins, test.plugin)
+			}
+
+			registry, prof := schedulerutils.InitRegistryAndConfig(t, pf, plugins...)
+
+			testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 10, true,
+				scheduler.WithProfiles(prof),
+				scheduler.WithFrameworkOutOfTreeRegistry(registry))
+			defer teardown()
+
+			pod, err := testutils.CreatePausePod(testCtx.ClientSet,
+				testutils.InitPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
+			if err != nil {
+				t.Fatalf("Error while creating a test pod: %v", err)
+			}
+
+			if test.expectNominatedNodeName {
+				if err := testutils.WaitForNominatedNodeName(testCtx.Ctx, testCtx.ClientSet, pod); err != nil {
+					t.Errorf(".status.nominatedNodeName was not set for pod %v/%v: %v", pod.Namespace, pod.Name, err)
+				}
+			} else {
+				if err := testutils.WaitForPodToSchedule(testCtx.Ctx, testCtx.ClientSet, pod); err != nil {
+					t.Errorf("Pod %v/%v was not scheduled: %v", pod.Namespace, pod.Name, err)
+				}
+			}
+		})
+	}
+}
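+
+// minimalPreFlight is an illustrative sketch (not exercised by the tests above) of the
+// contract a real PreBind plugin is expected to follow: return Skip from PreBindPreFlight
+// when PreBind would be a no-op for the pod, so that the scheduler skips both PreBind and
+// the NominatedNodeName update. The volume check is a hypothetical condition.
+func minimalPreFlight(pod *v1.Pod) *fwk.Status {
+	if len(pod.Spec.Volumes) == 0 {
+		// Nothing for PreBind to prepare for this pod.
+		return fwk.NewStatus(fwk.Skip)
+	}
+	// A nil status means Success: PreBind will do real work for this pod.
+	return nil
+}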