feat: trigger PreBindPreFlight in the binding cycle

Kensei Nakada
2025-07-17 00:30:10 -07:00
parent dd4e4f1dd1
commit ac9fad6030
13 changed files with 800 additions and 210 deletions


@@ -132,8 +132,8 @@ func (cq *callQueue) merge(oldAPICall, apiCall *queuedAPICall) error {
return nil
}
// Merge API calls if they are of the same type for the same object.
err := apiCall.Merge(oldAPICall)
// Send a concrete APICall object to allow for a type assertion.
err := apiCall.Merge(oldAPICall.APICall)
if err != nil {
err := fmt.Errorf("failed to merge API calls: %w", err)
apiCall.sendOnFinish(err)
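The one-line change above matters because concrete Merge implementations type-assert their argument: queuedAPICall embeds the call and therefore satisfies the same interface, so passing the wrapper instead of the embedded value would make every assertion fail. A minimal sketch with simplified, hypothetical types (the real ones live in the api_dispatcher and api_calls packages):

package sketch

import "fmt"

// APICall is a simplified stand-in for the dispatcher's call interface.
type APICall interface {
	Merge(oldCall APICall) error
}

// queuedAPICall wraps a concrete call with queue bookkeeping; because it
// embeds APICall, it satisfies the interface itself.
type queuedAPICall struct {
	APICall
	onFinish chan<- error
}

// podStatusPatchCall is a hypothetical concrete call type.
type podStatusPatchCall struct{ nominatedNodeName string }

// Merge folds an older call for the same object into this one.
func (c *podStatusPatchCall) Merge(oldCall APICall) error {
	// The assertion succeeds only for the concrete call, not for the
	// *queuedAPICall wrapper around it -- hence oldAPICall.APICall above.
	old, ok := oldCall.(*podStatusPatchCall)
	if !ok {
		return fmt.Errorf("unexpected call type %T", oldCall)
	}
	if c.nominatedNodeName == "" {
		c.nominatedNodeName = old.nominatedNodeName
	}
	return nil
}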


@@ -34,6 +34,8 @@ type CycleState struct {
skipFilterPlugins sets.Set[string]
// skipScorePlugins are plugins that will be skipped in the Score extension point.
skipScorePlugins sets.Set[string]
// skipPreBindPlugins are plugins that will be skipped in the PreBind extension point.
skipPreBindPlugins sets.Set[string]
}
// NewCycleState initializes a new CycleState and returns its pointer.
@@ -73,6 +75,14 @@ func (c *CycleState) GetSkipScorePlugins() sets.Set[string] {
return c.skipScorePlugins
}
func (c *CycleState) SetSkipPreBindPlugins(plugins sets.Set[string]) {
c.skipPreBindPlugins = plugins
}
func (c *CycleState) GetSkipPreBindPlugins() sets.Set[string] {
return c.skipPreBindPlugins
}
// Clone creates a copy of CycleState and returns its pointer. Clone returns
// nil if the context being cloned is nil.
func (c *CycleState) Clone() fwk.CycleState {
@@ -89,6 +99,7 @@ func (c *CycleState) Clone() fwk.CycleState {
copy.recordPluginMetrics = c.recordPluginMetrics
copy.skipFilterPlugins = c.skipFilterPlugins
copy.skipScorePlugins = c.skipScorePlugins
copy.skipPreBindPlugins = c.skipPreBindPlugins
return copy
}
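The new skip set follows the same pattern as skipFilterPlugins and skipScorePlugins: RunPreBindPreFlights records which plugins returned Skip, and RunPreBindPlugins later consults the set. A minimal sketch of the round-trip, with made-up plugin names:

import (
	"k8s.io/apimachinery/pkg/util/sets"
	fwk "k8s.io/kube-scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// runPreBindHonoringSkips mirrors how the runtime pairs the two calls.
func runPreBindHonoringSkips(plugins []framework.PreBindPlugin, state fwk.CycleState) {
	// Recorded by RunPreBindPreFlights during the binding cycle:
	state.SetSkipPreBindPlugins(sets.New("PluginA", "PluginC"))

	// Consulted by RunPreBindPlugins:
	for _, pl := range plugins {
		if state.GetSkipPreBindPlugins().Has(pl.Name()) {
			continue // PreBindPreFlight said this plugin has nothing to do
		}
		// pl.PreBind(ctx, state, pod, nodeName) would run here.
	}
}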


@@ -440,12 +440,12 @@ type ReservePlugin interface {
// These plugins are called before a pod is bound.
type PreBindPlugin interface {
Plugin
// PreBindPreFlight is called before PreBind, and the plugin is supposed to return Success, Skip, or Error status.
// PreBindPreFlight is called before PreBind, and the plugin is supposed to return Success, Skip, or Error status
// to tell the scheduler whether the plugin will do something in PreBind or not.
// If it returns Success, it means this PreBind plugin will handle this pod.
// If it returns Skip, it means this PreBind plugin has nothing to do with the pod, and PreBind will be skipped.
// This function should be lightweight, and shouldn't do any actual operation, e.g., creating a volume etc.
PreBindPreFlight(ctx context.Context, state fwk.CycleState, p *v1.Pod, nodeName string) *fwk.Status
// PreBind is called before binding a pod. All prebind plugins must return
// success or the pod will be rejected and won't be sent for binding.
PreBind(ctx context.Context, state fwk.CycleState, p *v1.Pod, nodeName string) *fwk.Status
@@ -526,6 +526,10 @@ type Framework interface {
// internal error. In either case the pod is not going to be bound.
RunPreBindPlugins(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) *fwk.Status
// RunPreBindPreFlights runs the set of configured PreBindPreFlight functions from PreBind plugins.
// It returns an Error immediately if any plugin fails; otherwise it returns Skip when every plugin returns Skip, and Success if at least one plugin returns Success.
RunPreBindPreFlights(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) *fwk.Status
// RunPostBindPlugins runs the set of configured PostBind plugins.
RunPostBindPlugins(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string)
@@ -547,6 +551,9 @@ type Framework interface {
// Pod will remain a waiting pod for the minimum duration returned by the Permit plugins.
RunPermitPlugins(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) *fwk.Status
// WillWaitOnPermit returns whether this pod will wait on permit by checking if the pod is a waiting pod.
WillWaitOnPermit(ctx context.Context, pod *v1.Pod) bool
// WaitOnPermit will block, if the pod is a waiting pod, until the waiting pod is rejected or allowed.
WaitOnPermit(ctx context.Context, pod *v1.Pod) *fwk.Status
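To make the contract concrete, here is a hypothetical plugin (not one from the tree): PreBindPreFlight is a cheap decision, and the potentially slow PreBind runs only for pods that PreFlight did not skip.

import (
	"context"

	v1 "k8s.io/api/core/v1"
	fwk "k8s.io/kube-scheduler/framework"
)

// hypotheticalAttacher pretends to attach volumes at PreBind time.
type hypotheticalAttacher struct{}

func (a *hypotheticalAttacher) Name() string { return "HypotheticalAttacher" }

// PreBindPreFlight stays lightweight: no API calls, just a decision.
func (a *hypotheticalAttacher) PreBindPreFlight(ctx context.Context, state fwk.CycleState, p *v1.Pod, nodeName string) *fwk.Status {
	if len(p.Spec.Volumes) == 0 {
		return fwk.NewStatus(fwk.Skip) // nothing to attach; PreBind will be skipped
	}
	return nil // Success: PreBind will run for this pod
}

// PreBind may be slow; the framework calls it only when PreFlight did not skip.
func (a *hypotheticalAttacher) PreBind(ctx context.Context, state fwk.CycleState, p *v1.Pod, nodeName string) *fwk.Status {
	// ...issue the (potentially slow) attach API calls here...
	return nil
}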


@@ -1278,6 +1278,10 @@ func (f *frameworkImpl) RunPreBindPlugins(ctx context.Context, state fwk.CycleSt
logger = klog.LoggerWithValues(logger, "node", klog.ObjectRef{Name: nodeName})
}
for _, pl := range f.preBindPlugins {
if state.GetSkipPreBindPlugins().Has(pl.Name()) {
continue
}
ctx := ctx
if verboseLogs {
logger := klog.LoggerWithName(logger, pl.Name())
@@ -1308,6 +1312,60 @@ func (f *frameworkImpl) runPreBindPlugin(ctx context.Context, pl framework.PreBi
return status
}
// RunPreBindPreFlights runs the set of configured PreBindPreFlight functions from PreBind plugins.
// The returned status is:
// - Success: one or more plugins returned Success, meaning some PreBind plugins will run for this pod.
// - Skip: all plugins returned Skip.
// - Error: some plugin returned an Error status.
func (f *frameworkImpl) RunPreBindPreFlights(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) (status *fwk.Status) {
startTime := time.Now()
defer func() {
metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PreBindPreFlight, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
}()
logger := klog.FromContext(ctx)
verboseLogs := logger.V(4).Enabled()
if verboseLogs {
logger = klog.LoggerWithName(logger, "PreBindPreFlight")
logger = klog.LoggerWithValues(logger, "node", klog.ObjectRef{Name: nodeName})
}
skipPlugins := sets.New[string]()
returningStatus := fwk.NewStatus(fwk.Skip)
for _, pl := range f.preBindPlugins {
ctx := ctx
if verboseLogs {
logger := klog.LoggerWithName(logger, pl.Name())
ctx = klog.NewContext(ctx, logger)
}
status = f.runPreBindPreFlight(ctx, pl, state, pod, nodeName)
switch {
case status.Code() == fwk.Error:
err := status.AsError()
logger.Error(err, "Plugin failed", "plugin", pl.Name(), "pod", klog.KObj(pod), "node", nodeName)
return fwk.AsStatus(fmt.Errorf("running PreBindPreFlight %q: %w", pl.Name(), err))
case status.IsSuccess():
// We return success when one or more plugins return success.
returningStatus = nil
case status.IsSkip():
skipPlugins.Insert(pl.Name())
default:
// Other statuses are unexpected
return fwk.AsStatus(fmt.Errorf("PreBindPreFlight %s returned %q, which is unsupported. It is supposed to return Success, Skip, or Error status", pl.Name(), status.Code()))
}
}
state.SetSkipPreBindPlugins(skipPlugins)
return returningStatus
}
func (f *frameworkImpl) runPreBindPreFlight(ctx context.Context, pl framework.PreBindPlugin, state fwk.CycleState, pod *v1.Pod, nodeName string) *fwk.Status {
if !state.ShouldRecordPluginMetrics() {
return pl.PreBindPreFlight(ctx, state, pod, nodeName)
}
startTime := time.Now()
status := pl.PreBindPreFlight(ctx, state, pod, nodeName)
f.metricsRecorder.ObservePluginDurationAsync(metrics.PreBindPreFlight, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
return status
}
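Putting the two helpers together, the aggregation works like this (hypothetical plugin names): a single Success flips the overall result to Success, Skips accumulate in the skip set, and the first Error aborts the loop.

PluginA.PreBindPreFlight -> Skip     (added to the skip set)
PluginB.PreBindPreFlight -> Success  (returningStatus becomes nil, i.e. Success)
PluginC.PreBindPreFlight -> Skip     (added to the skip set)

RunPreBindPreFlights returns Success, and the later RunPreBindPlugins call runs only PluginB.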
// RunBindPlugins runs the set of configured bind plugins until one returns a non `Skip` status.
func (f *frameworkImpl) RunBindPlugins(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) (status *fwk.Status) {
startTime := time.Now()
@@ -1536,6 +1594,10 @@ func (f *frameworkImpl) runPermitPlugin(ctx context.Context, pl framework.Permit
return status, timeout
}
func (f *frameworkImpl) WillWaitOnPermit(ctx context.Context, pod *v1.Pod) bool {
return f.waitingPods.get(pod.UID) != nil
}
// WaitOnPermit will block, if the pod is a waiting pod, until the waiting pod is rejected or allowed.
func (f *frameworkImpl) WaitOnPermit(ctx context.Context, pod *v1.Pod) *fwk.Status {
waitingPod := f.waitingPods.get(pod.UID)


@@ -235,12 +235,12 @@ func (pl *TestPlugin) Reserve(ctx context.Context, state fwk.CycleState, p *v1.P
func (pl *TestPlugin) Unreserve(ctx context.Context, state fwk.CycleState, p *v1.Pod, nodeName string) {
}
func (pl *TestPlugin) PreBind(ctx context.Context, state fwk.CycleState, p *v1.Pod, nodeName string) *fwk.Status {
return fwk.NewStatus(fwk.Code(pl.inj.PreBindStatus), injectReason)
func (pl *TestPlugin) PreBindPreFlight(ctx context.Context, state fwk.CycleState, p *v1.Pod, nodeName string) *fwk.Status {
return fwk.NewStatus(fwk.Code(pl.inj.PreBindPreFlightStatus), injectReason)
}
func (pl *TestPlugin) PreBindPreFlight(ctx context.Context, state fwk.CycleState, p *v1.Pod, nodeName string) *fwk.Status {
return nil
func (pl *TestPlugin) PreBind(ctx context.Context, state fwk.CycleState, p *v1.Pod, nodeName string) *fwk.Status {
return fwk.NewStatus(fwk.Code(pl.inj.PreBindStatus), injectReason)
}
func (pl *TestPlugin) PostBind(ctx context.Context, state fwk.CycleState, p *v1.Pod, nodeName string) {
@@ -2613,6 +2613,110 @@ func TestPreBindPlugins(t *testing.T) {
}
}
func TestPreBindPreFlightPlugins(t *testing.T) {
tests := []struct {
name string
plugins []*TestPlugin
wantStatus *fwk.Status
}{
{
name: "Skip when there's no PreBind Plugin",
plugins: []*TestPlugin{},
wantStatus: fwk.NewStatus(fwk.Skip),
},
{
name: "Success when PreBindPreFlight returns Success",
plugins: []*TestPlugin{
{
name: "TestPlugin1",
inj: injectedResult{PreBindPreFlightStatus: int(fwk.Skip)},
},
{
name: "TestPlugin2",
inj: injectedResult{PreBindPreFlightStatus: int(fwk.Success)},
},
},
wantStatus: nil,
},
{
name: "Skip when all PreBindPreFlight returns Skip",
plugins: []*TestPlugin{
{
name: "TestPlugin1",
inj: injectedResult{PreBindPreFlightStatus: int(fwk.Skip)},
},
{
name: "TestPlugin2",
inj: injectedResult{PreBindPreFlightStatus: int(fwk.Skip)},
},
},
wantStatus: fwk.NewStatus(fwk.Skip),
},
{
name: "Error when PreBindPreFlight returns Error",
plugins: []*TestPlugin{
{
name: "TestPlugin1",
inj: injectedResult{PreBindPreFlightStatus: int(fwk.Skip)},
},
{
name: "TestPlugin2",
inj: injectedResult{PreBindPreFlightStatus: int(fwk.Error)},
},
},
wantStatus: fwk.AsStatus(fmt.Errorf(`running PreBindPreFlight "TestPlugin2": %w`, errInjectedStatus)),
},
{
name: "Error when PreBindPreFlight returns Unschedulable",
plugins: []*TestPlugin{
{
name: "TestPlugin",
inj: injectedResult{PreBindPreFlightStatus: int(fwk.Unschedulable)},
},
},
wantStatus: fwk.NewStatus(fwk.Error, "PreBindPreFlight TestPlugin returned \"Unschedulable\", which is unsupported. It is supposed to return Success, Skip, or Error status"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
registry := Registry{}
configPlugins := &config.Plugins{}
for _, pl := range tt.plugins {
tmpPl := pl
if err := registry.Register(pl.name, func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
return tmpPl, nil
}); err != nil {
t.Fatalf("Unable to register pre bind plugins: %s", pl.name)
}
configPlugins.PreBind.Enabled = append(
configPlugins.PreBind.Enabled,
config.Plugin{Name: pl.name},
)
}
profile := config.KubeSchedulerProfile{Plugins: configPlugins}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
f, err := newFrameworkWithQueueSortAndBind(ctx, registry, profile)
if err != nil {
t.Fatalf("fail to create framework: %s", err)
}
defer func() {
_ = f.Close()
}()
status := f.RunPreBindPreFlights(ctx, state, pod, "")
if diff := cmp.Diff(tt.wantStatus, status, statusCmpOpts...); diff != "" {
t.Errorf("Wrong status code (-want,+got):\n%s", diff)
}
})
}
}
func TestReservePlugins(t *testing.T) {
tests := []struct {
name string
@@ -3458,6 +3562,7 @@ type injectedResult struct {
PostFilterStatus int `json:"postFilterStatus,omitempty"`
PreScoreStatus int `json:"preScoreStatus,omitempty"`
ReserveStatus int `json:"reserveStatus,omitempty"`
PreBindPreFlightStatus int `json:"preBindPreFlightStatus,omitempty"`
PreBindStatus int `json:"preBindStatus,omitempty"`
BindStatus int `json:"bindStatus,omitempty"`
PermitStatus int `json:"permitStatus,omitempty"`


@@ -73,6 +73,7 @@ const (
Score = "Score"
ScoreExtensionNormalize = "ScoreExtensionNormalize"
PreBind = "PreBind"
PreBindPreFlight = "PreBindPreFlight"
Bind = "Bind"
PostBind = "PostBind"
Reserve = "Reserve"


@@ -279,6 +279,26 @@ func (sched *Scheduler) bindingCycle(
assumedPod := assumedPodInfo.Pod
if sched.nominatedNodeNameForExpectationEnabled {
preFlightStatus := schedFramework.RunPreBindPreFlights(ctx, state, assumedPod, scheduleResult.SuggestedHost)
if preFlightStatus.Code() == fwk.Error ||
// Unschedulable status is not supported in PreBindPreFlight and hence we regard it as an error.
preFlightStatus.IsRejected() {
return preFlightStatus
}
if preFlightStatus.IsSuccess() || schedFramework.WillWaitOnPermit(ctx, assumedPod) {
// Add NominatedNodeName to tell the external components (e.g., the cluster autoscaler) that the pod is about to be bound to the node.
// We only do this when any of WaitOnPermit or PreBind will work because otherwise the pod will be soon bound anyway.
if err := updatePod(ctx, sched.client, schedFramework.APICacher(), assumedPod, nil, &framework.NominatingInfo{
NominatedNodeName: scheduleResult.SuggestedHost,
NominatingMode: framework.ModeOverride,
}); err != nil {
logger.Error(err, "Failed to update the nominated node name in the binding cycle", "pod", klog.KObj(assumedPod), "nominatedNodeName", scheduleResult.SuggestedHost)
// We continue the processing because it's not critical enough to stop binding cycles here.
}
}
}
// Run "permit" plugins.
if status := schedFramework.WaitOnPermit(ctx, assumedPod); !status.IsSuccess() {
if status.IsRejected() {
@@ -1118,12 +1138,21 @@ func updatePod(ctx context.Context, client clientset.Interface, apiCacher framew
return err
}
logger := klog.FromContext(ctx)
logger.V(3).Info("Updating pod condition", "pod", klog.KObj(pod), "conditionType", condition.Type, "conditionStatus", condition.Status, "conditionReason", condition.Reason)
logValues := []any{"pod", klog.KObj(pod)}
if condition != nil {
logValues = append(logValues, "conditionType", condition.Type, "conditionStatus", condition.Status, "conditionReason", condition.Reason)
}
if nominatingInfo != nil {
logValues = append(logValues, "nominatedNodeName", nominatingInfo.NominatedNodeName, "nominatingMode", nominatingInfo.Mode())
}
logger.V(3).Info("Updating pod condition and nominated node name", logValues...)
podStatusCopy := pod.Status.DeepCopy()
// NominatedNodeName is updated only if we are trying to set it, and the value is
// different from the existing one.
nnnNeedsUpdate := nominatingInfo.Mode() == framework.ModeOverride && pod.Status.NominatedNodeName != nominatingInfo.NominatedNodeName
if !podutil.UpdatePodCondition(podStatusCopy, condition) && !nnnNeedsUpdate {
podConditionNeedsUpdate := condition != nil && podutil.UpdatePodCondition(podStatusCopy, condition)
if !podConditionNeedsUpdate && !nnnNeedsUpdate {
return nil
}
if nnnNeedsUpdate {
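Assuming the NominatedNodeNameForExpectation gate is on, the net effect of the new bindingCycle block can be summarized as:

PreFlight result  | WillWaitOnPermit | action
------------------+------------------+-----------------------------------------
Error or rejected | (any)            | abort the binding cycle
Success           | (any)            | set status.nominatedNodeName, then bind
Skip              | true             | set status.nominatedNodeName, then wait
Skip              | false            | no update; the pod is bound right away

The update goes through updatePod above, whose nnnNeedsUpdate guard also drops redundant writes when the nominated node name already equals the suggested host.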


@@ -18,6 +18,7 @@ package scheduler
import (
"context"
"encoding/json"
"errors"
"fmt"
"math"
@@ -39,6 +40,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/version"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
@@ -57,13 +59,13 @@ import (
fwk "k8s.io/kube-scheduler/framework"
"k8s.io/kubernetes/pkg/features"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/backend/api_cache"
"k8s.io/kubernetes/pkg/scheduler/backend/api_dispatcher"
apicache "k8s.io/kubernetes/pkg/scheduler/backend/api_cache"
apidispatcher "k8s.io/kubernetes/pkg/scheduler/backend/api_dispatcher"
internalcache "k8s.io/kubernetes/pkg/scheduler/backend/cache"
fakecache "k8s.io/kubernetes/pkg/scheduler/backend/cache/fake"
internalqueue "k8s.io/kubernetes/pkg/scheduler/backend/queue"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/api_calls"
apicalls "k8s.io/kubernetes/pkg/scheduler/framework/api_calls"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/imagelocality"
@@ -680,17 +682,24 @@ func TestSchedulerScheduleOne(t *testing.T) {
schedulingErr := errors.New("scheduler")
permitErr := errors.New("permit error")
preBindErr := errors.New("on PreBind")
preBindPreFlightErr := errors.New("on PreBindPreFlight")
bindingErr := errors.New("binder")
testPod := podWithID("foo", "")
assignedTestPod := podWithID("foo", testNode.Name)
type podToAdmit struct {
pluginName string
pod types.UID
}
table := []struct {
name string
sendPod *v1.Pod
registerPluginFuncs []tf.RegisterPluginFunc
injectBindError error
injectSchedulingError error
podToAdmit *podToAdmit
mockScheduleResult ScheduleResult
expectErrorPod *v1.Pod
expectForgetPod *v1.Pod
@@ -698,8 +707,13 @@ func TestSchedulerScheduleOne(t *testing.T) {
expectPodInBackoffQ *v1.Pod
expectPodInUnschedulable *v1.Pod
expectError error
expectNominatedNodeName string
expectBind *v1.Binding
eventReason string
// If nil, the test case is run with both enabled and disabled
nominatedNodeNameForExpectationEnabled *bool
// If nil, the test case is run with both enabled and disabled
asyncAPICallsEnabled *bool
}{
{
name: "schedule pod failed",
@@ -768,32 +782,112 @@ func TestSchedulerScheduleOne(t *testing.T) {
eventReason: "FailedScheduling",
},
{
name: "prebind failed with status code rejected",
name: "nominated node name is not set, permit plugin is working, but the feature gate NominatedNodeNameForExpectation is disabled",
sendPod: testPod,
registerPluginFuncs: []tf.RegisterPluginFunc{
tf.RegisterPreBindPlugin("FakePreBind", tf.NewFakePreBindPlugin(fwk.NewStatus(fwk.Unschedulable, "rejected on prebind"))),
tf.RegisterPermitPlugin("FakePermit", tf.NewFakePermitPlugin(fwk.NewStatus(fwk.Wait, "rejected on permit"), time.Minute)),
},
mockScheduleResult: scheduleResultOk,
expectErrorPod: assignedTestPod,
expectForgetPod: assignedTestPod,
expectAssumedPod: assignedTestPod,
expectPodInBackoffQ: testPod,
expectError: fmt.Errorf("rejected on prebind"),
eventReason: "FailedScheduling",
podToAdmit: &podToAdmit{pluginName: "FakePermit", pod: testPod.UID},
mockScheduleResult: scheduleResultOk,
nominatedNodeNameForExpectationEnabled: ptr.To(false),
expectAssumedPod: assignedTestPod,
expectBind: fakeBinding,
eventReason: "Scheduled",
},
{
name: "nominated node name is set, permit plugin is working in wait on permit phase",
sendPod: testPod,
registerPluginFuncs: []tf.RegisterPluginFunc{
tf.RegisterPermitPlugin("FakePermit", tf.NewFakePermitPlugin(fwk.NewStatus(fwk.Wait, "rejected on permit"), time.Minute)),
},
podToAdmit: &podToAdmit{pluginName: "FakePermit", pod: testPod.UID},
mockScheduleResult: scheduleResultOk,
expectNominatedNodeName: testNode.Name,
// Depending on the timing, if asyncAPICallsEnabled, we might miss the NNN update because it is overwritten by the binding.
// So it's safe to run this test with asyncAPICallsEnabled = false only.
asyncAPICallsEnabled: ptr.To(false),
nominatedNodeNameForExpectationEnabled: ptr.To(true),
expectAssumedPod: assignedTestPod,
expectBind: fakeBinding,
eventReason: "Scheduled",
},
{
name: "prebindpreflight failed with status code error",
sendPod: testPod,
registerPluginFuncs: []tf.RegisterPluginFunc{
tf.RegisterPreBindPlugin("FakePreBind", tf.NewFakePreBindPlugin(fwk.AsStatus(preBindPreFlightErr), nil)),
},
mockScheduleResult: scheduleResultOk,
expectErrorPod: assignedTestPod,
expectForgetPod: assignedTestPod,
nominatedNodeNameForExpectationEnabled: ptr.To(true),
expectAssumedPod: assignedTestPod,
expectPodInBackoffQ: testPod,
expectError: fmt.Errorf(`running PreBindPreFlight "FakePreBind": %w`, preBindPreFlightErr),
eventReason: "FailedScheduling",
},
{
name: "prebindpreflight failed with status code unschedulable",
sendPod: testPod,
registerPluginFuncs: []tf.RegisterPluginFunc{
tf.RegisterPreBindPlugin("FakePreBind", tf.NewFakePreBindPlugin(fwk.NewStatus(fwk.Unschedulable, "rejected on prebindpreflight"), nil)),
},
mockScheduleResult: scheduleResultOk,
expectErrorPod: assignedTestPod,
expectForgetPod: assignedTestPod,
nominatedNodeNameForExpectationEnabled: ptr.To(true),
expectAssumedPod: assignedTestPod,
expectPodInBackoffQ: testPod,
expectError: fmt.Errorf("PreBindPreFlight FakePreBind returned \"Unschedulable\", which is unsupported. It is supposed to return Success, Skip, or Error status"),
eventReason: "FailedScheduling",
},
{
name: "prebind isn't called and nominated node name isn't set, if preflight returns skip",
sendPod: testPod,
registerPluginFuncs: []tf.RegisterPluginFunc{
// Configure it to return error on prebind to make sure it's not called.
tf.RegisterPreBindPlugin("FakePreBind", tf.NewFakePreBindPlugin(fwk.NewStatus(fwk.Skip), fwk.NewStatus(fwk.Error, "rejected on prebind"))),
},
mockScheduleResult: scheduleResultOk,
expectAssumedPod: assignedTestPod,
nominatedNodeNameForExpectationEnabled: ptr.To(true),
expectBind: fakeBinding,
eventReason: "Scheduled",
},
{
name: "prebind is called and nominated node name is set, if preflight returns success",
sendPod: testPod,
registerPluginFuncs: []tf.RegisterPluginFunc{
tf.RegisterPreBindPlugin("FakePreBind", tf.NewFakePreBindPlugin(nil, nil)),
},
mockScheduleResult: scheduleResultOk,
nominatedNodeNameForExpectationEnabled: ptr.To(true),
// Depending on the timing, if asyncAPICallsEnabled, we might miss the NNN update because it is overwritten by the binding.
// So it's safe to run this test with asyncAPICallsEnabled = false only.
asyncAPICallsEnabled: ptr.To(false),
expectNominatedNodeName: testNode.Name,
expectAssumedPod: assignedTestPod,
expectBind: fakeBinding,
eventReason: "Scheduled",
},
{
name: "prebind failed with status code error",
sendPod: testPod,
registerPluginFuncs: []tf.RegisterPluginFunc{
tf.RegisterPreBindPlugin("FakePreBind", tf.NewFakePreBindPlugin(fwk.AsStatus(preBindErr))),
tf.RegisterPreBindPlugin("FakePreBind", tf.NewFakePreBindPlugin(nil, fwk.AsStatus(preBindErr))),
},
mockScheduleResult: scheduleResultOk,
expectErrorPod: assignedTestPod,
expectForgetPod: assignedTestPod,
expectAssumedPod: assignedTestPod,
expectPodInBackoffQ: testPod,
expectError: fmt.Errorf(`running PreBind plugin "FakePreBind": %w`, preBindErr),
eventReason: "FailedScheduling",
mockScheduleResult: scheduleResultOk,
expectErrorPod: assignedTestPod,
expectForgetPod: assignedTestPod,
nominatedNodeNameForExpectationEnabled: ptr.To(true),
// Depending on the timing, if asyncAPICallsEnabled, we might miss the NNN update because it is overwritten by the binding.
// So it's safe to run this test with asyncAPICallsEnabled = false only.
asyncAPICallsEnabled: ptr.To(false),
expectNominatedNodeName: testNode.Name,
expectAssumedPod: assignedTestPod,
expectPodInBackoffQ: testPod,
expectError: fmt.Errorf(`running PreBind plugin "FakePreBind": %w`, preBindErr),
eventReason: "FailedScheduling",
},
{
name: "binding failed",
@@ -825,158 +919,221 @@ func TestSchedulerScheduleOne(t *testing.T) {
}
for _, qHintEnabled := range []bool{true, false} {
for _, asyncAPICallsEnabled := range []bool{true, false} {
for _, item := range table {
t.Run(fmt.Sprintf("%s (Queueing hints enabled: %v, Async API calls enabled: %v)", item.name, qHintEnabled, asyncAPICallsEnabled), func(t *testing.T) {
if !qHintEnabled {
featuregatetesting.SetFeatureGateEmulationVersionDuringTest(t, utilfeature.DefaultFeatureGate, version.MustParse("1.33"))
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, false)
for _, item := range table {
asyncAPICallsEnabled := []bool{true, false}
if item.asyncAPICallsEnabled != nil {
asyncAPICallsEnabled = []bool{*item.asyncAPICallsEnabled}
}
for _, asyncAPICallsEnabled := range asyncAPICallsEnabled {
nominatedNodeNameForExpectationEnabled := []bool{true, false}
if item.nominatedNodeNameForExpectationEnabled != nil {
nominatedNodeNameForExpectationEnabled = []bool{*item.nominatedNodeNameForExpectationEnabled}
}
for _, nominatedNodeNameForExpectationEnabled := range nominatedNodeNameForExpectationEnabled {
if nominatedNodeNameForExpectationEnabled && !qHintEnabled {
// If the QHint feature gate is disabled, NominatedNodeNameForExpectation cannot be enabled
// because that means users set the emulation version to 1.33 or earlier.
continue
}
logger, ctx := ktesting.NewTestContext(t)
var gotError error
var gotPod *v1.Pod
var gotForgetPod *v1.Pod
var gotAssumedPod *v1.Pod
var gotBinding *v1.Binding
client := clientsetfake.NewClientset(item.sendPod)
client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
if action.GetSubresource() != "binding" {
return false, nil, nil
t.Run(fmt.Sprintf("%s (Queueing hints enabled: %v, Async API calls enabled: %v, NominatedNodeNameForExpectation enabled: %v)", item.name, qHintEnabled, asyncAPICallsEnabled, nominatedNodeNameForExpectationEnabled), func(t *testing.T) {
if !qHintEnabled {
featuregatetesting.SetFeatureGateEmulationVersionDuringTest(t, utilfeature.DefaultFeatureGate, version.MustParse("1.33"))
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, false)
}
gotBinding = action.(clienttesting.CreateAction).GetObject().(*v1.Binding)
return true, gotBinding, item.injectBindError
})
var apiDispatcher *apidispatcher.APIDispatcher
if asyncAPICallsEnabled {
apiDispatcher = apidispatcher.New(client, 16, apicalls.Relevances)
apiDispatcher.Run(logger)
defer apiDispatcher.Close()
}
internalCache := internalcache.New(ctx, 30*time.Second, apiDispatcher)
cache := &fakecache.Cache{
Cache: internalCache,
ForgetFunc: func(pod *v1.Pod) {
gotForgetPod = pod
},
AssumeFunc: func(pod *v1.Pod) {
gotAssumedPod = pod
},
IsAssumedPodFunc: func(pod *v1.Pod) bool {
if pod == nil || gotAssumedPod == nil {
return false
logger, ctx := ktesting.NewTestContext(t)
var gotError error
var gotPod *v1.Pod
var gotForgetPod *v1.Pod
var gotAssumedPod *v1.Pod
var gotBinding *v1.Binding
client := clientsetfake.NewClientset(item.sendPod)
informerFactory := informers.NewSharedInformerFactory(client, 0)
client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
if action.GetSubresource() != "binding" {
return false, nil, nil
}
return pod.UID == gotAssumedPod.UID
},
}
informerFactory := informers.NewSharedInformerFactory(client, 0)
gotBinding = action.(clienttesting.CreateAction).GetObject().(*v1.Binding)
return true, gotBinding, item.injectBindError
})
schedFramework, err := tf.NewFramework(ctx,
append(item.registerPluginFuncs,
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
),
testSchedulerName,
frameworkruntime.WithClientSet(client),
frameworkruntime.WithAPIDispatcher(apiDispatcher),
frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)),
frameworkruntime.WithWaitingPods(frameworkruntime.NewWaitingPodsMap()),
frameworkruntime.WithInformerFactory(informerFactory),
)
if err != nil {
t.Fatal(err)
}
ar := metrics.NewMetricsAsyncRecorder(10, 1*time.Second, ctx.Done())
queue := internalqueue.NewSchedulingQueue(nil, informerFactory, internalqueue.WithMetricsRecorder(*ar), internalqueue.WithAPIDispatcher(apiDispatcher))
if asyncAPICallsEnabled {
schedFramework.SetAPICacher(apicache.New(queue, cache))
}
sched := &Scheduler{
Cache: cache,
client: client,
NextPod: queue.Pop,
SchedulingQueue: queue,
Profiles: profile.Map{testSchedulerName: schedFramework},
APIDispatcher: apiDispatcher,
}
queue.Add(logger, item.sendPod)
sched.SchedulePod = func(ctx context.Context, fwk framework.Framework, state fwk.CycleState, pod *v1.Pod) (ScheduleResult, error) {
return item.mockScheduleResult, item.injectSchedulingError
}
sched.FailureHandler = func(ctx context.Context, fwk framework.Framework, p *framework.QueuedPodInfo, status *fwk.Status, ni *framework.NominatingInfo, start time.Time) {
gotPod = p.Pod
gotError = status.AsError()
sched.handleSchedulingFailure(ctx, fwk, p, status, ni, start)
}
called := make(chan struct{})
stopFunc, err := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) {
e, _ := obj.(*eventsv1.Event)
if e.Reason != item.eventReason {
t.Errorf("got event %v, want %v", e.Reason, item.eventReason)
var apiDispatcher *apidispatcher.APIDispatcher
if asyncAPICallsEnabled {
apiDispatcher = apidispatcher.New(client, 16, apicalls.Relevances)
apiDispatcher.Run(logger)
defer apiDispatcher.Close()
}
close(called)
internalCache := internalcache.New(ctx, 30*time.Second, apiDispatcher)
cache := &fakecache.Cache{
Cache: internalCache,
ForgetFunc: func(pod *v1.Pod) {
gotForgetPod = pod
},
AssumeFunc: func(pod *v1.Pod) {
gotAssumedPod = pod
},
IsAssumedPodFunc: func(pod *v1.Pod) bool {
if pod == nil || gotAssumedPod == nil {
return false
}
return pod.UID == gotAssumedPod.UID
},
}
mu := &sync.Mutex{}
updatedNominatedNodeName := ""
client.PrependReactor("patch", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
if action.GetSubresource() != "status" {
return false, nil, nil
}
patchAction := action.(clienttesting.PatchAction)
podName := patchAction.GetName()
namespace := patchAction.GetNamespace()
patch := patchAction.GetPatch()
pod, err := informerFactory.Core().V1().Pods().Lister().Pods(namespace).Get(podName)
if err != nil {
t.Fatalf("Failed to get the original pod %s/%s before patching: %v\n", namespace, podName, err)
}
marshalledPod, err := json.Marshal(pod)
if err != nil {
t.Fatalf("Failed to marshal the original pod %s/%s: %v", namespace, podName, err)
}
updated, err := strategicpatch.StrategicMergePatch(marshalledPod, patch, v1.Pod{})
if err != nil {
t.Fatalf("Failed to apply strategic merge patch %q on pod %#v: %v", patch, marshalledPod, err)
}
updatedPod := &v1.Pod{}
if err := json.Unmarshal(updated, updatedPod); err != nil {
t.Fatalf("Failed to unmarshal updated pod %q: %v", updated, err)
}
mu.Lock()
updatedNominatedNodeName = updatedPod.Status.NominatedNodeName
mu.Unlock()
return true, nil, nil
})
schedFramework, err := tf.NewFramework(ctx,
append(item.registerPluginFuncs,
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
),
testSchedulerName,
frameworkruntime.WithClientSet(client),
frameworkruntime.WithAPIDispatcher(apiDispatcher),
frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)),
frameworkruntime.WithWaitingPods(frameworkruntime.NewWaitingPodsMap()),
frameworkruntime.WithInformerFactory(informerFactory),
)
if err != nil {
t.Fatal(err)
}
ar := metrics.NewMetricsAsyncRecorder(10, 1*time.Second, ctx.Done())
queue := internalqueue.NewSchedulingQueue(nil, informerFactory, internalqueue.WithMetricsRecorder(*ar), internalqueue.WithAPIDispatcher(apiDispatcher))
if asyncAPICallsEnabled {
schedFramework.SetAPICacher(apicache.New(queue, cache))
}
sched := &Scheduler{
Cache: cache,
client: client,
NextPod: queue.Pop,
SchedulingQueue: queue,
Profiles: profile.Map{testSchedulerName: schedFramework},
APIDispatcher: apiDispatcher,
nominatedNodeNameForExpectationEnabled: nominatedNodeNameForExpectationEnabled,
}
queue.Add(logger, item.sendPod)
sched.SchedulePod = func(ctx context.Context, fwk framework.Framework, state fwk.CycleState, pod *v1.Pod) (ScheduleResult, error) {
return item.mockScheduleResult, item.injectSchedulingError
}
sched.FailureHandler = func(ctx context.Context, fwk framework.Framework, p *framework.QueuedPodInfo, status *fwk.Status, ni *framework.NominatingInfo, start time.Time) {
gotPod = p.Pod
gotError = status.AsError()
sched.handleSchedulingFailure(ctx, fwk, p, status, ni, start)
}
called := make(chan struct{})
stopFunc, err := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) {
e, _ := obj.(*eventsv1.Event)
if e.Reason != item.eventReason {
t.Errorf("got event %v, want %v", e.Reason, item.eventReason)
}
close(called)
})
if err != nil {
t.Fatal(err)
}
informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())
sched.ScheduleOne(ctx)
if item.podToAdmit != nil {
for {
if waitingPod := sched.Profiles[testSchedulerName].GetWaitingPod(item.podToAdmit.pod); waitingPod != nil {
waitingPod.Allow(item.podToAdmit.pluginName)
break
}
}
}
<-called
mu.Lock()
if item.expectNominatedNodeName != updatedNominatedNodeName {
t.Errorf("Expected nominated node name %q, got %q", item.expectNominatedNodeName, updatedNominatedNodeName)
}
mu.Unlock()
if diff := cmp.Diff(item.expectAssumedPod, gotAssumedPod); diff != "" {
t.Errorf("Unexpected assumed pod (-want,+got):\n%s", diff)
}
if diff := cmp.Diff(item.expectErrorPod, gotPod); diff != "" {
t.Errorf("Unexpected error pod (-want,+got):\n%s", diff)
}
if diff := cmp.Diff(item.expectForgetPod, gotForgetPod); diff != "" {
t.Errorf("Unexpected forget pod (-want,+got):\n%s", diff)
}
if item.expectError == nil || gotError == nil {
if !errors.Is(gotError, item.expectError) {
t.Errorf("Unexpected error. Wanted %v, got %v", item.expectError, gotError)
}
} else if item.expectError.Error() != gotError.Error() {
t.Errorf("Unexpected error. Wanted %v, got %v", item.expectError.Error(), gotError.Error())
}
if diff := cmp.Diff(item.expectBind, gotBinding); diff != "" {
t.Errorf("Unexpected binding (-want,+got):\n%s", diff)
}
// We have to use wait here because the Pod goes to the binding cycle in some test cases,
// and the in-flight pods might not be empty immediately at this point in those cases.
if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
return len(queue.InFlightPods()) == 0, nil
}); err != nil {
t.Errorf("in-flight pods should be always empty after SchedulingOne. It has %v Pods", len(queue.InFlightPods()))
}
podsInBackoffQ := queue.PodsInBackoffQ()
if item.expectPodInBackoffQ != nil {
if !podListContainsPod(podsInBackoffQ, item.expectPodInBackoffQ) {
t.Errorf("Expected to find pod in backoffQ, but it's not there.\nWant: %v,\ngot: %v", item.expectPodInBackoffQ, podsInBackoffQ)
}
} else {
if len(podsInBackoffQ) > 0 {
t.Errorf("Expected backoffQ to be empty, but it's not.\nGot: %v", podsInBackoffQ)
}
}
unschedulablePods := queue.UnschedulablePods()
if item.expectPodInUnschedulable != nil {
if !podListContainsPod(unschedulablePods, item.expectPodInUnschedulable) {
t.Errorf("Expected to find pod in unschedulable, but it's not there.\nWant: %v,\ngot: %v", item.expectPodInUnschedulable, unschedulablePods)
}
} else {
if len(unschedulablePods) > 0 {
t.Errorf("Expected unschedulable pods to be empty, but it's not.\nGot: %v", unschedulablePods)
}
}
stopFunc()
})
if err != nil {
t.Fatal(err)
}
informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())
sched.ScheduleOne(ctx)
<-called
if diff := cmp.Diff(item.expectAssumedPod, gotAssumedPod); diff != "" {
t.Errorf("Unexpected assumed pod (-want,+got):\n%s", diff)
}
if diff := cmp.Diff(item.expectErrorPod, gotPod); diff != "" {
t.Errorf("Unexpected error pod (-want,+got):\n%s", diff)
}
if diff := cmp.Diff(item.expectForgetPod, gotForgetPod); diff != "" {
t.Errorf("Unexpected forget pod (-want,+got):\n%s", diff)
}
if item.expectError == nil || gotError == nil {
if !errors.Is(gotError, item.expectError) {
t.Errorf("Unexpected error. Wanted %v, got %v", item.expectError, gotError)
}
} else if item.expectError.Error() != gotError.Error() {
t.Errorf("Unexpected error. Wanted %v, got %v", item.expectError.Error(), gotError.Error())
}
if diff := cmp.Diff(item.expectBind, gotBinding); diff != "" {
t.Errorf("Unexpected binding (-want,+got):\n%s", diff)
}
// We have to use wait here because the Pod goes to the binding cycle in some test cases,
// and the in-flight pods might not be empty immediately at this point in those cases.
if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
return len(queue.InFlightPods()) == 0, nil
}); err != nil {
t.Errorf("in-flight pods should be always empty after SchedulingOne. It has %v Pods", len(queue.InFlightPods()))
}
podsInBackoffQ := queue.PodsInBackoffQ()
if item.expectPodInBackoffQ != nil {
if !podListContainsPod(podsInBackoffQ, item.expectPodInBackoffQ) {
t.Errorf("Expected to find pod in backoffQ, but it's not there.\nWant: %v,\ngot: %v", item.expectPodInBackoffQ, podsInBackoffQ)
}
} else {
if len(podsInBackoffQ) > 0 {
t.Errorf("Expected backoffQ to be empty, but it's not.\nGot: %v", podsInBackoffQ)
}
}
unschedulablePods := queue.UnschedulablePods()
if item.expectPodInUnschedulable != nil {
if !podListContainsPod(unschedulablePods, item.expectPodInUnschedulable) {
t.Errorf("Expected to find pod in unschedulable, but it's not there.\nWant: %v,\ngot: %v", item.expectPodInUnschedulable, unschedulablePods)
}
} else {
if len(unschedulablePods) > 0 {
t.Errorf("Expected unschedulable pods to be empty, but it's not.\nGot: %v", unschedulablePods)
}
}
stopFunc()
})
}
}
}
}
@@ -1079,7 +1236,7 @@ func TestScheduleOneMarksPodAsProcessedBeforePreBind(t *testing.T) {
sendPod: testPod,
mockScheduleResult: scheduleResultOk,
registerPluginFuncs: []tf.RegisterPluginFunc{
tf.RegisterPreBindPlugin("FakePreBind", tf.NewFakePreBindPlugin(fwk.NewStatus(fwk.Unschedulable))),
tf.RegisterPreBindPlugin("FakePreBind", tf.NewFakePreBindPlugin(nil, fwk.NewStatus(fwk.Unschedulable))),
},
mockWaitOnPermitResult: fwk.NewStatus(fwk.Success),
mockRunPreBindPluginsResult: fwk.NewStatus(fwk.Unschedulable, preBindErr.Error()),
@@ -1615,9 +1772,6 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
findErr := fmt.Errorf("find err")
assumeErr := fmt.Errorf("assume err")
bindErr := fmt.Errorf("bind err")
client := clientsetfake.NewClientset()
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
// This can be small because we wait for pod to finish scheduling first
chanTimeout := 2 * time.Second
@@ -1709,7 +1863,9 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
fakeVolumeBinder := volumebinding.NewFakeVolumeBinder(item.volumeBinderConfig)
s, bindingChan, errChan := setupTestSchedulerWithVolumeBinding(ctx, t, fakeVolumeBinder, eventBroadcaster, asyncAPICallsEnabled)
client := clientsetfake.NewClientset()
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
s, bindingChan, errChan := setupTestSchedulerWithVolumeBinding(ctx, t, client, fakeVolumeBinder, eventBroadcaster, asyncAPICallsEnabled)
eventChan := make(chan struct{})
stopFunc, err := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) {
e, _ := obj.(*eventsv1.Event)
@@ -4361,7 +4517,10 @@ func setupTestScheduler(ctx context.Context, t *testing.T, client clientset.Inte
return sched, errChan
}
func setupTestSchedulerWithVolumeBinding(ctx context.Context, t *testing.T, volumeBinder volumebinding.SchedulerVolumeBinder, broadcaster events.EventBroadcaster, asyncAPICallsEnabled bool) (*Scheduler, chan *v1.Binding, chan error) {
func setupTestSchedulerWithVolumeBinding(ctx context.Context, t *testing.T, client *clientsetfake.Clientset, volumeBinder volumebinding.SchedulerVolumeBinder, broadcaster events.EventBroadcaster, asyncAPICallsEnabled bool) (*Scheduler, chan *v1.Binding, chan error) {
if client == nil {
client = clientsetfake.NewClientset()
}
logger := klog.FromContext(ctx)
testNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1", UID: types.UID("node1")}}
queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
@@ -4374,8 +4533,6 @@ func setupTestSchedulerWithVolumeBinding(ctx context.Context, t *testing.T, volu
}
testPVC := v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{Name: "testPVC", Namespace: pod.Namespace, UID: types.UID("testPVC")}}
client := clientsetfake.NewClientset(&testNode, &testPVC)
bindingChan := interruptOnBind(client)
var apiDispatcher *apidispatcher.APIDispatcher
if asyncAPICallsEnabled {
@@ -4389,7 +4546,16 @@ func setupTestSchedulerWithVolumeBinding(ctx context.Context, t *testing.T, volu
informerFactory := informers.NewSharedInformerFactory(client, 0)
pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
pvcInformer.Informer().GetStore().Add(&testPVC)
if _, err := client.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
t.Fatalf("failed to create pod: %v", err)
}
if _, err := client.CoreV1().PersistentVolumeClaims(pod.Namespace).Create(ctx, &testPVC, metav1.CreateOptions{}); err != nil {
t.Fatalf("failed to create PVC: %v", err)
}
if _, err := client.CoreV1().Nodes().Create(ctx, &testNode, metav1.CreateOptions{}); err != nil {
t.Fatalf("failed to create node: %v", err)
}
bindingChan := interruptOnBind(client)
fns := []tf.RegisterPluginFunc{
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),


@@ -26,7 +26,7 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
@@ -40,13 +40,13 @@ import (
"k8s.io/kubernetes/pkg/features"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
"k8s.io/kubernetes/pkg/scheduler/backend/api_cache"
"k8s.io/kubernetes/pkg/scheduler/backend/api_dispatcher"
apicache "k8s.io/kubernetes/pkg/scheduler/backend/api_cache"
apidispatcher "k8s.io/kubernetes/pkg/scheduler/backend/api_dispatcher"
internalcache "k8s.io/kubernetes/pkg/scheduler/backend/cache"
cachedebugger "k8s.io/kubernetes/pkg/scheduler/backend/cache/debugger"
internalqueue "k8s.io/kubernetes/pkg/scheduler/backend/queue"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/api_calls"
apicalls "k8s.io/kubernetes/pkg/scheduler/framework/api_calls"
"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/dynamicresources"
@@ -120,6 +120,8 @@ type Scheduler struct {
// registeredHandlers contains the registrations of all handlers. It's used to check if all handlers have finished syncing before the scheduling cycles start.
registeredHandlers []cache.ResourceEventHandlerRegistration
nominatedNodeNameForExpectationEnabled bool
}
func (sched *Scheduler) applyDefaultHandlers() {
@@ -320,11 +322,11 @@ func New(ctx context.Context,
var resourceClaimCache *assumecache.AssumeCache
var resourceSliceTracker *resourceslicetracker.Tracker
var draManager framework.SharedDRAManager
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
if feature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
resourceClaimInformer := informerFactory.Resource().V1().ResourceClaims().Informer()
resourceClaimCache = assumecache.NewAssumeCache(logger, resourceClaimInformer, "ResourceClaim", "", nil)
resourceSliceTrackerOpts := resourceslicetracker.Options{
EnableDeviceTaints: utilfeature.DefaultFeatureGate.Enabled(features.DRADeviceTaints),
EnableDeviceTaints: feature.DefaultFeatureGate.Enabled(features.DRADeviceTaints),
SliceInformer: informerFactory.Resource().V1().ResourceSlices(),
KubeClient: client,
}
@@ -341,7 +343,7 @@ func New(ctx context.Context,
draManager = dynamicresources.NewDRAManager(ctx, resourceClaimCache, resourceSliceTracker, informerFactory)
}
var apiDispatcher *apidispatcher.APIDispatcher
if utilfeature.DefaultFeatureGate.Enabled(features.SchedulerAsyncAPICalls) {
if feature.DefaultFeatureGate.Enabled(features.SchedulerAsyncAPICalls) {
apiDispatcher = apidispatcher.New(client, int(options.parallelism), apicalls.Relevances)
}
@@ -420,16 +422,17 @@ func New(ctx context.Context,
debugger.ListenForSignal(ctx)
sched := &Scheduler{
Cache: schedulerCache,
client: client,
nodeInfoSnapshot: snapshot,
percentageOfNodesToScore: options.percentageOfNodesToScore,
Extenders: extenders,
StopEverything: stopEverything,
SchedulingQueue: podQueue,
Profiles: profiles,
logger: logger,
APIDispatcher: apiDispatcher,
Cache: schedulerCache,
client: client,
nodeInfoSnapshot: snapshot,
percentageOfNodesToScore: options.percentageOfNodesToScore,
Extenders: extenders,
StopEverything: stopEverything,
SchedulingQueue: podQueue,
Profiles: profiles,
logger: logger,
APIDispatcher: apiDispatcher,
nominatedNodeNameForExpectationEnabled: feature.DefaultFeatureGate.Enabled(features.NominatedNodeNameForExpectation),
}
sched.NextPod = podQueue.Pop
sched.applyDefaultHandlers()
@@ -472,7 +475,7 @@ func buildQueueingHintMap(ctx context.Context, es []framework.EnqueueExtensions)
registerNodeTaintUpdated := false
for _, event := range events {
fn := event.QueueingHintFn
if fn == nil || !utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints) {
if fn == nil || !feature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints) {
fn = defaultQueueingHintFn
}


@@ -199,7 +199,8 @@ func NewFakeReservePlugin(status *fwk.Status) frameworkruntime.PluginFactory {
// FakePreBindPlugin is a test prebind plugin.
type FakePreBindPlugin struct {
Status *fwk.Status
PreBindPreFlightStatus *fwk.Status
PreBindStatus *fwk.Status
}
// Name returns name of the plugin.
@@ -209,25 +210,27 @@ func (pl *FakePreBindPlugin) Name() string {
// PreBindPreFlight invoked at the PreBind extension point.
func (pl *FakePreBindPlugin) PreBindPreFlight(_ context.Context, _ fwk.CycleState, _ *v1.Pod, _ string) *fwk.Status {
return pl.Status
return pl.PreBindPreFlightStatus
}
// PreBind invoked at the PreBind extension point.
func (pl *FakePreBindPlugin) PreBind(_ context.Context, _ fwk.CycleState, _ *v1.Pod, _ string) *fwk.Status {
return pl.Status
return pl.PreBindStatus
}
// NewFakePreBindPlugin initializes a fakePreBindPlugin and returns it.
func NewFakePreBindPlugin(status *fwk.Status) frameworkruntime.PluginFactory {
func NewFakePreBindPlugin(preBindPreFlightStatus, preBindStatus *fwk.Status) frameworkruntime.PluginFactory {
return func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
return &FakePreBindPlugin{
Status: status,
PreBindPreFlightStatus: preBindPreFlightStatus,
PreBindStatus: preBindStatus,
}, nil
}
}
// FakePermitPlugin is a test permit plugin.
type FakePermitPlugin struct {
Handle framework.Handle
Status *fwk.Status
Timeout time.Duration
}
@@ -238,16 +241,17 @@ func (pl *FakePermitPlugin) Name() string {
}
// Permit invoked at the Permit extension point.
func (pl *FakePermitPlugin) Permit(_ context.Context, _ fwk.CycleState, _ *v1.Pod, _ string) (*fwk.Status, time.Duration) {
func (pl *FakePermitPlugin) Permit(_ context.Context, _ fwk.CycleState, p *v1.Pod, _ string) (*fwk.Status, time.Duration) {
return pl.Status, pl.Timeout
}
// NewFakePermitPlugin initializes a fakePermitPlugin and returns it.
func NewFakePermitPlugin(status *fwk.Status, timeout time.Duration) frameworkruntime.PluginFactory {
return func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
return func(_ context.Context, _ runtime.Object, h framework.Handle) (framework.Plugin, error) {
return &FakePermitPlugin{
Status: status,
Timeout: timeout,
Handle: h,
}, nil
}
}
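Usage from the unit tests above: the two arguments let a test prove PreBind is never reached when PreFlight skips, for example:

// PreFlight says Skip; PreBind is configured to fail, so the test would
// notice if the framework called it anyway.
tf.RegisterPreBindPlugin("FakePreBind",
	tf.NewFakePreBindPlugin(fwk.NewStatus(fwk.Skip), fwk.NewStatus(fwk.Error, "rejected on prebind")))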


@@ -59,6 +59,12 @@ type CycleState interface {
// SetSkipScorePlugins sets plugins that should be skipped in the Score extension point.
// This function is mostly for the scheduling framework runtime, plugins usually don't have to use it.
SetSkipScorePlugins(plugins sets.Set[string])
// GetSkipPreBindPlugins returns plugins that will be skipped in the PreBind extension point.
// This function is mostly for the scheduling framework runtime, plugins usually don't have to use it.
GetSkipPreBindPlugins() sets.Set[string]
// SetSkipPreBindPlugins sets plugins that should be skipped in the PreBind extension point.
// This function is mostly for the scheduling framework runtime, plugins usually don't have to use it.
SetSkipPreBindPlugins(plugins sets.Set[string])
// Read retrieves data with the given "key" from CycleState. If the key is not
// present, ErrNotFound is returned.
//


@@ -0,0 +1,27 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nominatednodename
import (
"testing"
"k8s.io/kubernetes/test/integration/framework"
)
func TestMain(m *testing.M) {
framework.EtcdMain(m.Run)
}


@@ -0,0 +1,169 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nominatednodename
import (
"context"
"testing"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
fwk "k8s.io/kube-scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/framework"
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
schedulerutils "k8s.io/kubernetes/test/integration/scheduler"
testutils "k8s.io/kubernetes/test/integration/util"
)
type FakePermitPlugin struct {
code fwk.Code
}
type RunForeverPreBindPlugin struct {
cancel <-chan struct{}
}
type NoNNNPostBindPlugin struct {
t *testing.T
cancel <-chan struct{}
}
func (bp *NoNNNPostBindPlugin) Name() string {
return "NoNNNPostBindPlugin"
}
func (bp *NoNNNPostBindPlugin) PostBind(ctx context.Context, state fwk.CycleState, p *v1.Pod, nodeName string) {
if p.Status.NominatedNodeName != "" {
bp.t.Fatalf("PostBind should not set .status.nominatedNodeName for pod %v/%v, but it was set to %v", p.Namespace, p.Name, p.Status.NominatedNodeName)
}
}
// Name returns name of the plugin.
func (pp *FakePermitPlugin) Name() string {
return "FakePermitPlugin"
}
// Permit implements the permit test plugin.
func (pp *FakePermitPlugin) Permit(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) (*fwk.Status, time.Duration) {
if pp.code == fwk.Wait {
return fwk.NewStatus(pp.code, ""), 10 * time.Minute
}
return fwk.NewStatus(pp.code, ""), 0
}
// Name returns name of the plugin.
func (pp *RunForeverPreBindPlugin) Name() string {
return "RunForeverPreBindPlugin"
}
// PreBindPreFlight always returns nil (Success) so that PreBind is never skipped.
func (pp *RunForeverPreBindPlugin) PreBindPreFlight(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) *fwk.Status {
return nil
}
// PreBind blocks until the test cancels it or the context is done, simulating a long-running PreBind plugin.
func (pp *RunForeverPreBindPlugin) PreBind(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) *fwk.Status {
select {
case <-ctx.Done():
return fwk.NewStatus(fwk.Error, "context cancelled")
case <-pp.cancel:
return fwk.NewStatus(fwk.Error, "pre-bind cancelled")
}
}
// Test_PutNominatedNodeNameInBindingCycle makes sure that nominatedNodeName is set in the binding cycle
// when the PreBind or Permit plugin (WaitOnPermit) is going to work.
func Test_PutNominatedNodeNameInBindingCycle(t *testing.T) {
cancel := make(chan struct{})
tests := []struct {
name string
plugin framework.Plugin
expectNominatedNodeName bool
cleanup func()
}{
{
name: "NominatedNodeName is put if PreBindPlugin will run",
plugin: &RunForeverPreBindPlugin{cancel: cancel},
expectNominatedNodeName: true,
cleanup: func() {
close(cancel)
},
},
{
name: "NominatedNodeName is put if PermitPlugin will run at WaitOnPermit",
expectNominatedNodeName: true,
plugin: &FakePermitPlugin{
code: fwk.Wait,
},
},
{
name: "NominatedNodeName is not put if PermitPlugin won't run at WaitOnPermit",
plugin: &FakePermitPlugin{
code: fwk.Success,
},
},
{
name: "NominatedNodeName is not put if PermitPlugin nor PreBindPlugin will run",
plugin: nil,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
testContext := testutils.InitTestAPIServer(t, "nnn-test", nil)
if test.cleanup != nil {
defer test.cleanup()
}
pf := func(plugin framework.Plugin) frameworkruntime.PluginFactory {
return func(_ context.Context, _ runtime.Object, fh framework.Handle) (framework.Plugin, error) {
return plugin, nil
}
}
plugins := []framework.Plugin{&NoNNNPostBindPlugin{cancel: testContext.Ctx.Done(), t: t}}
if test.plugin != nil {
plugins = append(plugins, test.plugin)
}
registry, prof := schedulerutils.InitRegistryAndConfig(t, pf, plugins...)
testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 10, true,
scheduler.WithProfiles(prof),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
defer teardown()
pod, err := testutils.CreatePausePod(testCtx.ClientSet,
testutils.InitPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
if err != nil {
t.Fatalf("Error while creating a test pod: %v", err)
}
if test.expectNominatedNodeName {
if err := testutils.WaitForNominatedNodeName(testCtx.Ctx, testCtx.ClientSet, pod); err != nil {
t.Errorf(".status.nominatedNodeName was not set for pod %v/%v: %v", pod.Namespace, pod.Name, err)
}
} else {
if err := testutils.WaitForPodToSchedule(testCtx.Ctx, testCtx.ClientSet, pod); err != nil {
t.Errorf("Pod %v/%v was not scheduled: %v", pod.Namespace, pod.Name, err)
}
}
})
}
}
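For completeness, a hedged sketch of how an external consumer such as the cluster autoscaler might read the expectation signal; the pod fields are real, the surrounding function is illustrative:

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// nominatedButUnbound reports whether the scheduler has picked a node for the
// pod but is still in PreBind/WaitOnPermit, i.e. capacity on
// status.nominatedNodeName should be treated as about to be consumed.
func nominatedButUnbound(ctx context.Context, cs kubernetes.Interface, ns, name string) (string, bool, error) {
	pod, err := cs.CoreV1().Pods(ns).Get(ctx, name, metav1.GetOptions{})
	if err != nil {
		return "", false, err
	}
	if pod.Spec.NodeName == "" && pod.Status.NominatedNodeName != "" {
		return pod.Status.NominatedNodeName, true, nil
	}
	return "", false, nil
}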