mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 19:58:17 +00:00 
			
		
		
		
	fix(scheduling_queue): always put Pods with no unschedulable plugins into activeQ/backoffQ (#119105)
* always put Pods with no unschedulable plugins into activeQ/backoffQ * address review comments
This commit is contained in:
		@@ -613,7 +613,7 @@ func (p *PriorityQueue) SchedulingCycle() int64 {
 | 
			
		||||
 | 
			
		||||
// determineSchedulingHintForInFlightPod looks at the unschedulable plugins of the given Pod
 | 
			
		||||
// and determines the scheduling hint for this Pod while checking the events that happened during in-flight.
 | 
			
		||||
func (p *PriorityQueue) determineSchedulingHintForInFlightPod(logger klog.Logger, pInfo *framework.QueuedPodInfo, podSchedulingCycle int64) framework.QueueingHint {
 | 
			
		||||
func (p *PriorityQueue) determineSchedulingHintForInFlightPod(logger klog.Logger, pInfo *framework.QueuedPodInfo) framework.QueueingHint {
 | 
			
		||||
	logger.V(5).Info("Checking events for in-flight pod", "pod", klog.KObj(pInfo.Pod), "unschedulablePlugins", pInfo.UnschedulablePlugins, "inFlightEventsSize", p.inFlightEvents.Len(), "inFlightPodsSize", len(p.inFlightPods))
 | 
			
		||||
 | 
			
		||||
	// AddUnschedulableIfNotPresent is called with the Pod at the end of scheduling or binding.
 | 
			
		||||
@@ -634,17 +634,10 @@ func (p *PriorityQueue) determineSchedulingHintForInFlightPod(logger klog.Logger
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if len(pInfo.UnschedulablePlugins) == 0 {
 | 
			
		||||
		// When there is no unschedulable plugin, we cannot have a guess which event makes this Pod schedulable.
 | 
			
		||||
		// If there has been any concurrent event for the pod, it has to go to the backoff queue because the event
 | 
			
		||||
		// may have been relevant.
 | 
			
		||||
		for event := inFlightPod.Next(); event != nil; event = event.Next() {
 | 
			
		||||
			_, ok := event.Value.(*clusterEvent)
 | 
			
		||||
			if ok {
 | 
			
		||||
				// There really was a concurrent event.
 | 
			
		||||
				return framework.QueueAfterBackoff
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		return framework.QueueSkip
 | 
			
		||||
		// No unschedulable plugins are associated with this Pod.
 | 
			
		||||
		// Meaning something unusual (a temporal failure on kube-apiserver, etc) happened and this Pod gets moved back to the queue.
 | 
			
		||||
		// In this case, we should retry scheduling it because this Pod may not be retried until the next flush.
 | 
			
		||||
		return framework.QueueAfterBackoff
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// check if there is an event that makes this Pod schedulable based on pInfo.UnschedulablePlugins.
 | 
			
		||||
@@ -692,7 +685,13 @@ func (p *PriorityQueue) addUnschedulableWithoutQueueingHint(logger klog.Logger,
 | 
			
		||||
	for plugin := range pInfo.UnschedulablePlugins {
 | 
			
		||||
		metrics.UnschedulableReason(plugin, pInfo.Pod.Spec.SchedulerName).Inc()
 | 
			
		||||
	}
 | 
			
		||||
	if p.moveRequestCycle >= podSchedulingCycle {
 | 
			
		||||
	if p.moveRequestCycle >= podSchedulingCycle || len(pInfo.UnschedulablePlugins) == 0 {
 | 
			
		||||
		// Two cases to move a Pod to the active/backoff queue:
 | 
			
		||||
		// - The Pod is rejected by some plugins, but a move request is received after this Pod's scheduling cycle is started.
 | 
			
		||||
		//   In this case, the received event may be make Pod schedulable and we should retry scheduling it.
 | 
			
		||||
		// - No unschedulable plugins are associated with this Pod,
 | 
			
		||||
		//   meaning something unusual (a temporal failure on kube-apiserver, etc) happened and this Pod gets moved back to the queue.
 | 
			
		||||
		//   In this case, we should retry scheduling it because this Pod may not be retried until the next flush.
 | 
			
		||||
		if err := p.podBackoffQ.Add(pInfo); err != nil {
 | 
			
		||||
			return fmt.Errorf("error adding pod %v to the backoff queue: %v", klog.KObj(pod), err)
 | 
			
		||||
		}
 | 
			
		||||
@@ -745,8 +744,8 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(logger klog.Logger, pInfo *
 | 
			
		||||
		metrics.UnschedulableReason(plugin, pInfo.Pod.Spec.SchedulerName).Inc()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Based on isPodWorthRequeuing(), we check whether this Pod may change its scheduling result by any of events that happened during scheduling.
 | 
			
		||||
	schedulingHint := p.determineSchedulingHintForInFlightPod(logger, pInfo, podSchedulingCycle)
 | 
			
		||||
	// We check whether this Pod may change its scheduling result by any of events that happened during scheduling.
 | 
			
		||||
	schedulingHint := p.determineSchedulingHintForInFlightPod(logger, pInfo)
 | 
			
		||||
 | 
			
		||||
	// In this case, we try to requeue this Pod to activeQ/backoffQ.
 | 
			
		||||
	queue := p.requeuePodViaQueueingHint(logger, pInfo, schedulingHint, ScheduleAttemptFailure)
 | 
			
		||||
@@ -1115,8 +1114,7 @@ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(logger klog.Logger, podIn
 | 
			
		||||
 | 
			
		||||
	p.moveRequestCycle = p.schedulingCycle
 | 
			
		||||
 | 
			
		||||
	// (no need to check the feature gate because there is always no p.inFlightPods when the feature is disabled.)
 | 
			
		||||
	if len(p.inFlightPods) != 0 {
 | 
			
		||||
	if p.isSchedulingQueueHintEnabled && len(p.inFlightPods) != 0 {
 | 
			
		||||
		logger.V(5).Info("Event received while pods are in flight", "event", event.Label, "numPods", len(p.inFlightPods))
 | 
			
		||||
		// AddUnschedulableIfNotPresent might get called for in-flight Pods later, and in
 | 
			
		||||
		// AddUnschedulableIfNotPresent we need to know whether events were
 | 
			
		||||
 
 | 
			
		||||
@@ -494,15 +494,17 @@ func Test_InFlightPods(t *testing.T) {
 | 
			
		||||
				{eventHappens: &AssignedPodAdd},
 | 
			
		||||
				{callback: func(t *testing.T, q *PriorityQueue) {
 | 
			
		||||
					logger, _ := ktesting.NewTestContext(t)
 | 
			
		||||
					if err := q.AddUnschedulableIfNotPresent(logger, poppedPod, q.SchedulingCycle()); err != nil {
 | 
			
		||||
						t.Errorf("Unexpected error from AddUnschedulableIfNotPresent: %v", err)
 | 
			
		||||
					err := q.AddUnschedulableIfNotPresent(logger, poppedPod, q.SchedulingCycle())
 | 
			
		||||
					if err != nil {
 | 
			
		||||
						t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
 | 
			
		||||
					}
 | 
			
		||||
				}},
 | 
			
		||||
				{callback: func(t *testing.T, q *PriorityQueue) {
 | 
			
		||||
					logger, _ := ktesting.NewTestContext(t)
 | 
			
		||||
					poppedPod2.UnschedulablePlugins = sets.New("fooPlugin1", "fooPlugin2", "fooPlugin3")
 | 
			
		||||
					if err := q.AddUnschedulableIfNotPresent(logger, poppedPod2, q.SchedulingCycle()); err != nil {
 | 
			
		||||
						t.Errorf("Unexpected error from AddUnschedulableIfNotPresent: %v", err)
 | 
			
		||||
					err := q.AddUnschedulableIfNotPresent(logger, poppedPod2, q.SchedulingCycle())
 | 
			
		||||
					if err != nil {
 | 
			
		||||
						t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
 | 
			
		||||
					}
 | 
			
		||||
				}},
 | 
			
		||||
			},
 | 
			
		||||
@@ -597,7 +599,10 @@ func Test_InFlightPods(t *testing.T) {
 | 
			
		||||
				case action.eventHappens != nil:
 | 
			
		||||
					q.MoveAllToActiveOrBackoffQueue(logger, *action.eventHappens, nil, nil, nil)
 | 
			
		||||
				case action.podEnqueued != nil:
 | 
			
		||||
					q.AddUnschedulableIfNotPresent(logger, action.podEnqueued, q.SchedulingCycle())
 | 
			
		||||
					err := q.AddUnschedulableIfNotPresent(logger, action.podEnqueued, q.SchedulingCycle())
 | 
			
		||||
					if err != nil {
 | 
			
		||||
						t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
 | 
			
		||||
					}
 | 
			
		||||
				case action.callback != nil:
 | 
			
		||||
					action.callback(t, q)
 | 
			
		||||
				}
 | 
			
		||||
@@ -739,7 +744,10 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	q.Add(logger, highPriNominatedPodInfo.Pod)
 | 
			
		||||
	q.AddUnschedulableIfNotPresent(logger, newQueuedPodInfoForLookup(unschedulablePodInfo.Pod), q.SchedulingCycle())
 | 
			
		||||
	err := q.AddUnschedulableIfNotPresent(logger, newQueuedPodInfoForLookup(unschedulablePodInfo.Pod, "plugin"), q.SchedulingCycle())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	expectedNominatedPods := &nominator{
 | 
			
		||||
		nominatedPodToNode: map[types.UID]string{
 | 
			
		||||
			unschedulablePodInfo.Pod.UID:    "node1",
 | 
			
		||||
@@ -813,8 +821,9 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) {
 | 
			
		||||
			},
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if err := q.AddUnschedulableIfNotPresent(logger, newQueuedPodInfoForLookup(unschedulablePod), oldCycle); err != nil {
 | 
			
		||||
			t.Errorf("Failed to call AddUnschedulableIfNotPresent(%v): %v", unschedulablePod.Name, err)
 | 
			
		||||
		err := q.AddUnschedulableIfNotPresent(logger, newQueuedPodInfoForLookup(unschedulablePod, "plugin"), oldCycle)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@@ -907,7 +916,10 @@ func TestPriorityQueue_Update(t *testing.T) {
 | 
			
		||||
	if p, err := q.Pop(); err != nil || p.Pod != medPriorityPodInfo.Pod {
 | 
			
		||||
		t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name)
 | 
			
		||||
	}
 | 
			
		||||
	q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(medPriorityPodInfo.Pod), q.SchedulingCycle())
 | 
			
		||||
	err = q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(medPriorityPodInfo.Pod, "plugin"), q.SchedulingCycle())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	if len(q.unschedulablePods.podInfoMap) != 1 {
 | 
			
		||||
		t.Error("Expected unschedulablePods to be 1.")
 | 
			
		||||
	}
 | 
			
		||||
@@ -920,9 +932,20 @@ func TestPriorityQueue_Update(t *testing.T) {
 | 
			
		||||
		t.Errorf("Expected: %v after Pop, but got: %v", updatedPod.Name, podGotFromBackoffQ.Name)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before testing AddUnschedulableIfNotPresent.
 | 
			
		||||
	err = q.activeQ.Add(podInfo)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error from activeQ.Add: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	if p, err := q.Pop(); err != nil || p.Pod != medPriorityPodInfo.Pod {
 | 
			
		||||
		t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name)
 | 
			
		||||
	}
 | 
			
		||||
	// updating a pod which is in unschedulable queue, and it is not backing off,
 | 
			
		||||
	// we will move it to active queue
 | 
			
		||||
	q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(medPriorityPodInfo.Pod), q.SchedulingCycle())
 | 
			
		||||
	err = q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(medPriorityPodInfo.Pod, "plugin"), q.SchedulingCycle())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	if len(q.unschedulablePods.podInfoMap) != 1 {
 | 
			
		||||
		t.Error("Expected unschedulablePods to be 1.")
 | 
			
		||||
	}
 | 
			
		||||
@@ -1231,7 +1254,10 @@ func BenchmarkMoveAllToActiveOrBackoffQueue(b *testing.B) {
 | 
			
		||||
							// Random case.
 | 
			
		||||
							podInfo = q.newQueuedPodInfo(p, plugins[j%len(plugins)])
 | 
			
		||||
						}
 | 
			
		||||
						q.AddUnschedulableIfNotPresent(logger, podInfo, q.SchedulingCycle())
 | 
			
		||||
						err := q.AddUnschedulableIfNotPresent(logger, podInfo, q.SchedulingCycle())
 | 
			
		||||
						if err != nil {
 | 
			
		||||
							b.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
 | 
			
		||||
						}
 | 
			
		||||
					}
 | 
			
		||||
 | 
			
		||||
					b.StartTimer()
 | 
			
		||||
@@ -1304,8 +1330,10 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithQueueingHint(t *testing.
 | 
			
		||||
			if p, err := q.Pop(); err != nil || p.Pod != test.podInfo.Pod {
 | 
			
		||||
				t.Errorf("Expected: %v after Pop, but got: %v", test.podInfo.Pod.Name, p.Pod.Name)
 | 
			
		||||
			}
 | 
			
		||||
			q.AddUnschedulableIfNotPresent(logger, test.podInfo, q.SchedulingCycle())
 | 
			
		||||
 | 
			
		||||
			err := q.AddUnschedulableIfNotPresent(logger, test.podInfo, q.SchedulingCycle())
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
 | 
			
		||||
			}
 | 
			
		||||
			cl.Step(test.duration)
 | 
			
		||||
 | 
			
		||||
			q.MoveAllToActiveOrBackoffQueue(logger, NodeAdd, nil, nil, nil)
 | 
			
		||||
@@ -1349,8 +1377,14 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
 | 
			
		||||
		t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name)
 | 
			
		||||
	}
 | 
			
		||||
	expectInFlightPods(t, q, unschedulablePodInfo.Pod.UID, highPriorityPodInfo.Pod.UID)
 | 
			
		||||
	q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin"), q.SchedulingCycle())
 | 
			
		||||
	q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(highPriorityPodInfo.Pod, "fooPlugin"), q.SchedulingCycle())
 | 
			
		||||
	err := q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin"), q.SchedulingCycle())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	err = q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(highPriorityPodInfo.Pod, "fooPlugin"), q.SchedulingCycle())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	expectInFlightPods(t, q)
 | 
			
		||||
	// Construct a Pod, but don't associate its scheduler failure to any plugin
 | 
			
		||||
	hpp1 := clonePod(highPriorityPodInfo.Pod, "hpp1")
 | 
			
		||||
@@ -1359,7 +1393,11 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
 | 
			
		||||
		t.Errorf("Expected: %v after Pop, but got: %v", hpp1, p.Pod.Name)
 | 
			
		||||
	}
 | 
			
		||||
	expectInFlightPods(t, q, hpp1.UID)
 | 
			
		||||
	q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(hpp1), q.SchedulingCycle())
 | 
			
		||||
	// This Pod will go to backoffQ because no failure plugin is associated with it.
 | 
			
		||||
	err = q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(hpp1), q.SchedulingCycle())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	expectInFlightPods(t, q)
 | 
			
		||||
	// Construct another Pod, and associate its scheduler failure to plugin "barPlugin".
 | 
			
		||||
	hpp2 := clonePod(highPriorityPodInfo.Pod, "hpp2")
 | 
			
		||||
@@ -1368,9 +1406,14 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
 | 
			
		||||
		t.Errorf("Expected: %v after Pop, but got: %v", hpp2, p.Pod.Name)
 | 
			
		||||
	}
 | 
			
		||||
	expectInFlightPods(t, q, hpp2.UID)
 | 
			
		||||
	q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(hpp2, "barPlugin"), q.SchedulingCycle())
 | 
			
		||||
	// This Pod will go to the unschedulable Pod pool.
 | 
			
		||||
	err = q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(hpp2, "barPlugin"), q.SchedulingCycle())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	expectInFlightPods(t, q)
 | 
			
		||||
	// Pods is still backing off, move the pod into backoffQ.
 | 
			
		||||
	// This NodeAdd event moves unschedulablePodInfo and highPriorityPodInfo to the backoffQ,
 | 
			
		||||
	// because of the queueing hint function registered for NodeAdd/fooPlugin.
 | 
			
		||||
	q.MoveAllToActiveOrBackoffQueue(logger, NodeAdd, nil, nil, nil)
 | 
			
		||||
	q.Add(logger, medPriorityPodInfo.Pod)
 | 
			
		||||
	if q.activeQ.Len() != 1 {
 | 
			
		||||
@@ -1408,24 +1451,42 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
 | 
			
		||||
	if p, err := q.Pop(); err != nil || p.Pod != hpp1 {
 | 
			
		||||
		t.Errorf("Expected: %v after Pop, but got: %v", hpp1, p.Pod.Name)
 | 
			
		||||
	}
 | 
			
		||||
	unschedulableQueuedPodInfo := q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin")
 | 
			
		||||
	highPriorityQueuedPodInfo := q.newQueuedPodInfo(highPriorityPodInfo.Pod, "fooPlugin")
 | 
			
		||||
	hpp1QueuedPodInfo := q.newQueuedPodInfo(hpp1)
 | 
			
		||||
	expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, unschedulablePodInfo.Pod.UID, highPriorityPodInfo.Pod.UID, hpp1.UID)
 | 
			
		||||
	q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin"), q.SchedulingCycle())
 | 
			
		||||
	err = q.AddUnschedulableIfNotPresent(logger, unschedulableQueuedPodInfo, q.SchedulingCycle())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, highPriorityPodInfo.Pod.UID, hpp1.UID)
 | 
			
		||||
	q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(highPriorityPodInfo.Pod, "fooPlugin"), q.SchedulingCycle())
 | 
			
		||||
	err = q.AddUnschedulableIfNotPresent(logger, highPriorityQueuedPodInfo, q.SchedulingCycle())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID, hpp1.UID)
 | 
			
		||||
	q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(hpp1), q.SchedulingCycle())
 | 
			
		||||
	err = q.AddUnschedulableIfNotPresent(logger, hpp1QueuedPodInfo, q.SchedulingCycle())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID)
 | 
			
		||||
	q.Add(logger, medPriorityPodInfo.Pod)
 | 
			
		||||
	for _, pod := range []*v1.Pod{unschedulablePodInfo.Pod, highPriorityPodInfo.Pod, hpp1, hpp2} {
 | 
			
		||||
	// hpp1 will go to backoffQ because no failure plugin is associated with it.
 | 
			
		||||
	// All plugins other than hpp1 are enqueued to the unschedulable Pod pool.
 | 
			
		||||
	for _, pod := range []*v1.Pod{unschedulablePodInfo.Pod, highPriorityPodInfo.Pod, hpp2} {
 | 
			
		||||
		if q.unschedulablePods.get(pod) == nil {
 | 
			
		||||
			t.Errorf("Expected %v in the unschedulablePods", pod.Name)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	if _, ok, _ := q.podBackoffQ.Get(hpp1QueuedPodInfo); !ok {
 | 
			
		||||
		t.Errorf("Expected %v in the podBackoffQ", hpp1.Name)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Move clock by podInitialBackoffDuration, so that pods in the unschedulablePods would pass the backing off,
 | 
			
		||||
	// and the pods will be moved into activeQ.
 | 
			
		||||
	c.Step(q.podInitialBackoffDuration)
 | 
			
		||||
	q.flushBackoffQCompleted(logger) // flush the completed backoffQ to move hpp1 to activeQ.
 | 
			
		||||
	q.MoveAllToActiveOrBackoffQueue(logger, NodeAdd, nil, nil, nil)
 | 
			
		||||
	// hpp2 won't be moved regardless of its backoff timer.
 | 
			
		||||
	if q.activeQ.Len() != 4 {
 | 
			
		||||
		t.Errorf("Expected 4 items to be in activeQ, but got: %v", q.activeQ.Len())
 | 
			
		||||
	}
 | 
			
		||||
@@ -1433,6 +1494,10 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
 | 
			
		||||
		t.Errorf("Expected 0 item to be in podBackoffQ, but got: %v", q.podBackoffQ.Len())
 | 
			
		||||
	}
 | 
			
		||||
	expectInFlightPods(t, q, medPriorityPodInfo.Pod.UID)
 | 
			
		||||
	if len(q.unschedulablePods.podInfoMap) != 1 {
 | 
			
		||||
		// hpp2 won't be moved regardless of its backoff timer.
 | 
			
		||||
		t.Errorf("Expected 1 item to be in unschedulablePods, but got: %v", len(q.unschedulablePods.podInfoMap))
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func clonePod(pod *v1.Pod, newName string) *v1.Pod {
 | 
			
		||||
@@ -1493,8 +1558,14 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) {
 | 
			
		||||
		t.Errorf("Expected: %v after Pop, but got: %v", affinityPod.Name, p.Pod.Name)
 | 
			
		||||
	}
 | 
			
		||||
	q.Add(logger, medPriorityPodInfo.Pod)
 | 
			
		||||
	q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fakePlugin"), q.SchedulingCycle())
 | 
			
		||||
	q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(affinityPod, "fakePlugin"), q.SchedulingCycle())
 | 
			
		||||
	err := q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fakePlugin"), q.SchedulingCycle())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	err = q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(affinityPod, "fakePlugin"), q.SchedulingCycle())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Move clock to make the unschedulable pods complete backoff.
 | 
			
		||||
	c.Step(DefaultPodInitialBackoffDuration + time.Second)
 | 
			
		||||
@@ -1615,8 +1686,14 @@ func TestPriorityQueue_PendingPods(t *testing.T) {
 | 
			
		||||
		t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name)
 | 
			
		||||
	}
 | 
			
		||||
	q.Add(logger, medPriorityPodInfo.Pod)
 | 
			
		||||
	q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(unschedulablePodInfo.Pod), q.SchedulingCycle())
 | 
			
		||||
	q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(highPriorityPodInfo.Pod), q.SchedulingCycle())
 | 
			
		||||
	err := q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(unschedulablePodInfo.Pod, "plugin"), q.SchedulingCycle())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	err = q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(highPriorityPodInfo.Pod, "plugin"), q.SchedulingCycle())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	expectedSet := makeSet([]*v1.Pod{medPriorityPodInfo.Pod, unschedulablePodInfo.Pod, highPriorityPodInfo.Pod})
 | 
			
		||||
	gotPods, gotSummary := q.PendingPods()
 | 
			
		||||
@@ -1904,8 +1981,12 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) {
 | 
			
		||||
		Message:       "fake scheduling failure",
 | 
			
		||||
		LastProbeTime: metav1.Now(),
 | 
			
		||||
	})
 | 
			
		||||
	p1.UnschedulablePlugins = sets.New("plugin")
 | 
			
		||||
	// Put in the unschedulable queue.
 | 
			
		||||
	q.AddUnschedulableIfNotPresent(logger, p1, q.SchedulingCycle())
 | 
			
		||||
	err = q.AddUnschedulableIfNotPresent(logger, p1, q.SchedulingCycle())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	c.Step(DefaultPodInitialBackoffDuration)
 | 
			
		||||
	// Move all unschedulable pods to the active queue.
 | 
			
		||||
	q.MoveAllToActiveOrBackoffQueue(logger, UnschedulableTimeout, nil, nil, nil)
 | 
			
		||||
@@ -1952,7 +2033,10 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
 | 
			
		||||
		t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePod.Name, p.Pod.Name)
 | 
			
		||||
	}
 | 
			
		||||
	// Put in the unschedulable queue
 | 
			
		||||
	q.AddUnschedulableIfNotPresent(logger, newQueuedPodInfoForLookup(unschedulablePod), q.SchedulingCycle())
 | 
			
		||||
	err := q.AddUnschedulableIfNotPresent(logger, newQueuedPodInfoForLookup(unschedulablePod, "plugin"), q.SchedulingCycle())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	// Move clock to make the unschedulable pods complete backoff.
 | 
			
		||||
	c.Step(DefaultPodInitialBackoffDuration + time.Second)
 | 
			
		||||
	// Move all unschedulable pods to the active queue.
 | 
			
		||||
@@ -1982,7 +2066,10 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	// And then, put unschedulable pod to the unschedulable queue
 | 
			
		||||
	q.AddUnschedulableIfNotPresent(logger, newQueuedPodInfoForLookup(unschedulablePod), q.SchedulingCycle())
 | 
			
		||||
	err = q.AddUnschedulableIfNotPresent(logger, newQueuedPodInfoForLookup(unschedulablePod, "plugin"), q.SchedulingCycle())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	// Move clock to make the unschedulable pods complete backoff.
 | 
			
		||||
	c.Step(DefaultPodInitialBackoffDuration + time.Second)
 | 
			
		||||
	// Move all unschedulable pods to the active queue.
 | 
			
		||||
@@ -2028,7 +2115,10 @@ func TestHighPriorityBackoff(t *testing.T) {
 | 
			
		||||
		Message: "fake scheduling failure",
 | 
			
		||||
	})
 | 
			
		||||
	// Put in the unschedulable queue.
 | 
			
		||||
	q.AddUnschedulableIfNotPresent(logger, p, q.SchedulingCycle())
 | 
			
		||||
	err = q.AddUnschedulableIfNotPresent(logger, p, q.SchedulingCycle())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	// Move all unschedulable pods to the active queue.
 | 
			
		||||
	q.MoveAllToActiveOrBackoffQueue(logger, TestEvent, nil, nil, nil)
 | 
			
		||||
 | 
			
		||||
@@ -2084,8 +2174,14 @@ func TestHighPriorityFlushUnschedulablePodsLeftover(t *testing.T) {
 | 
			
		||||
	if p, err := q.Pop(); err != nil || p.Pod != midPod {
 | 
			
		||||
		t.Errorf("Expected: %v after Pop, but got: %v", midPod.Name, p.Pod.Name)
 | 
			
		||||
	}
 | 
			
		||||
	q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(highPod, "fakePlugin"), q.SchedulingCycle())
 | 
			
		||||
	q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(midPod, "fakePlugin"), q.SchedulingCycle())
 | 
			
		||||
	err := q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(highPod, "fakePlugin"), q.SchedulingCycle())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	err = q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(midPod, "fakePlugin"), q.SchedulingCycle())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	c.Step(DefaultPodMaxInUnschedulablePodsDuration + time.Second)
 | 
			
		||||
	q.flushUnschedulablePodsLeftover(logger)
 | 
			
		||||
 | 
			
		||||
@@ -2649,6 +2745,7 @@ func TestPerPodSchedulingMetrics(t *testing.T) {
 | 
			
		||||
					t.Fatalf("Failed to pop a pod %v", err)
 | 
			
		||||
				}
 | 
			
		||||
 | 
			
		||||
				pInfo.UnschedulablePlugins = sets.New("plugin")
 | 
			
		||||
				queue.AddUnschedulableIfNotPresent(logger, pInfo, 1)
 | 
			
		||||
				// Override clock to exceed the DefaultPodMaxInUnschedulablePodsDuration so that unschedulable pods
 | 
			
		||||
				// will be moved to activeQ
 | 
			
		||||
@@ -2668,6 +2765,7 @@ func TestPerPodSchedulingMetrics(t *testing.T) {
 | 
			
		||||
					t.Fatalf("Failed to pop a pod %v", err)
 | 
			
		||||
				}
 | 
			
		||||
 | 
			
		||||
				pInfo.UnschedulablePlugins = sets.New("plugin")
 | 
			
		||||
				queue.AddUnschedulableIfNotPresent(logger, pInfo, 1)
 | 
			
		||||
				// Override clock to exceed the DefaultPodMaxInUnschedulablePodsDuration so that unschedulable pods
 | 
			
		||||
				// will be moved to activeQ
 | 
			
		||||
@@ -2857,8 +2955,9 @@ func TestBackOffFlow(t *testing.T) {
 | 
			
		||||
			if podInfo.Attempts != i+1 {
 | 
			
		||||
				t.Errorf("got attempts %d, want %d", podInfo.Attempts, i+1)
 | 
			
		||||
			}
 | 
			
		||||
			if err := q.AddUnschedulableIfNotPresent(logger, podInfo, int64(i)); err != nil {
 | 
			
		||||
				t.Fatal(err)
 | 
			
		||||
			err = q.AddUnschedulableIfNotPresent(logger, podInfo, int64(i))
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			// An event happens.
 | 
			
		||||
@@ -2947,7 +3046,11 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) {
 | 
			
		||||
				if p, err := q.Pop(); err != nil || p.Pod != podInfo.Pod {
 | 
			
		||||
					t.Errorf("Expected: %v after Pop, but got: %v", podInfo.Pod.Name, p.Pod.Name)
 | 
			
		||||
				}
 | 
			
		||||
				q.AddUnschedulableIfNotPresent(logger, podInfo, q.schedulingCycle)
 | 
			
		||||
				podInfo.UnschedulablePlugins = sets.New("plugin")
 | 
			
		||||
				err := q.AddUnschedulableIfNotPresent(logger, podInfo, q.schedulingCycle)
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
 | 
			
		||||
				}
 | 
			
		||||
				// NOTE: On Windows, time.Now() is not as precise, 2 consecutive calls may return the same timestamp,
 | 
			
		||||
				// resulting in 0 time delta / latency. This will cause the pods to be backed off in a random
 | 
			
		||||
				// order, which would cause this test to fail, since the expectation is for them to be backed off
 | 
			
		||||
 
 | 
			
		||||
@@ -2307,8 +2307,11 @@ func TestPreemptWithPermitPlugin(t *testing.T) {
 | 
			
		||||
			preemptor:              st.MakePod().Name("preemptor-pod").Namespace(ns).Priority(highPriority).Req(preemptorReq).ZeroTerminationGracePeriod().Obj(),
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name:                   "rejecting a waiting pod to trigger retrying unschedulable pods immediately, but the waiting pod itself won't be retried",
 | 
			
		||||
			maxNumWaitingPodCalled: 1,
 | 
			
		||||
			// The waiting Pod has once gone through the scheduling cycle,
 | 
			
		||||
			// and we don't know if it's schedulable or not after it's preempted.
 | 
			
		||||
			// So, we should retry the scheduling of it so that it won't stuck in the unschedulable Pod pool.
 | 
			
		||||
			name:                   "rejecting a waiting pod to trigger retrying unschedulable pods immediately, and the waiting pod itself will be retried",
 | 
			
		||||
			maxNumWaitingPodCalled: 2,
 | 
			
		||||
			waitingPod:             st.MakePod().Name("waiting-pod").Namespace(ns).Priority(lowPriority).Req(resReq).ZeroTerminationGracePeriod().Obj(),
 | 
			
		||||
			preemptor:              st.MakePod().Name("preemptor-pod").Namespace(ns).Priority(highPriority).Req(preemptorReq).ZeroTerminationGracePeriod().Obj(),
 | 
			
		||||
		},
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user