	Add a separate lock for pod nominator in scheduling queue
		| @@ -337,7 +337,7 @@ func TestSchedulerWithExtenders(t *testing.T) { | ||||
| 				test.registerPlugins, "", | ||||
| 				runtime.WithClientSet(client), | ||||
| 				runtime.WithInformerFactory(informerFactory), | ||||
| 				runtime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), | ||||
| 				runtime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())), | ||||
| 				runtime.WithLogger(logger), | ||||
| 			) | ||||
| 			if err != nil { | ||||
|   | ||||
| @@ -365,7 +365,7 @@ func TestPostFilter(t *testing.T) { | ||||
| 				frameworkruntime.WithClientSet(cs), | ||||
| 				frameworkruntime.WithEventRecorder(&events.FakeRecorder{}), | ||||
| 				frameworkruntime.WithInformerFactory(informerFactory), | ||||
| 				frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), | ||||
| 				frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())), | ||||
| 				frameworkruntime.WithExtenders(extenders), | ||||
| 				frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.pods, tt.nodes)), | ||||
| 				frameworkruntime.WithLogger(logger), | ||||
| @@ -1102,7 +1102,7 @@ func TestDryRunPreemption(t *testing.T) { | ||||
| 			fwk, err := tf.NewFramework( | ||||
| 				ctx, | ||||
| 				registeredPlugins, "", | ||||
| 				frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), | ||||
| 				frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())), | ||||
| 				frameworkruntime.WithSnapshotSharedLister(snapshot), | ||||
| 				frameworkruntime.WithInformerFactory(informerFactory), | ||||
| 				frameworkruntime.WithParallelism(parallelism), | ||||
| @@ -1361,7 +1361,7 @@ func TestSelectBestCandidate(t *testing.T) { | ||||
| 					tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), | ||||
| 				}, | ||||
| 				"", | ||||
| 				frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), | ||||
| 				frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())), | ||||
| 				frameworkruntime.WithSnapshotSharedLister(snapshot), | ||||
| 				frameworkruntime.WithLogger(logger), | ||||
| 			) | ||||
| @@ -1746,7 +1746,7 @@ func TestPreempt(t *testing.T) { | ||||
| 				frameworkruntime.WithClientSet(client), | ||||
| 				frameworkruntime.WithEventRecorder(&events.FakeRecorder{}), | ||||
| 				frameworkruntime.WithExtenders(extenders), | ||||
| 				frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), | ||||
| 				frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())), | ||||
| 				frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(test.pods, nodes)), | ||||
| 				frameworkruntime.WithInformerFactory(informerFactory), | ||||
| 				frameworkruntime.WithWaitingPods(waitingPods), | ||||
|   | ||||
| @@ -341,7 +341,7 @@ func TestDryRunPreemption(t *testing.T) { | ||||
| 			fwk, err := tf.NewFramework( | ||||
| 				ctx, | ||||
| 				registeredPlugins, "", | ||||
| 				frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), | ||||
| 				frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())), | ||||
| 				frameworkruntime.WithInformerFactory(informerFactory), | ||||
| 				frameworkruntime.WithParallelism(parallelism), | ||||
| 				frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.testPods, tt.nodes)), | ||||
| @@ -446,7 +446,7 @@ func TestSelectCandidate(t *testing.T) { | ||||
| 				ctx, | ||||
| 				registeredPlugins, | ||||
| 				"", | ||||
| 				frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), | ||||
| 				frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())), | ||||
| 				frameworkruntime.WithSnapshotSharedLister(snapshot), | ||||
| 				frameworkruntime.WithLogger(logger), | ||||
| 			) | ||||
|   | ||||
| @@ -2356,7 +2356,7 @@ func TestFilterPluginsWithNominatedPods(t *testing.T) { | ||||
| 				) | ||||
| 			} | ||||
|  | ||||
| 			podNominator := internalqueue.NewPodNominator(nil) | ||||
| 			podNominator := internalqueue.NewTestPodNominator(nil) | ||||
| 			if tt.nominatedPod != nil { | ||||
| 				podNominator.AddNominatedPod( | ||||
| 					logger, | ||||
|   | ||||
| @@ -32,10 +32,12 @@ import ( | ||||
| 	"fmt" | ||||
| 	"math/rand" | ||||
| 	"reflect" | ||||
| 	"slices" | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | ||||
| 	v1 "k8s.io/api/core/v1" | ||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| 	"k8s.io/apimachinery/pkg/types" | ||||
| 	"k8s.io/apimachinery/pkg/util/sets" | ||||
| 	"k8s.io/apimachinery/pkg/util/wait" | ||||
| @@ -156,6 +158,11 @@ type PriorityQueue struct { | ||||
| 	stop  chan struct{} | ||||
| 	clock clock.Clock | ||||
|  | ||||
| 	// lock takes precedence and should be taken first, | ||||
| 	// before any other locks in the queue (activeQLock or nominator.nLock). | ||||
| 	// Correct locking order is: lock > activeQLock > nominator.nLock. | ||||
| 	lock sync.RWMutex | ||||
|  | ||||
| 	// pod initial backoff duration. | ||||
| 	podInitialBackoffDuration time.Duration | ||||
| 	// pod maximum backoff duration. | ||||
| @@ -169,9 +176,10 @@ type PriorityQueue struct { | ||||
|  | ||||
| 	// activeQLock synchronizes all operations related to activeQ. | ||||
| 	// It protects activeQ, inFlightPods, inFlightEvents, schedulingCycle and closed fields. | ||||
| 	// Caution: DO NOT take nominator.lock after taking activeQLock, | ||||
| 	// you should take nominator.lock first if you need two locks, | ||||
| 	// otherwise the queue could end up deadlock. | ||||
| 	// Caution: DO NOT take "lock" after taking "activeQLock". | ||||
| 	// You should always take "lock" first, otherwise the queue could end up in deadlock. | ||||
| 	// "activeQLock" should not be taken after taking "nLock". | ||||
| 	// Correct locking order is: lock > activeQLock > nominator.nLock. | ||||
| 	activeQLock sync.RWMutex | ||||
|  | ||||
| 	// inFlightPods holds the UID of all pods which have been popped out for which Done | ||||
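A minimal illustrative sketch of the lock hierarchy documented in the two comments above (lock > activeQLock > nLock), with simplified stand-in types rather than the scheduler's real ones; inverting the order is what the comments warn can deadlock.

package main

import "sync"

// Stand-in for the queue's three locks; not the scheduler's real code.
type lockOrderSketch struct {
	lock        sync.RWMutex // outermost
	activeQLock sync.RWMutex // middle
	nLock       sync.RWMutex // innermost (nominator)
}

// correctOrder follows the documented hierarchy: lock, then activeQLock, then nLock.
func (q *lockOrderSketch) correctOrder() {
	q.lock.Lock()
	defer q.lock.Unlock()
	q.activeQLock.Lock()
	defer q.activeQLock.Unlock()
	q.nLock.Lock()
	defer q.nLock.Unlock()
	// ... touch activeQ and nominator state here ...
}

// invertedOrder takes nLock before activeQLock; run concurrently with
// correctOrder, each goroutine can end up waiting on the lock the other holds.
func (q *lockOrderSketch) invertedOrder() {
	q.nLock.Lock()
	defer q.nLock.Unlock()
	q.activeQLock.Lock() // may block forever against correctOrder
	q.activeQLock.Unlock()
}

func main() {
	q := &lockOrderSketch{}
	q.correctOrder()
	q.invertedOrder()
}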
| @@ -381,7 +389,6 @@ func NewPriorityQueue( | ||||
| 	} | ||||
|  | ||||
| 	pq := &PriorityQueue{ | ||||
| 		nominator:                         newPodNominator(options.podLister), | ||||
| 		clock:                             options.clock, | ||||
| 		stop:                              make(chan struct{}), | ||||
| 		podInitialBackoffDuration:         options.podInitialBackoffDuration, | ||||
| @@ -401,6 +408,7 @@ func NewPriorityQueue( | ||||
| 	pq.cond.L = &pq.activeQLock | ||||
| 	pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder()) | ||||
| 	pq.nsLister = informerFactory.Core().V1().Namespaces().Lister() | ||||
| 	pq.nominator = newPodNominator(options.podLister, pq.nominatedPodsToInfo) | ||||
|  | ||||
| 	return pq | ||||
| } | ||||
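The hunk above changes NewPriorityQueue so the nominator is constructed last and receives pq.nominatedPodsToInfo as a callback instead of resolving pods itself. A small sketch of that wiring pattern with hypothetical stand-in names (not the scheduler's types): the owner is built first so the callee can capture one of its method values.

package main

import "fmt"

type ref struct{ name string }

// nominatorWiring stores only lightweight refs and asks its owner to resolve
// them through the injected callback.
type nominatorWiring struct {
	resolve func([]ref) []string
}

type queueWiring struct {
	nominator *nominatorWiring
}

func (q *queueWiring) refsToNames(refs []ref) []string {
	out := make([]string, 0, len(refs))
	for _, r := range refs {
		out = append(out, r.name)
	}
	return out
}

func newQueueWiring() *queueWiring {
	q := &queueWiring{}
	// Built last so it can capture q's method value, mirroring
	// pq.nominator = newPodNominator(options.podLister, pq.nominatedPodsToInfo).
	q.nominator = &nominatorWiring{resolve: q.refsToNames}
	return q
}

func main() {
	q := newQueueWiring()
	fmt.Println(q.nominator.resolve([]ref{{name: "pod-a"}, {name: "pod-b"}}))
}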
| @@ -606,7 +614,7 @@ func (p *PriorityQueue) moveToActiveQ(logger klog.Logger, pInfo *framework.Queue | ||||
| 	logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", activeQ) | ||||
| 	metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc() | ||||
| 	if event == PodAdd || event == PodUpdate { | ||||
| 		p.addNominatedPodUnlocked(logger, pInfo.PodInfo, nil) | ||||
| 		p.AddNominatedPod(logger, pInfo.PodInfo, nil) | ||||
| 	} | ||||
|  | ||||
| 	return true, nil | ||||
| @@ -807,7 +815,7 @@ func (p *PriorityQueue) addUnschedulableWithoutQueueingHint(logger klog.Logger, | ||||
| 		metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc() | ||||
| 	} | ||||
|  | ||||
| 	p.addNominatedPodUnlocked(logger, pInfo.PodInfo, nil) | ||||
| 	p.AddNominatedPod(logger, pInfo.PodInfo, nil) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| @@ -860,7 +868,7 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(logger klog.Logger, pInfo * | ||||
| 		p.cond.Broadcast() | ||||
| 	} | ||||
|  | ||||
| 	p.addNominatedPodUnlocked(logger, pInfo.PodInfo, nil) | ||||
| 	p.AddNominatedPod(logger, pInfo.PodInfo, nil) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| @@ -1022,7 +1030,7 @@ func (p *PriorityQueue) updateInActiveQueue(logger klog.Logger, oldPod, newPod * | ||||
| 	defer p.activeQLock.Unlock() | ||||
| 	if oldPodInfo, exists, _ := p.activeQ.Get(oldPodInfo); exists { | ||||
| 		pInfo := updatePod(oldPodInfo, newPod) | ||||
| 		p.updateNominatedPodUnlocked(logger, oldPod, pInfo.PodInfo) | ||||
| 		p.UpdateNominatedPod(logger, oldPod, pInfo.PodInfo) | ||||
| 		return true, p.activeQ.Update(pInfo) | ||||
| 	} | ||||
| 	return false, nil | ||||
| @@ -1068,7 +1076,7 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) error | ||||
| 		// If the pod is in the backoff queue, update it there. | ||||
| 		if oldPodInfo, exists, _ := p.podBackoffQ.Get(oldPodInfo); exists { | ||||
| 			pInfo := updatePod(oldPodInfo, newPod) | ||||
| 			p.updateNominatedPodUnlocked(logger, oldPod, pInfo.PodInfo) | ||||
| 			p.UpdateNominatedPod(logger, oldPod, pInfo.PodInfo) | ||||
| 			return p.podBackoffQ.Update(pInfo) | ||||
| 		} | ||||
| 	} | ||||
| @@ -1076,7 +1084,7 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) error | ||||
| 	// If the pod is in the unschedulable queue, updating it may make it schedulable. | ||||
| 	if usPodInfo := p.unschedulablePods.get(newPod); usPodInfo != nil { | ||||
| 		pInfo := updatePod(usPodInfo, newPod) | ||||
| 		p.updateNominatedPodUnlocked(logger, oldPod, pInfo.PodInfo) | ||||
| 		p.UpdateNominatedPod(logger, oldPod, pInfo.PodInfo) | ||||
| 		gated := usPodInfo.Gated | ||||
| 		if p.isSchedulingQueueHintEnabled { | ||||
| 			// When unscheduled Pods are updated, we check with QueueingHint | ||||
| @@ -1130,7 +1138,7 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) error | ||||
| func (p *PriorityQueue) Delete(pod *v1.Pod) error { | ||||
| 	p.lock.Lock() | ||||
| 	defer p.lock.Unlock() | ||||
| 	p.deleteNominatedPodIfExistsUnlocked(pod) | ||||
| 	p.DeleteNominatedPodIfExists(pod) | ||||
| 	pInfo := newQueuedPodInfoForLookup(pod) | ||||
| 	p.activeQLock.Lock() | ||||
| 	defer p.activeQLock.Unlock() | ||||
| @@ -1378,6 +1386,43 @@ func (p *PriorityQueue) PendingPods() ([]*v1.Pod, string) { | ||||
| 	return result, fmt.Sprintf(pendingPodsSummary, activeQLen, p.podBackoffQ.Len(), len(p.unschedulablePods.podInfoMap)) | ||||
| } | ||||
|  | ||||
| // Note: this function assumes the caller holds p.lock.RLock and p.activeQLock.RLock. | ||||
| func (p *PriorityQueue) nominatedPodToInfo(np PodRef) *framework.PodInfo { | ||||
| 	pod := np.ToPod() | ||||
| 	pInfoLookup := newQueuedPodInfoForLookup(pod) | ||||
|  | ||||
| 	obj, exists, _ := p.activeQ.Get(pInfoLookup) | ||||
| 	if exists { | ||||
| 		queuedPodInfo := obj.(*framework.QueuedPodInfo) | ||||
| 		return queuedPodInfo.PodInfo | ||||
| 	} | ||||
|  | ||||
| 	queuedPodInfo := p.unschedulablePods.get(pod) | ||||
| 	if queuedPodInfo != nil { | ||||
| 		return queuedPodInfo.PodInfo | ||||
| 	} | ||||
|  | ||||
| 	obj, exists, _ = p.podBackoffQ.Get(pInfoLookup) | ||||
| 	if exists { | ||||
| 		queuedPodInfo := obj.(*framework.QueuedPodInfo) | ||||
| 		return queuedPodInfo.PodInfo | ||||
| 	} | ||||
|  | ||||
| 	return &framework.PodInfo{Pod: pod} | ||||
| } | ||||
|  | ||||
| func (p *PriorityQueue) nominatedPodsToInfo(nominatedPods []PodRef) []*framework.PodInfo { | ||||
| 	p.lock.RLock() | ||||
| 	defer p.lock.RUnlock() | ||||
| 	p.activeQLock.RLock() | ||||
| 	defer p.activeQLock.RUnlock() | ||||
| 	pods := make([]*framework.PodInfo, len(nominatedPods)) | ||||
| 	for i, np := range nominatedPods { | ||||
| 		pods[i] = p.nominatedPodToInfo(np).DeepCopy() | ||||
| 	} | ||||
| 	return pods | ||||
| } | ||||
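nominatedPodsToInfo above returns DeepCopy()-ed PodInfos resolved from whichever internal queue currently holds the pod (activeQ, unschedulablePods, or backoffQ). A short sketch, with stand-in types rather than the real PodInfo, of why copy-on-read matters: callers can mutate the result without disturbing what the queue still caches.

package main

import "fmt"

type infoSketch struct{ nominatedNodeName string }

func (i *infoSketch) deepCopy() *infoSketch {
	c := *i
	return &c
}

type cacheSketch struct{ stored *infoSketch }

// snapshot hands out a copy, mirroring the DeepCopy() in nominatedPodsToInfo.
func (c *cacheSketch) snapshot() *infoSketch { return c.stored.deepCopy() }

func main() {
	c := &cacheSketch{stored: &infoSketch{nominatedNodeName: "node-1"}}
	got := c.snapshot()
	got.nominatedNodeName = "node-2"        // caller-side mutation
	fmt.Println(c.stored.nominatedNodeName) // still "node-1"
}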
|  | ||||
| // Close closes the priority queue. | ||||
| func (p *PriorityQueue) Close() { | ||||
| 	p.lock.Lock() | ||||
| @@ -1392,13 +1437,9 @@ func (p *PriorityQueue) Close() { | ||||
|  | ||||
| // DeleteNominatedPodIfExists deletes <pod> from nominatedPods. | ||||
| func (npm *nominator) DeleteNominatedPodIfExists(pod *v1.Pod) { | ||||
| 	npm.lock.Lock() | ||||
| 	npm.deleteNominatedPodIfExistsUnlocked(pod) | ||||
| 	npm.lock.Unlock() | ||||
| } | ||||
|  | ||||
| func (npm *nominator) deleteNominatedPodIfExistsUnlocked(pod *v1.Pod) { | ||||
| 	npm.nLock.Lock() | ||||
| 	npm.delete(pod) | ||||
| 	npm.nLock.Unlock() | ||||
| } | ||||
|  | ||||
| // AddNominatedPod adds a pod to the nominated pods of the given node. | ||||
| @@ -1406,22 +1447,20 @@ func (npm *nominator) deleteNominatedPodIfExistsUnlocked(pod *v1.Pod) { | ||||
| // the pod. We update the structure before sending a request to update the pod | ||||
| // object to avoid races with the following scheduling cycles. | ||||
| func (npm *nominator) AddNominatedPod(logger klog.Logger, pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) { | ||||
| 	npm.lock.Lock() | ||||
| 	npm.nLock.Lock() | ||||
| 	npm.addNominatedPodUnlocked(logger, pi, nominatingInfo) | ||||
| 	npm.lock.Unlock() | ||||
| 	npm.nLock.Unlock() | ||||
| } | ||||
|  | ||||
| // NominatedPodsForNode returns a copy of pods that are nominated to run on the given node, | ||||
| // but they are waiting for other pods to be removed from the node. | ||||
| // CAUTION: Make sure you don't call this function while holding any lock. | ||||
| func (npm *nominator) NominatedPodsForNode(nodeName string) []*framework.PodInfo { | ||||
| 	npm.lock.RLock() | ||||
| 	defer npm.lock.RUnlock() | ||||
| 	// Make a copy of the nominated Pods so the caller can mutate safely. | ||||
| 	pods := make([]*framework.PodInfo, len(npm.nominatedPods[nodeName])) | ||||
| 	for i := 0; i < len(pods); i++ { | ||||
| 		pods[i] = npm.nominatedPods[nodeName][i].DeepCopy() | ||||
| 	} | ||||
| 	return pods | ||||
| 	npm.nLock.RLock() | ||||
| 	nominatedPods := slices.Clone(npm.nominatedPods[nodeName]) | ||||
| 	npm.nLock.RUnlock() | ||||
| 	// Note that nominatedPodsToInfo takes SchedulingQueue.lock inside. | ||||
| 	return npm.nominatedPodsToInfo(nominatedPods) | ||||
| } | ||||
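NominatedPodsForNode now clones the PodRef slice while holding only nLock, releases it, and only then calls nominatedPodsToInfo, which takes the queue's own locks. A sketch of that shape with hypothetical stand-in types; releasing the inner lock before calling out is what keeps the lock > activeQLock > nLock ordering intact.

package main

import (
	"fmt"
	"sync"
)

type registrySketch struct {
	mu    sync.RWMutex // plays the role of nLock (innermost)
	byKey map[string][]string
}

// valuesFor copies the refs under mu, releases mu, and only then calls
// resolve, which is allowed to take other (outer) locks.
func (r *registrySketch) valuesFor(key string, resolve func([]string) []string) []string {
	r.mu.RLock()
	refs := append([]string(nil), r.byKey[key]...)
	r.mu.RUnlock()
	return resolve(refs)
}

func main() {
	r := &registrySketch{byKey: map[string][]string{"node1": {"pod-a", "pod-b"}}}
	fmt.Println(r.valuesFor("node1", func(in []string) []string { return in }))
}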
|  | ||||
| func (p *PriorityQueue) podsCompareBackoffCompleted(podInfo1, podInfo2 interface{}) bool { | ||||
| @@ -1542,22 +1581,55 @@ func newUnschedulablePods(unschedulableRecorder, gatedRecorder metrics.MetricRec | ||||
| 	} | ||||
| } | ||||
|  | ||||
| type PodRef struct { | ||||
| 	Name      string | ||||
| 	Namespace string | ||||
| 	UID       types.UID | ||||
| } | ||||
|  | ||||
| func PodToRef(pod *v1.Pod) PodRef { | ||||
| 	return PodRef{ | ||||
| 		Name:      pod.Name, | ||||
| 		Namespace: pod.Namespace, | ||||
| 		UID:       pod.UID, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (np PodRef) ToPod() *v1.Pod { | ||||
| 	return &v1.Pod{ | ||||
| 		ObjectMeta: metav1.ObjectMeta{ | ||||
| 			Name:      np.Name, | ||||
| 			Namespace: np.Namespace, | ||||
| 			UID:       np.UID, | ||||
| 		}, | ||||
| 	} | ||||
| } | ||||
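A minimal sketch of the PodRef round trip, assuming it sits in the same package as the types above (so PodToRef and ToPod are in scope, with the file's existing v1/metav1/types imports plus fmt): only Name, Namespace and UID survive the trip, which is why nominatedPodsToInfo resolves refs back through the queues instead of relying on the skeleton Pod alone.

// Sketch only; assumes placement in a _test.go file of this package.
func ExamplePodRefRoundTrip() {
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{
		Name:      "p",
		Namespace: "ns",
		UID:       types.UID("uid-1"),
	}}
	ref := PodToRef(pod) // keeps only Name, Namespace, UID
	back := ref.ToPod()  // a skeleton Pod; spec, labels and status are gone
	fmt.Println(back.Name, back.Namespace, back.UID)
	// Output: p ns uid-1
}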
|  | ||||
| // nominator is a structure that stores pods nominated to run on nodes. | ||||
| // It exists because nominatedNodeName of pod objects stored in the structure | ||||
| // may be different than what scheduler has here. We should be able to find pods | ||||
| // by their UID and update/delete them. | ||||
| type nominator struct { | ||||
| 	// nLock synchronizes all operations related to nominator. | ||||
| 	// Caution: DO NOT take ("SchedulingQueue.lock" or "SchedulingQueue.activeQLock") after taking "nLock". | ||||
| 	// You should always take "SchedulingQueue.lock" and "SchedulingQueue.activeQLock" first, | ||||
| 	// otherwise the nominator could end up in deadlock. | ||||
| 	// Correct locking order is: SchedulingQueue.lock > SchedulingQueue.activeQLock > nLock. | ||||
| 	nLock sync.RWMutex | ||||
|  | ||||
| 	// podLister is used to verify if the given pod is alive. | ||||
| 	podLister listersv1.PodLister | ||||
| 	// nominatedPods is a map keyed by a node name and the value is a list of | ||||
| 	// pods which are nominated to run on the node. These are pods which can be in | ||||
| 	// the activeQ or unschedulablePods. | ||||
| 	nominatedPods map[string][]*framework.PodInfo | ||||
| 	nominatedPods map[string][]PodRef | ||||
| 	// nominatedPodToNode is map keyed by a Pod UID to the node name where it is | ||||
| 	// nominated. | ||||
| 	nominatedPodToNode map[types.UID]string | ||||
|  | ||||
| 	lock sync.RWMutex | ||||
| 	// nominatedPodsToInfo returns PodInfos cached in the queues for nominated PodRefs. | ||||
| 	// Note: it takes SchedulingQueue.lock inside. | ||||
| 	// Make sure you don't call this function while holding any lock. | ||||
| 	nominatedPodsToInfo func([]PodRef) []*framework.PodInfo | ||||
| } | ||||
|  | ||||
| func (npm *nominator) addNominatedPodUnlocked(logger klog.Logger, pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) { | ||||
| @@ -1589,13 +1661,13 @@ func (npm *nominator) addNominatedPodUnlocked(logger klog.Logger, pi *framework. | ||||
| 	} | ||||
|  | ||||
| 	npm.nominatedPodToNode[pi.Pod.UID] = nodeName | ||||
| 	for _, npi := range npm.nominatedPods[nodeName] { | ||||
| 		if npi.Pod.UID == pi.Pod.UID { | ||||
| 			logger.V(4).Info("Pod already exists in the nominator", "pod", klog.KObj(npi.Pod)) | ||||
| 	for _, np := range npm.nominatedPods[nodeName] { | ||||
| 		if np.UID == pi.Pod.UID { | ||||
| 			logger.V(4).Info("Pod already exists in the nominator", "pod", np.UID) | ||||
| 			return | ||||
| 		} | ||||
| 	} | ||||
| 	npm.nominatedPods[nodeName] = append(npm.nominatedPods[nodeName], pi) | ||||
| 	npm.nominatedPods[nodeName] = append(npm.nominatedPods[nodeName], PodToRef(pi.Pod)) | ||||
| } | ||||
|  | ||||
| func (npm *nominator) delete(p *v1.Pod) { | ||||
| @@ -1604,7 +1676,7 @@ func (npm *nominator) delete(p *v1.Pod) { | ||||
| 		return | ||||
| 	} | ||||
| 	for i, np := range npm.nominatedPods[nnn] { | ||||
| 		if np.Pod.UID == p.UID { | ||||
| 		if np.UID == p.UID { | ||||
| 			npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn][:i], npm.nominatedPods[nnn][i+1:]...) | ||||
| 			if len(npm.nominatedPods[nnn]) == 0 { | ||||
| 				delete(npm.nominatedPods, nnn) | ||||
| @@ -1617,12 +1689,8 @@ func (npm *nominator) delete(p *v1.Pod) { | ||||
|  | ||||
| // UpdateNominatedPod updates the <oldPod> with <newPod>. | ||||
| func (npm *nominator) UpdateNominatedPod(logger klog.Logger, oldPod *v1.Pod, newPodInfo *framework.PodInfo) { | ||||
| 	npm.lock.Lock() | ||||
| 	defer npm.lock.Unlock() | ||||
| 	npm.updateNominatedPodUnlocked(logger, oldPod, newPodInfo) | ||||
| } | ||||
|  | ||||
| func (npm *nominator) updateNominatedPodUnlocked(logger klog.Logger, oldPod *v1.Pod, newPodInfo *framework.PodInfo) { | ||||
| 	npm.nLock.Lock() | ||||
| 	defer npm.nLock.Unlock() | ||||
| 	// In some cases, an Update event with no "NominatedNode" present is received right | ||||
| 	// after a node("NominatedNode") is reserved for this pod in memory. | ||||
| 	// In this case, we need to keep reserving the NominatedNode when updating the pod pointer. | ||||
| @@ -1646,18 +1714,12 @@ func (npm *nominator) updateNominatedPodUnlocked(logger klog.Logger, oldPod *v1. | ||||
| 	npm.addNominatedPodUnlocked(logger, newPodInfo, nominatingInfo) | ||||
| } | ||||
|  | ||||
| // NewPodNominator creates a nominator as a backing of framework.PodNominator. | ||||
| // A podLister is passed in so as to check if the pod exists | ||||
| // before adding its nominatedNode info. | ||||
| func NewPodNominator(podLister listersv1.PodLister) framework.PodNominator { | ||||
| 	return newPodNominator(podLister) | ||||
| } | ||||
|  | ||||
| func newPodNominator(podLister listersv1.PodLister) *nominator { | ||||
| func newPodNominator(podLister listersv1.PodLister, nominatedPodsToInfo func([]PodRef) []*framework.PodInfo) *nominator { | ||||
| 	return &nominator{ | ||||
| 		podLister:          podLister, | ||||
| 		nominatedPods:      make(map[string][]*framework.PodInfo), | ||||
| 		nominatedPodToNode: make(map[types.UID]string), | ||||
| 		podLister:           podLister, | ||||
| 		nominatedPods:       make(map[string][]PodRef), | ||||
| 		nominatedPodToNode:  make(map[types.UID]string), | ||||
| 		nominatedPodsToInfo: nominatedPodsToInfo, | ||||
| 	} | ||||
| } | ||||
|  | ||||
|   | ||||
| @@ -85,7 +85,7 @@ var ( | ||||
|  | ||||
| 	nominatorCmpOpts = []cmp.Option{ | ||||
| 		cmp.AllowUnexported(nominator{}), | ||||
| 		cmpopts.IgnoreFields(nominator{}, "podLister", "lock"), | ||||
| 		cmpopts.IgnoreFields(nominator{}, "podLister", "nLock", "nominatedPodsToInfo"), | ||||
| 	} | ||||
|  | ||||
| 	queueHintReturnQueue = func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { | ||||
| @@ -136,8 +136,8 @@ func TestPriorityQueue_Add(t *testing.T) { | ||||
| 			medPriorityPodInfo.Pod.UID:   "node1", | ||||
| 			unschedulablePodInfo.Pod.UID: "node1", | ||||
| 		}, | ||||
| 		nominatedPods: map[string][]*framework.PodInfo{ | ||||
| 			"node1": {medPriorityPodInfo, unschedulablePodInfo}, | ||||
| 		nominatedPods: map[string][]PodRef{ | ||||
| 			"node1": {PodToRef(medPriorityPodInfo.Pod), PodToRef(unschedulablePodInfo.Pod)}, | ||||
| 		}, | ||||
| 	} | ||||
| 	if diff := cmp.Diff(q.nominator, expectedNominatedPods, nominatorCmpOpts...); diff != "" { | ||||
| @@ -870,8 +870,8 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) { | ||||
| 			unschedulablePodInfo.Pod.UID:    "node1", | ||||
| 			highPriNominatedPodInfo.Pod.UID: "node1", | ||||
| 		}, | ||||
| 		nominatedPods: map[string][]*framework.PodInfo{ | ||||
| 			"node1": {highPriNominatedPodInfo, unschedulablePodInfo}, | ||||
| 		nominatedPods: map[string][]PodRef{ | ||||
| 			"node1": {PodToRef(highPriNominatedPodInfo.Pod), PodToRef(unschedulablePodInfo.Pod)}, | ||||
| 		}, | ||||
| 	} | ||||
| 	if diff := cmp.Diff(q.nominator, expectedNominatedPods, nominatorCmpOpts...); diff != "" { | ||||
| @@ -2178,10 +2178,10 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { | ||||
| 			highPriorityPodInfo.Pod.UID:  "node2", | ||||
| 			unschedulablePodInfo.Pod.UID: "node5", | ||||
| 		}, | ||||
| 		nominatedPods: map[string][]*framework.PodInfo{ | ||||
| 			"node1": {medPriorityPodInfo}, | ||||
| 			"node2": {highPriorityPodInfo}, | ||||
| 			"node5": {unschedulablePodInfo}, | ||||
| 		nominatedPods: map[string][]PodRef{ | ||||
| 			"node1": {PodToRef(medPriorityPodInfo.Pod)}, | ||||
| 			"node2": {PodToRef(highPriorityPodInfo.Pod)}, | ||||
| 			"node5": {PodToRef(unschedulablePodInfo.Pod)}, | ||||
| 		}, | ||||
| 	} | ||||
| 	if diff := cmp.Diff(q.nominator, expectedNominatedPods, nominatorCmpOpts...); diff != "" { | ||||
| @@ -2203,10 +2203,10 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { | ||||
| 			highPriorityPodInfo.Pod.UID:  "node4", | ||||
| 			unschedulablePodInfo.Pod.UID: "node5", | ||||
| 		}, | ||||
| 		nominatedPods: map[string][]*framework.PodInfo{ | ||||
| 			"node1": {medPriorityPodInfo}, | ||||
| 			"node4": {highPriorityPodInfo}, | ||||
| 			"node5": {unschedulablePodInfo}, | ||||
| 		nominatedPods: map[string][]PodRef{ | ||||
| 			"node1": {PodToRef(medPriorityPodInfo.Pod)}, | ||||
| 			"node4": {PodToRef(highPriorityPodInfo.Pod)}, | ||||
| 			"node5": {PodToRef(unschedulablePodInfo.Pod)}, | ||||
| 		}, | ||||
| 	} | ||||
| 	if diff := cmp.Diff(q.nominator, expectedNominatedPods, nominatorCmpOpts...); diff != "" { | ||||
| @@ -2236,9 +2236,9 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { | ||||
| 			medPriorityPodInfo.Pod.UID:   "node1", | ||||
| 			unschedulablePodInfo.Pod.UID: "node5", | ||||
| 		}, | ||||
| 		nominatedPods: map[string][]*framework.PodInfo{ | ||||
| 			"node1": {medPriorityPodInfo}, | ||||
| 			"node5": {unschedulablePodInfo}, | ||||
| 		nominatedPods: map[string][]PodRef{ | ||||
| 			"node1": {PodToRef(medPriorityPodInfo.Pod)}, | ||||
| 			"node5": {PodToRef(unschedulablePodInfo.Pod)}, | ||||
| 		}, | ||||
| 	} | ||||
| 	if diff := cmp.Diff(q.nominator, expectedNominatedPods, nominatorCmpOpts...); diff != "" { | ||||
|   | ||||
| @@ -22,6 +22,7 @@ import ( | ||||
| 	"k8s.io/apimachinery/pkg/runtime" | ||||
| 	"k8s.io/client-go/informers" | ||||
| 	"k8s.io/client-go/kubernetes/fake" | ||||
| 	listersv1 "k8s.io/client-go/listers/core/v1" | ||||
| 	"k8s.io/kubernetes/pkg/scheduler/framework" | ||||
| ) | ||||
|  | ||||
| @@ -53,3 +54,17 @@ func NewTestQueueWithInformerFactory( | ||||
| 	informerFactory.WaitForCacheSync(ctx.Done()) | ||||
| 	return pq | ||||
| } | ||||
|  | ||||
| // NewTestPodNominator creates a nominator as a backing of framework.PodNominator for tests. | ||||
| // A podLister is passed in so as to check if the pod exists | ||||
| // before adding its nominatedNode info. | ||||
| func NewTestPodNominator(podLister listersv1.PodLister) framework.PodNominator { | ||||
| 	nominatedPodsToInfo := func(nominatedPods []PodRef) []*framework.PodInfo { | ||||
| 		pods := make([]*framework.PodInfo, len(nominatedPods)) | ||||
| 		for i, np := range nominatedPods { | ||||
| 			pods[i] = &framework.PodInfo{Pod: np.ToPod()} | ||||
| 		} | ||||
| 		return pods | ||||
| 	} | ||||
| 	return newPodNominator(podLister, nominatedPodsToInfo) | ||||
| } | ||||
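For reference, the test call sites updated in this commit all wire the helper the same way. A sketch of that shape; the surrounding scaffolding (ctx, client, registeredPlugins, t, and the tf/frameworkruntime/internalqueue/informers imports already present in the files above) is assumed to exist.

// Sketch only: mirrors the updated call sites in the test files above.
informerFactory := informers.NewSharedInformerFactory(client, 0)
fwk, err := tf.NewFramework(
	ctx,
	registeredPlugins, "",
	frameworkruntime.WithInformerFactory(informerFactory),
	frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())),
)
if err != nil {
	t.Fatal(err)
}
_ = fwk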
|   | ||||
| @@ -2483,7 +2483,7 @@ func TestSchedulerSchedulePod(t *testing.T) { | ||||
| 				test.registerPlugins, "", | ||||
| 				frameworkruntime.WithSnapshotSharedLister(snapshot), | ||||
| 				frameworkruntime.WithInformerFactory(informerFactory), | ||||
| 				frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), | ||||
| 				frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())), | ||||
| 			) | ||||
| 			if err != nil { | ||||
| 				t.Fatal(err) | ||||
| @@ -2546,7 +2546,7 @@ func TestFindFitAllError(t *testing.T) { | ||||
| 			tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), | ||||
| 		}, | ||||
| 		"", | ||||
| 		frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)), | ||||
| 		frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(nil)), | ||||
| 	) | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| @@ -2586,7 +2586,7 @@ func TestFindFitSomeError(t *testing.T) { | ||||
| 			tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), | ||||
| 		}, | ||||
| 		"", | ||||
| 		frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)), | ||||
| 		frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(nil)), | ||||
| 	) | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| @@ -2663,7 +2663,7 @@ func TestFindFitPredicateCallCounts(t *testing.T) { | ||||
| 			fwk, err := tf.NewFramework( | ||||
| 				ctx, | ||||
| 				registerPlugins, "", | ||||
| 				frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)), | ||||
| 				frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(nil)), | ||||
| 			) | ||||
| 			if err != nil { | ||||
| 				t.Fatal(err) | ||||
| @@ -2804,7 +2804,7 @@ func TestZeroRequest(t *testing.T) { | ||||
| 				frameworkruntime.WithInformerFactory(informerFactory), | ||||
| 				frameworkruntime.WithSnapshotSharedLister(snapshot), | ||||
| 				frameworkruntime.WithClientSet(client), | ||||
| 				frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), | ||||
| 				frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())), | ||||
| 			) | ||||
| 			if err != nil { | ||||
| 				t.Fatalf("error creating framework: %+v", err) | ||||
| @@ -3207,7 +3207,7 @@ func Test_prioritizeNodes(t *testing.T) { | ||||
| 				frameworkruntime.WithInformerFactory(informerFactory), | ||||
| 				frameworkruntime.WithSnapshotSharedLister(snapshot), | ||||
| 				frameworkruntime.WithClientSet(client), | ||||
| 				frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), | ||||
| 				frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())), | ||||
| 			) | ||||
| 			if err != nil { | ||||
| 				t.Fatalf("error creating framework: %+v", err) | ||||
| @@ -3325,7 +3325,7 @@ func TestFairEvaluationForNodes(t *testing.T) { | ||||
| 			tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), | ||||
| 		}, | ||||
| 		"", | ||||
| 		frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)), | ||||
| 		frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(nil)), | ||||
| 	) | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| @@ -3407,7 +3407,7 @@ func TestPreferNominatedNodeFilterCallCounts(t *testing.T) { | ||||
| 				ctx, | ||||
| 				registerPlugins, "", | ||||
| 				frameworkruntime.WithClientSet(client), | ||||
| 				frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), | ||||
| 				frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())), | ||||
| 			) | ||||
| 			if err != nil { | ||||
| 				t.Fatal(err) | ||||
| @@ -3565,7 +3565,7 @@ func setupTestScheduler(ctx context.Context, t *testing.T, queuedPodStore *clien | ||||
| 		frameworkruntime.WithClientSet(client), | ||||
| 		frameworkruntime.WithEventRecorder(recorder), | ||||
| 		frameworkruntime.WithInformerFactory(informerFactory), | ||||
| 		frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), | ||||
| 		frameworkruntime.WithPodNominator(internalqueue.NewTestPodNominator(informerFactory.Core().V1().Pods().Lister())), | ||||
| 		frameworkruntime.WithWaitingPods(waitingPods), | ||||
| 	) | ||||
|  | ||||
|   | ||||
Maciej Skoczeń