feature(scheduler): simplify QueueingHint by introducing new statuses

This commit is contained in:
Kensei Nakada
2023-10-19 11:02:11 +00:00
parent de054fbf94
commit cb5dc46edf
17 changed files with 360 additions and 284 deletions

View File

@@ -397,72 +397,97 @@ func (p *PriorityQueue) Run(logger klog.Logger) {
}, 30*time.Second, p.stop)
}
// isPodWorthRequeuing calls QueueingHintFn of only plugins registered in pInfo.unschedulablePlugins.
// If any QueueingHintFn returns QueueImmediately, the scheduling queue is supposed to enqueue this Pod to activeQ.
// If no QueueingHintFn returns QueueImmediately, but some return QueueAfterBackoff,
// queueingStrategy tells the scheduling queue how a Pod coming from the unschedulable pod pool should be enqueued.
type queueingStrategy int

// The values are spelled out explicitly; they are the same ones an iota sequence starting at queueSkip yields.
const (
	// queueSkip: the scheduling queue should not requeue the Pod to activeQ/backoffQ.
	queueSkip queueingStrategy = 0
	// queueAfterBackoff: the scheduling queue should requeue the Pod once its backoff is completed.
	queueAfterBackoff queueingStrategy = 1
	// queueImmediately: the scheduling queue should skip backoff and requeue the Pod to activeQ right away.
	queueImmediately queueingStrategy = 2
)
// isPodWorthRequeuing calls QueueingHintFn of only plugins registered in pInfo.unschedulablePlugins and pInfo.PendingPlugins.
//
// If any of pInfo.PendingPlugins return Queue,
// the scheduling queue is supposed to enqueue this Pod to activeQ, skipping backoffQ.
// If any of pInfo.unschedulablePlugins return Queue,
// the scheduling queue is supposed to enqueue this Pod to activeQ/backoffQ depending on the remaining backoff time of the Pod.
// If all QueueingHintFn returns QueueSkip, the scheduling queue enqueues the Pod back to unschedulable Pod pool
// If all QueueingHintFns return QueueSkip, the scheduling queue enqueues the Pod back to the unschedulable Pod pool
// because no plugin changes the scheduling result via the event.
func (p *PriorityQueue) isPodWorthRequeuing(logger klog.Logger, pInfo *framework.QueuedPodInfo, event framework.ClusterEvent, oldObj, newObj interface{}) framework.QueueingHint {
if pInfo.UnschedulablePlugins.Len() == 0 {
logger.V(6).Info("Worth requeuing because no unschedulable plugins", "pod", klog.KObj(pInfo.Pod))
return framework.QueueAfterBackoff
func (p *PriorityQueue) isPodWorthRequeuing(logger klog.Logger, pInfo *framework.QueuedPodInfo, event framework.ClusterEvent, oldObj, newObj interface{}) queueingStrategy {
failedPlugins := pInfo.UnschedulablePlugins.Union(pInfo.PendingPlugins)
if failedPlugins.Len() == 0 {
logger.V(6).Info("Worth requeuing because no failed plugins", "pod", klog.KObj(pInfo.Pod))
return queueAfterBackoff
}
if event.IsWildCard() {
// The wildcard event is a special one: someone wants to force all Pods to move to activeQ/backoffQ.
// We return queueAfterBackoff in this case, while resetting all blocked plugins.
logger.V(6).Info("Worth requeuing because the event is wildcard", "pod", klog.KObj(pInfo.Pod))
return framework.QueueAfterBackoff
return queueAfterBackoff
}
hintMap, ok := p.queueingHintMap[pInfo.Pod.Spec.SchedulerName]
if !ok {
// shouldn't reach here unless bug.
logger.Error(nil, "No QueueingHintMap is registered for this profile", "profile", pInfo.Pod.Spec.SchedulerName, "pod", klog.KObj(pInfo.Pod))
return framework.QueueAfterBackoff
return queueAfterBackoff
}
pod := pInfo.Pod
queueHint := framework.QueueSkip
queueStrategy := queueSkip
for eventToMatch, hintfns := range hintMap {
if eventToMatch.Resource != event.Resource || eventToMatch.ActionType&event.ActionType == 0 {
continue
}
for _, hintfn := range hintfns {
if !pInfo.UnschedulablePlugins.Has(hintfn.PluginName) {
if !failedPlugins.Has(hintfn.PluginName) {
// skip if it's not hintfn from failedPlugins.
continue
}
h, err := hintfn.QueueingHintFn(logger, pod, oldObj, newObj)
if err != nil {
// If the QueueingHintFn returned an error, we should treat the event as QueueAfterBackoff so that we can prevent
// the Pod from stucking in the unschedulable pod pool.
// If the QueueingHintFn returned an error, we should treat the event as Queue so that we can prevent
// the Pod from being stuck in the unschedulable pod pool.
oldObjMeta, newObjMeta, asErr := util.As[klog.KMetadata](oldObj, newObj)
if asErr != nil {
logger.Error(err, "QueueingHintFn returns error", "event", event, "plugin", hintfn.PluginName, "pod", klog.KObj(pod))
} else {
logger.Error(err, "QueueingHintFn returns error", "event", event, "plugin", hintfn.PluginName, "pod", klog.KObj(pod), "oldObj", klog.KObj(oldObjMeta), "newObj", klog.KObj(newObjMeta))
}
h = framework.QueueAfterBackoff
h = framework.Queue
}
switch h {
case framework.QueueSkip:
if h == framework.QueueSkip {
continue
case framework.QueueImmediately:
return h
case framework.QueueAfterBackoff:
// replace queueHint with the returned value,
// but continue to other queueHintFn to check because other plugins may want to return QueueImmediately.
queueHint = h
}
if pInfo.PendingPlugins.Has(hintfn.PluginName) {
// interprets Queue from the Pending plugin as queueImmediately.
// We can return immediately because queueImmediately is the highest priority.
return queueImmediately
}
// interprets Queue from the unschedulable plugin as queueAfterBackoff.
if pInfo.PendingPlugins.Len() == 0 {
// We can return immediately because this Pod has no Pending plugins, which are the only ones that can produce queueImmediately,
// and queueAfterBackoff is the second highest priority.
return queueAfterBackoff
}
// We can't return immediately because there are some Pending plugins registered in this Pod.
// We need to check if those plugins return Queue or not and if they do, we return queueImmediately.
queueStrategy = queueAfterBackoff
}
}
// No queueing hint function is registered for this event
// or no queueing hint fn returns the value other than QueueSkip.
return queueHint
return queueStrategy
}
// runPreEnqueuePlugins iterates PreEnqueue function in each registered PreEnqueuePlugin.
@@ -626,7 +651,7 @@ func (p *PriorityQueue) SchedulingCycle() int64 {
// determineSchedulingHintForInFlightPod looks at the unschedulable plugins of the given Pod
// and determines the scheduling hint for this Pod while checking the events that happened during in-flight.
func (p *PriorityQueue) determineSchedulingHintForInFlightPod(logger klog.Logger, pInfo *framework.QueuedPodInfo) framework.QueueingHint {
func (p *PriorityQueue) determineSchedulingHintForInFlightPod(logger klog.Logger, pInfo *framework.QueuedPodInfo) queueingStrategy {
logger.V(5).Info("Checking events for in-flight pod", "pod", klog.KObj(pInfo.Pod), "unschedulablePlugins", pInfo.UnschedulablePlugins, "inFlightEventsSize", p.inFlightEvents.Len(), "inFlightPodsSize", len(p.inFlightPods))
// AddUnschedulableIfNotPresent is called with the Pod at the end of scheduling or binding.
@@ -638,48 +663,50 @@ func (p *PriorityQueue) determineSchedulingHintForInFlightPod(logger klog.Logger
// be empty. If it is not, we may have a problem.
if len(pInfo.UnschedulablePlugins) != 0 {
logger.Error(nil, "In flight Pod isn't found in the scheduling queue. If you see this error log, it's likely a bug in the scheduler.", "pod", klog.KObj(pInfo.Pod))
return framework.QueueAfterBackoff
return queueAfterBackoff
}
if p.inFlightEvents.Len() > len(p.inFlightPods) {
return framework.QueueAfterBackoff
return queueAfterBackoff
}
return framework.QueueSkip
return queueSkip
}
if len(pInfo.UnschedulablePlugins) == 0 {
// No unschedulable plugins are associated with this Pod.
failedPlugins := pInfo.UnschedulablePlugins.Union(pInfo.PendingPlugins)
if len(failedPlugins) == 0 {
// No failed plugins are associated with this Pod.
// Meaning something unusual (a temporal failure on kube-apiserver, etc) happened and this Pod gets moved back to the queue.
// In this case, we should retry scheduling it because this Pod may not be retried until the next flush.
return framework.QueueAfterBackoff
return queueAfterBackoff
}
// check if there is an event that makes this Pod schedulable based on pInfo.UnschedulablePlugins and pInfo.PendingPlugins.
schedulingHint := framework.QueueSkip
queueingStrategy := queueSkip
for event := inFlightPod.Next(); event != nil; event = event.Next() {
e, ok := event.Value.(*clusterEvent)
if !ok {
// Must be another pod. Can be ignored.
// Must be another in-flight Pod (*v1.Pod). Can be ignored.
continue
}
logger.V(5).Info("Checking event for in-flight pod", "pod", klog.KObj(pInfo.Pod), "event", e.event.Label)
hint := p.isPodWorthRequeuing(logger, pInfo, e.event, e.oldObj, e.newObj)
if hint == framework.QueueSkip {
switch p.isPodWorthRequeuing(logger, pInfo, e.event, e.oldObj, e.newObj) {
case queueSkip:
continue
}
if hint == framework.QueueImmediately {
// QueueImmediately is the strongest opinion, we don't need to check other events.
schedulingHint = framework.QueueImmediately
break
}
if hint == framework.QueueAfterBackoff {
// replace schedulingHint with QueueAfterBackoff,
// but continue to check other events because we may find it QueueImmediately with other events.
schedulingHint = framework.QueueAfterBackoff
case queueImmediately:
// queueImmediately is the highest priority.
// No need to go through the rest of the events.
return queueImmediately
case queueAfterBackoff:
// replace queueingStrategy with queueAfterBackoff
queueingStrategy = queueAfterBackoff
if pInfo.PendingPlugins.Len() == 0 {
// We can return immediately because this Pod has no Pending plugins, which are the only ones that can produce queueImmediately,
// and queueAfterBackoff is the second highest priority.
return queueAfterBackoff
}
}
}
return schedulingHint
return queueingStrategy
}
// addUnschedulableIfNotPresentWithoutQueueingHint inserts a pod that cannot be scheduled into
@@ -693,12 +720,16 @@ func (p *PriorityQueue) addUnschedulableWithoutQueueingHint(logger klog.Logger,
// Refresh the timestamp since the pod is re-added.
pInfo.Timestamp = p.clock.Now()
// When the queueing hint is enabled, UnschedulablePlugins and PendingPlugins are used differently.
// But, we use all of them as UnschedulablePlugins when the queueing hint isn't enabled so that we don't break the old behaviour.
failedPlugins := pInfo.UnschedulablePlugins.Union(pInfo.PendingPlugins)
// If a move request has been received, move it to the BackoffQ, otherwise move
// it to unschedulablePods.
for plugin := range pInfo.UnschedulablePlugins {
for plugin := range failedPlugins {
metrics.UnschedulableReason(plugin, pInfo.Pod.Spec.SchedulerName).Inc()
}
if p.moveRequestCycle >= podSchedulingCycle || len(pInfo.UnschedulablePlugins) == 0 {
if p.moveRequestCycle >= podSchedulingCycle || len(failedPlugins) == 0 {
// Two cases to move a Pod to the active/backoff queue:
// - The Pod is rejected by some plugins, but a move request is received after this Pod's scheduling cycle is started.
// In this case, the received event may make the Pod schedulable and we should retry scheduling it.
@@ -753,7 +784,8 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(logger klog.Logger, pInfo *
// If a move request has been received, move it to the BackoffQ, otherwise move
// it to unschedulablePods.
for plugin := range pInfo.UnschedulablePlugins {
failedPlugins := pInfo.UnschedulablePlugins.Union(pInfo.PendingPlugins)
for plugin := range failedPlugins {
metrics.UnschedulableReason(plugin, pInfo.Pod.Spec.SchedulerName).Inc()
}
@@ -762,7 +794,7 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(logger klog.Logger, pInfo *
// In this case, we try to requeue this Pod to activeQ/backoffQ.
queue := p.requeuePodViaQueueingHint(logger, pInfo, schedulingHint, ScheduleAttemptFailure)
logger.V(3).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", ScheduleAttemptFailure, "queue", queue, "schedulingCycle", podSchedulingCycle, "hint", schedulingHint)
logger.V(3).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", ScheduleAttemptFailure, "queue", queue, "schedulingCycle", podSchedulingCycle, "hint", schedulingHint, "unschedulable plugins", failedPlugins)
if queue == activeQ {
// When the Pod is moved to activeQ, need to let p.cond know so that the Pod will be pop()ed out.
p.cond.Broadcast()
@@ -853,10 +885,11 @@ func (p *PriorityQueue) Pop() (*framework.QueuedPodInfo, error) {
}
// Update metrics and reset the set of unschedulable plugins for the next attempt.
for plugin := range pInfo.UnschedulablePlugins {
for plugin := range pInfo.UnschedulablePlugins.Union(pInfo.PendingPlugins) {
metrics.UnschedulableReason(plugin, pInfo.Pod.Spec.SchedulerName).Dec()
}
pInfo.UnschedulablePlugins.Clear()
pInfo.PendingPlugins.Clear()
return pInfo, nil
}
@@ -1067,15 +1100,15 @@ func (p *PriorityQueue) MoveAllToActiveOrBackoffQueue(logger klog.Logger, event
// It returns the queue name Pod goes.
//
// NOTE: this function assumes lock has been acquired in caller
func (p *PriorityQueue) requeuePodViaQueueingHint(logger klog.Logger, pInfo *framework.QueuedPodInfo, schedulingHint framework.QueueingHint, event string) string {
if schedulingHint == framework.QueueSkip {
func (p *PriorityQueue) requeuePodViaQueueingHint(logger klog.Logger, pInfo *framework.QueuedPodInfo, strategy queueingStrategy, event string) string {
if strategy == queueSkip {
p.unschedulablePods.addOrUpdate(pInfo)
metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", event).Inc()
return unschedulablePods
}
pod := pInfo.Pod
if schedulingHint == framework.QueueAfterBackoff && p.isPodBackingoff(pInfo) {
if strategy == queueAfterBackoff && p.isPodBackingoff(pInfo) {
if err := p.podBackoffQ.Add(pInfo); err != nil {
logger.Error(err, "Error adding pod to the backoff queue, queue this Pod to unschedulable pod pool", "pod", klog.KObj(pod))
p.unschedulablePods.addOrUpdate(pInfo)
@@ -1086,7 +1119,7 @@ func (p *PriorityQueue) requeuePodViaQueueingHint(logger klog.Logger, pInfo *fra
return backoffQ
}
// Reach here if schedulingHint is QueueImmediately, or schedulingHint is QueueAfterBackoff but the pod is not backing off.
// Reach here if strategy is queueImmediately, or strategy is queueAfterBackoff but the pod is not backing off.
added, err := p.addToActiveQ(logger, pInfo)
if err != nil {
@@ -1111,7 +1144,7 @@ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(logger klog.Logger, podIn
activated := false
for _, pInfo := range podInfoList {
schedulingHint := p.isPodWorthRequeuing(logger, pInfo, event, oldObj, newObj)
if schedulingHint == framework.QueueSkip {
if schedulingHint == queueSkip {
// QueueingHintFn determined that this Pod isn't worth putting to activeQ or backoffQ by this event.
logger.V(5).Info("Event is not making pod schedulable", "pod", klog.KObj(pInfo.Pod), "event", event.Label)
continue