mirror of https://github.com/optim-enterprises-bv/kubernetes.git
synced 2025-11-04 04:08:16 +00:00
	Merge pull request #55109 from bsalamat/starvation2
Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

Add a new scheduling queue based on a priority queue.

**What this PR does / why we need it**: This PR is part of the solution to fix potential starvation of pods during pod preemption. It adds a different scheduling queue to the scheduler that allows the highest priority pods to be scheduled before other pods.

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*: Fixes #

**Special notes for your reviewer**: I tried enabling this queue and made sure that our existing tests pass with this queue enabled.

**Release note**:
```release-note
Add a new scheduling queue that helps schedule the highest priority pending pod first.
```

/sig scheduling
ref/ #54501
ref/ #47604
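To illustrate the idea behind the change before the diff below, here is a minimal, self-contained sketch of a priority-ordered pending queue built on Go's `container/heap`. It is not the code added by this PR (the real implementation wraps `*v1.Pod`, adds locking, a condition variable, and an unschedulable sub-queue); the `pendingPod` type and `podQueue` helper are hypothetical names used only for illustration.

```go
package main

import (
	"container/heap"
	"fmt"
)

// pendingPod is a stand-in for *v1.Pod carrying only what this sketch needs.
type pendingPod struct {
	name     string
	priority int32
}

// podQueue orders pending pods so that the highest priority pod is popped first.
type podQueue []*pendingPod

func (q podQueue) Len() int            { return len(q) }
func (q podQueue) Less(i, j int) bool  { return q[i].priority > q[j].priority }
func (q podQueue) Swap(i, j int)       { q[i], q[j] = q[j], q[i] }
func (q *podQueue) Push(x interface{}) { *q = append(*q, x.(*pendingPod)) }
func (q *podQueue) Pop() interface{} {
	old := *q
	n := len(old)
	p := old[n-1]
	*q = old[:n-1]
	return p
}

func main() {
	q := &podQueue{}
	heap.Init(q)
	heap.Push(q, &pendingPod{name: "low", priority: 1})
	heap.Push(q, &pendingPod{name: "high", priority: 100})
	heap.Push(q, &pendingPod{name: "medium", priority: 50})

	// Pods come out highest priority first: high, medium, low.
	for q.Len() > 0 {
		fmt.Println(heap.Pop(q).(*pendingPod).name)
	}
}
```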
@@ -38,7 +38,6 @@ go_library(
    ],
    importpath = "k8s.io/kubernetes/plugin/pkg/scheduler",
    deps = [
        "//pkg/features:go_default_library",
        "//plugin/pkg/scheduler/algorithm:go_default_library",
        "//plugin/pkg/scheduler/api:go_default_library",
        "//plugin/pkg/scheduler/core:go_default_library",
@@ -50,7 +49,6 @@ go_library(
        "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
        "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
        "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
        "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
        "//vendor/k8s.io/client-go/kubernetes:go_default_library",
        "//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
        "//vendor/k8s.io/client-go/tools/record:go_default_library",
@@ -1070,7 +1070,7 @@ func (c *PodAffinityChecker) anyPodMatchesPodAffinityTerm(pod *v1.Pod, allPods [
	return false, matchingPodExists, nil
}

func getPodAffinityTerms(podAffinity *v1.PodAffinity) (terms []v1.PodAffinityTerm) {
func GetPodAffinityTerms(podAffinity *v1.PodAffinity) (terms []v1.PodAffinityTerm) {
	if podAffinity != nil {
		if len(podAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 {
			terms = podAffinity.RequiredDuringSchedulingIgnoredDuringExecution
@@ -1083,7 +1083,7 @@ func getPodAffinityTerms(podAffinity *v1.PodAffinity) (terms []v1.PodAffinityTer
	return terms
}

func getPodAntiAffinityTerms(podAntiAffinity *v1.PodAntiAffinity) (terms []v1.PodAffinityTerm) {
func GetPodAntiAffinityTerms(podAntiAffinity *v1.PodAntiAffinity) (terms []v1.PodAffinityTerm) {
	if podAntiAffinity != nil {
		if len(podAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 {
			terms = podAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution
@@ -1133,7 +1133,7 @@ func getMatchingAntiAffinityTerms(pod *v1.Pod, nodeInfoMap map[string]*scheduler
			if affinity == nil {
				continue
			}
			for _, term := range getPodAntiAffinityTerms(affinity.PodAntiAffinity) {
			for _, term := range GetPodAntiAffinityTerms(affinity.PodAntiAffinity) {
				namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(existingPod, &term)
				selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
				if err != nil {
@@ -1160,7 +1160,7 @@ func getMatchingAntiAffinityTermsOfExistingPod(newPod *v1.Pod, existingPod *v1.P
	var result []matchingPodAntiAffinityTerm
	affinity := existingPod.Spec.Affinity
	if affinity != nil && affinity.PodAntiAffinity != nil {
		for _, term := range getPodAntiAffinityTerms(affinity.PodAntiAffinity) {
		for _, term := range GetPodAntiAffinityTerms(affinity.PodAntiAffinity) {
			namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(existingPod, &term)
			selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
			if err != nil {
@@ -1257,7 +1257,7 @@ func (c *PodAffinityChecker) satisfiesPodsAffinityAntiAffinity(pod *v1.Pod, node
	}

	// Check all affinity terms.
	for _, term := range getPodAffinityTerms(affinity.PodAffinity) {
	for _, term := range GetPodAffinityTerms(affinity.PodAffinity) {
		termMatches, matchingPodExists, err := c.anyPodMatchesPodAffinityTerm(pod, filteredPods, node, &term)
		if err != nil {
			errMessage := fmt.Sprintf("Cannot schedule pod %+v onto node %v, because of PodAffinityTerm %v, err: %v", podName(pod), node.Name, term, err)
@@ -1290,7 +1290,7 @@ func (c *PodAffinityChecker) satisfiesPodsAffinityAntiAffinity(pod *v1.Pod, node
	}

	// Check all anti-affinity terms.
	for _, term := range getPodAntiAffinityTerms(affinity.PodAntiAffinity) {
	for _, term := range GetPodAntiAffinityTerms(affinity.PodAntiAffinity) {
		termMatches, _, err := c.anyPodMatchesPodAffinityTerm(pod, filteredPods, node, &term)
		if err != nil || termMatches {
			glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAntiAffinityTerm %v, err: %v",
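The hunks above export the previously private affinity-term helpers so other scheduler packages can reuse them. Later in this diff, the priority queue's getUnschedulablePodsWithMatchingAffinityTerm calls the exported helper to find unschedulable pods whose required pod-affinity terms match a newly added pod. The sketch below shows the shape of such a call; it is illustrative only, `requiredAffinityTerms` is a hypothetical helper name, and the import path follows the tree at the time of this PR.

```go
package example

import (
	"k8s.io/api/core/v1"
	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
)

// requiredAffinityTerms returns the required pod-affinity terms of a pod, or
// nil if it has none.
func requiredAffinityTerms(pod *v1.Pod) []v1.PodAffinityTerm {
	affinity := pod.Spec.Affinity
	if affinity == nil || affinity.PodAffinity == nil {
		return nil
	}
	// Exported by this PR; previously the lowercase getPodAffinityTerms was
	// private to the predicates package.
	return predicates.GetPodAffinityTerms(affinity.PodAffinity)
}
```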
@@ -12,6 +12,7 @@ go_test(
        "equivalence_cache_test.go",
        "extender_test.go",
        "generic_scheduler_test.go",
        "scheduling_queue_test.go",
    ],
    importpath = "k8s.io/kubernetes/plugin/pkg/scheduler/core",
    library = ":go_default_library",
@@ -23,6 +24,7 @@ go_test(
        "//plugin/pkg/scheduler/api:go_default_library",
        "//plugin/pkg/scheduler/schedulercache:go_default_library",
        "//plugin/pkg/scheduler/testing:go_default_library",
        "//plugin/pkg/scheduler/util:go_default_library",
        "//vendor/k8s.io/api/apps/v1beta1:go_default_library",
        "//vendor/k8s.io/api/core/v1:go_default_library",
        "//vendor/k8s.io/api/extensions/v1beta1:go_default_library",
@@ -46,12 +48,14 @@ go_library(
        "//pkg/util/hash:go_default_library",
        "//plugin/pkg/scheduler/algorithm:go_default_library",
        "//plugin/pkg/scheduler/algorithm/predicates:go_default_library",
        "//plugin/pkg/scheduler/algorithm/priorities/util:go_default_library",
        "//plugin/pkg/scheduler/api:go_default_library",
        "//plugin/pkg/scheduler/schedulercache:go_default_library",
        "//plugin/pkg/scheduler/util:go_default_library",
        "//vendor/github.com/golang/glog:go_default_library",
        "//vendor/github.com/golang/groupcache/lru:go_default_library",
        "//vendor/k8s.io/api/core/v1:go_default_library",
        "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
        "//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
        "//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library",
        "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
@@ -15,47 +15,701 @@ limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
// This file contains structures that implement scheduling queue types.
 | 
			
		||||
// Scheduling queues hold pending pods waiting to be scheduled.
 | 
			
		||||
// Scheduling queues hold pods waiting to be scheduled. This file has two types
 | 
			
		||||
// of scheduling queue: 1) a FIFO, which is mostly the same as cache.FIFO, 2) a
 | 
			
		||||
// priority queue which has two sub-queues. One sub-queue holds pods that are
 | 
			
		||||
// being considered for scheduling. This is called activeQ. Another queue holds
 | 
			
		||||
// pods that are already tried and are determined to be unschedulable. The latter
 | 
			
		||||
// is called unschedulableQ.
 | 
			
		||||
// FIFO is here for flag-gating purposes and allows us to use the traditional
 | 
			
		||||
// scheduling queue when Pod Priority flag is false.
 | 
			
		||||
 | 
			
		||||
package core
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"container/heap"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"sync"
 | 
			
		||||
 | 
			
		||||
	"k8s.io/api/core/v1"
 | 
			
		||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
			
		||||
	"k8s.io/client-go/tools/cache"
 | 
			
		||||
	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
 | 
			
		||||
	priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util"
 | 
			
		||||
	"k8s.io/kubernetes/plugin/pkg/scheduler/util"
 | 
			
		||||
 | 
			
		||||
	"github.com/golang/glog"
 | 
			
		||||
	"reflect"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// SchedulingQueue is an interface for a queue to store pods waiting to be scheduled.
 | 
			
		||||
// The interface follows a pattern similar to cache.FIFO and cache.Heap and
 | 
			
		||||
// makes it easy to use those data structures as a SchedulingQueue.
 | 
			
		||||
type SchedulingQueue interface {
 | 
			
		||||
	Add(obj interface{}) error
 | 
			
		||||
	AddIfNotPresent(obj interface{}) error
 | 
			
		||||
	Pop() (interface{}, error)
 | 
			
		||||
	Update(obj interface{}) error
 | 
			
		||||
	Delete(obj interface{}) error
 | 
			
		||||
	List() []interface{}
 | 
			
		||||
	ListKeys() []string
 | 
			
		||||
	Get(obj interface{}) (item interface{}, exists bool, err error)
 | 
			
		||||
	GetByKey(key string) (item interface{}, exists bool, err error)
 | 
			
		||||
	Add(pod *v1.Pod) error
 | 
			
		||||
	AddIfNotPresent(pod *v1.Pod) error
 | 
			
		||||
	AddUnschedulableIfNotPresent(pod *v1.Pod) error
 | 
			
		||||
	Pop() (*v1.Pod, error)
 | 
			
		||||
	Update(pod *v1.Pod) error
 | 
			
		||||
	Delete(pod *v1.Pod) error
 | 
			
		||||
	MoveAllToActiveQueue()
 | 
			
		||||
	AssignedPodAdded(pod *v1.Pod)
 | 
			
		||||
	AssignedPodUpdated(pod *v1.Pod)
 | 
			
		||||
	WaitingPodsForNode(nodeName string) []*v1.Pod
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// FIFO is only used to add a Pop() method to cache.FIFO so that it can be
 | 
			
		||||
// used as a SchedulingQueue interface.
 | 
			
		||||
// NewSchedulingQueue initializes a new scheduling queue. If pod priority is
 | 
			
		||||
// enabled a priority queue is returned. If it is disabled, a FIFO is returned.
 | 
			
		||||
func NewSchedulingQueue() SchedulingQueue {
 | 
			
		||||
	if util.PodPriorityEnabled() {
 | 
			
		||||
		return NewPriorityQueue()
 | 
			
		||||
	}
 | 
			
		||||
	return NewFIFO()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// FIFO is basically a simple wrapper around cache.FIFO to make it compatible
 | 
			
		||||
// with the SchedulingQueue interface.
 | 
			
		||||
type FIFO struct {
 | 
			
		||||
	*cache.FIFO
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var _ = SchedulingQueue(&FIFO{}) // Making sure that FIFO implements SchedulingQueue.
 | 
			
		||||
 | 
			
		||||
func (f *FIFO) Add(pod *v1.Pod) error {
 | 
			
		||||
	return f.FIFO.Add(pod)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (f *FIFO) AddIfNotPresent(pod *v1.Pod) error {
 | 
			
		||||
	return f.FIFO.AddIfNotPresent(pod)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// AddUnschedulableIfNotPresent adds an unschedulable pod back to the queue. In
 | 
			
		||||
// FIFO it is added to the end of the queue.
 | 
			
		||||
func (f *FIFO) AddUnschedulableIfNotPresent(pod *v1.Pod) error {
 | 
			
		||||
	return f.FIFO.AddIfNotPresent(pod)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (f *FIFO) Update(pod *v1.Pod) error {
 | 
			
		||||
	return f.FIFO.Update(pod)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (f *FIFO) Delete(pod *v1.Pod) error {
 | 
			
		||||
	return f.FIFO.Delete(pod)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Pop removes the head of FIFO and returns it.
 | 
			
		||||
// This is just a copy/paste of cache.Pop(queue Queue) from fifo.go that scheduler
 | 
			
		||||
// has always been using. There is a comment in that file saying that this method
 | 
			
		||||
// shouldn't be used in production code, but scheduler has always been using it.
 | 
			
		||||
// This function does minimal error checking.
 | 
			
		||||
func (f *FIFO) Pop() (interface{}, error) {
 | 
			
		||||
func (f *FIFO) Pop() (*v1.Pod, error) {
 | 
			
		||||
	var result interface{}
 | 
			
		||||
	f.FIFO.Pop(func(obj interface{}) error {
 | 
			
		||||
		result = obj
 | 
			
		||||
		return nil
 | 
			
		||||
	})
 | 
			
		||||
	return result, nil
 | 
			
		||||
	return result.(*v1.Pod), nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var _ = SchedulingQueue(&FIFO{}) // Making sure that FIFO implements SchedulingQueue.
 | 
			
		||||
// FIFO does not need to react to events, as all pods are always in the active
 | 
			
		||||
// scheduling queue anyway.
 | 
			
		||||
func (f *FIFO) AssignedPodAdded(pod *v1.Pod)   {}
 | 
			
		||||
func (f *FIFO) AssignedPodUpdated(pod *v1.Pod) {}
 | 
			
		||||
 | 
			
		||||
// MoveAllToActiveQueue does nothing in FIFO as all pods are always in the active queue.
 | 
			
		||||
func (f *FIFO) MoveAllToActiveQueue() {}
 | 
			
		||||
 | 
			
		||||
// WaitingPodsForNode returns pods that are nominated to run on the given node,
 | 
			
		||||
// but FIFO does not support it.
 | 
			
		||||
func (f *FIFO) WaitingPodsForNode(nodeName string) []*v1.Pod {
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewFIFO() *FIFO {
 | 
			
		||||
	return &FIFO{FIFO: cache.NewFIFO(cache.MetaNamespaceKeyFunc)}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// UnschedulablePods is an interface for a queue that is used to keep unschedulable
 | 
			
		||||
// pods. These pods are not actively reevaluated for scheduling. They are moved
 | 
			
		||||
// to the active scheduling queue on certain events, such as termination of a pod
 | 
			
		||||
// in the cluster, addition of nodes, etc.
 | 
			
		||||
type UnschedulablePods interface {
 | 
			
		||||
	Add(pod *v1.Pod)
 | 
			
		||||
	Delete(pod *v1.Pod)
 | 
			
		||||
	Update(pod *v1.Pod)
 | 
			
		||||
	GetPodsWaitingForNode(nodeName string) []*v1.Pod
 | 
			
		||||
	Get(pod *v1.Pod) *v1.Pod
 | 
			
		||||
	Clear()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// PriorityQueue implements a scheduling queue. It is an alternative to FIFO.
 | 
			
		||||
// The head of PriorityQueue is the highest priority pending pod. This structure
 | 
			
		||||
// has two sub-queues. One sub-queue holds pods that are being considered for
 | 
			
		||||
// scheduling. This is called activeQ and is a Heap. Another queue holds
 | 
			
		||||
// pods that are already tried and are determined to be unschedulable. The latter
 | 
			
		||||
// is called unschedulableQ.
 | 
			
		||||
// Heap is already thread safe, but we need to acquire another lock here to ensure
 | 
			
		||||
// atomicity of operations on the two data structures.
 | 
			
		||||
type PriorityQueue struct {
 | 
			
		||||
	lock sync.RWMutex
 | 
			
		||||
	cond sync.Cond
 | 
			
		||||
 | 
			
		||||
	// activeQ is a heap structure that the scheduler actively looks at to find pods to
 | 
			
		||||
	// schedule. Head of heap is the highest priority pod.
 | 
			
		||||
	activeQ *Heap
 | 
			
		||||
	// unschedulableQ holds pods that have been tried and determined unschedulable.
 | 
			
		||||
	unschedulableQ *UnschedulablePodsMap
 | 
			
		||||
	// receivedMoveRequest is set to true whenever we receive a request to move a
 | 
			
		||||
	// pod from the unschedulableQ to the activeQ, and is set to false when we pop
 | 
			
		||||
	// a pod from the activeQ. It indicates if we received a move request when a
 | 
			
		||||
	// pod was in flight (we were trying to schedule it). In such a case, we put
 | 
			
		||||
	// the pod back into the activeQ if it is determined unschedulable.
 | 
			
		||||
	receivedMoveRequest bool
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Making sure that PriorityQueue implements SchedulingQueue.
 | 
			
		||||
var _ = SchedulingQueue(&PriorityQueue{})
 | 
			
		||||
 | 
			
		||||
func NewPriorityQueue() *PriorityQueue {
 | 
			
		||||
	pq := &PriorityQueue{
 | 
			
		||||
		activeQ:        newHeap(cache.MetaNamespaceKeyFunc, util.HigherPriorityPod),
 | 
			
		||||
		unschedulableQ: newUnschedulablePodsMap(),
 | 
			
		||||
	}
 | 
			
		||||
	pq.cond.L = &pq.lock
 | 
			
		||||
	return pq
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Add adds a pod to the active queue. It should be called only when a new pod
 | 
			
		||||
// is added so there is no chance the pod is already in either queue.
 | 
			
		||||
func (p *PriorityQueue) Add(pod *v1.Pod) error {
 | 
			
		||||
	p.lock.Lock()
 | 
			
		||||
	defer p.lock.Unlock()
 | 
			
		||||
	err := p.activeQ.Add(pod)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		glog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
 | 
			
		||||
	} else {
 | 
			
		||||
		if p.unschedulableQ.Get(pod) != nil {
 | 
			
		||||
			glog.Errorf("Error: pod %v is already in the unschedulable queue.", pod.Name)
 | 
			
		||||
			p.unschedulableQ.Delete(pod)
 | 
			
		||||
		}
 | 
			
		||||
		p.cond.Broadcast()
 | 
			
		||||
	}
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// AddIfNotPresent adds a pod to the active queue if it is not present in any of
 | 
			
		||||
// the two queues. If it is present in either queue, it does nothing.
 | 
			
		||||
func (p *PriorityQueue) AddIfNotPresent(pod *v1.Pod) error {
 | 
			
		||||
	p.lock.Lock()
 | 
			
		||||
	defer p.lock.Unlock()
 | 
			
		||||
	if p.unschedulableQ.Get(pod) != nil {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	if _, exists, _ := p.activeQ.Get(pod); exists {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	err := p.activeQ.Add(pod)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		glog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
 | 
			
		||||
	} else {
 | 
			
		||||
		p.cond.Broadcast()
 | 
			
		||||
	}
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// AddUnschedulableIfNotPresent does nothing if the pod is present in either
 | 
			
		||||
// queue. Otherwise it adds the pod to the unschedulable queue if
 | 
			
		||||
// p.receivedMoveRequest is false, and to the activeQ if p.receivedMoveRequest is true.
 | 
			
		||||
func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod) error {
 | 
			
		||||
	p.lock.Lock()
 | 
			
		||||
	defer p.lock.Unlock()
 | 
			
		||||
	if p.unschedulableQ.Get(pod) != nil {
 | 
			
		||||
		return fmt.Errorf("pod is already present in unschedulableQ")
 | 
			
		||||
	}
 | 
			
		||||
	if _, exists, _ := p.activeQ.Get(pod); exists {
 | 
			
		||||
		return fmt.Errorf("pod is already present in the activeQ")
 | 
			
		||||
	}
 | 
			
		||||
	if p.receivedMoveRequest {
 | 
			
		||||
		return p.activeQ.Add(pod)
 | 
			
		||||
	}
 | 
			
		||||
	p.unschedulableQ.Add(pod)
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Pop removes the head of the active queue and returns it. It blocks if the
 | 
			
		||||
// activeQ is empty and waits until a new item is added to the queue. It also
 | 
			
		||||
// clears receivedMoveRequest to mark the beginning of a new scheduling cycle.
 | 
			
		||||
func (p *PriorityQueue) Pop() (*v1.Pod, error) {
 | 
			
		||||
	p.lock.Lock()
 | 
			
		||||
	defer p.lock.Unlock()
 | 
			
		||||
	for len(p.activeQ.data.queue) == 0 {
 | 
			
		||||
		p.cond.Wait()
 | 
			
		||||
	}
 | 
			
		||||
	obj, err := p.activeQ.Pop()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	p.receivedMoveRequest = false
 | 
			
		||||
	return obj.(*v1.Pod), err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// isPodUpdated checks if the pod is updated in a way that it may have become
 | 
			
		||||
// schedulable. It drops the status of the pod and compares it with the old version.
 | 
			
		||||
func isPodUpdated(oldPod, newPod *v1.Pod) bool {
 | 
			
		||||
	strip := func(pod *v1.Pod) *v1.Pod {
 | 
			
		||||
		p := pod.DeepCopy()
 | 
			
		||||
		p.ResourceVersion = ""
 | 
			
		||||
		p.Status = v1.PodStatus{}
 | 
			
		||||
		return p
 | 
			
		||||
	}
 | 
			
		||||
	return !reflect.DeepEqual(strip(oldPod), strip(newPod))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Update updates a pod in the active queue if present. Otherwise, it removes
 | 
			
		||||
// the item from the unschedulable queue and adds the updated one to the active
 | 
			
		||||
// queue.
 | 
			
		||||
func (p *PriorityQueue) Update(pod *v1.Pod) error {
 | 
			
		||||
	p.lock.Lock()
 | 
			
		||||
	defer p.lock.Unlock()
 | 
			
		||||
	// If the pod is already in the active queue, just update it there.
 | 
			
		||||
	if _, exists, _ := p.activeQ.Get(pod); exists {
 | 
			
		||||
		err := p.activeQ.Update(pod)
 | 
			
		||||
		if err == nil {
 | 
			
		||||
			p.cond.Broadcast()
 | 
			
		||||
		}
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	// If the pod is in the unschedulable queue, updating it may make it schedulable.
 | 
			
		||||
	if oldPod := p.unschedulableQ.Get(pod); oldPod != nil {
 | 
			
		||||
		if isPodUpdated(oldPod, pod) {
 | 
			
		||||
			p.unschedulableQ.Delete(pod)
 | 
			
		||||
			err := p.activeQ.Add(pod)
 | 
			
		||||
			if err == nil {
 | 
			
		||||
				p.cond.Broadcast()
 | 
			
		||||
			}
 | 
			
		||||
			return err
 | 
			
		||||
		} else {
 | 
			
		||||
			p.unschedulableQ.Update(pod)
 | 
			
		||||
			return nil
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	// If the pod is not in either queue, we put it in the active queue.
 | 
			
		||||
	err := p.activeQ.Add(pod)
 | 
			
		||||
	if err == nil {
 | 
			
		||||
		p.cond.Broadcast()
 | 
			
		||||
	}
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Delete deletes the item from either of the two queues. It assumes the pod is
 | 
			
		||||
// only in one queue.
 | 
			
		||||
func (p *PriorityQueue) Delete(pod *v1.Pod) error {
 | 
			
		||||
	p.lock.Lock()
 | 
			
		||||
	defer p.lock.Unlock()
 | 
			
		||||
	if _, exists, _ := p.activeQ.Get(pod); exists {
 | 
			
		||||
		return p.activeQ.Delete(pod)
 | 
			
		||||
	}
 | 
			
		||||
	p.unschedulableQ.Delete(pod)
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// AssignedPodAdded is called when a bound pod is added. Creation of this pod
 | 
			
		||||
// may make pending pods with matching affinity terms schedulable.
 | 
			
		||||
func (p *PriorityQueue) AssignedPodAdded(pod *v1.Pod) {
 | 
			
		||||
	p.movePodsToActiveQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// AssignedPodUpdated is called when a bound pod is updated. Change of labels
 | 
			
		||||
// may make pending pods with matching affinity terms schedulable.
 | 
			
		||||
func (p *PriorityQueue) AssignedPodUpdated(pod *v1.Pod) {
 | 
			
		||||
	p.movePodsToActiveQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// MoveAllToActiveQueue moves all pods from unschedulableQ to activeQ. This
 | 
			
		||||
// function adds all pods and then signals the condition variable to ensure that
 | 
			
		||||
// if Pop() is waiting for an item, it receives it after all the pods are in the
 | 
			
		||||
// queue and the head is the highest priority pod.
 | 
			
		||||
// TODO(bsalamat): We should add a back-off mechanism here so that a high priority
 | 
			
		||||
// pod which is unschedulable does not go to the head of the queue frequently. For
 | 
			
		||||
// example, in a cluster where a lot of pods are being deleted, such a high priority
 | 
			
		||||
// pod can deprive other pods from getting scheduled.
 | 
			
		||||
func (p *PriorityQueue) MoveAllToActiveQueue() {
 | 
			
		||||
	p.lock.Lock()
 | 
			
		||||
	defer p.lock.Unlock()
 | 
			
		||||
	var unschedulablePods []interface{}
 | 
			
		||||
	for _, pod := range p.unschedulableQ.pods {
 | 
			
		||||
		unschedulablePods = append(unschedulablePods, pod)
 | 
			
		||||
	}
 | 
			
		||||
	p.activeQ.BulkAdd(unschedulablePods)
 | 
			
		||||
	p.unschedulableQ.Clear()
 | 
			
		||||
	p.receivedMoveRequest = true
 | 
			
		||||
	p.cond.Broadcast()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (p *PriorityQueue) movePodsToActiveQueue(pods []*v1.Pod) {
 | 
			
		||||
	p.lock.Lock()
 | 
			
		||||
	defer p.lock.Unlock()
 | 
			
		||||
	for _, pod := range pods {
 | 
			
		||||
		p.activeQ.Add(pod)
 | 
			
		||||
		p.unschedulableQ.Delete(pod)
 | 
			
		||||
	}
 | 
			
		||||
	p.receivedMoveRequest = true
 | 
			
		||||
	p.cond.Broadcast()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// getUnschedulablePodsWithMatchingAffinityTerm returns unschedulable pods which have
 | 
			
		||||
// any affinity term that matches "pod".
 | 
			
		||||
func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*v1.Pod {
 | 
			
		||||
	p.lock.RLock()
 | 
			
		||||
	defer p.lock.RUnlock()
 | 
			
		||||
	podsToMove := []*v1.Pod{}
 | 
			
		||||
	for _, up := range p.unschedulableQ.pods {
 | 
			
		||||
		affinity := up.Spec.Affinity
 | 
			
		||||
		if affinity != nil && affinity.PodAffinity != nil {
 | 
			
		||||
			terms := predicates.GetPodAffinityTerms(affinity.PodAffinity)
 | 
			
		||||
			for _, term := range terms {
 | 
			
		||||
				namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(up, &term)
 | 
			
		||||
				selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					glog.Errorf("Error getting label selectors for pod: %v.", up.Name)
 | 
			
		||||
				}
 | 
			
		||||
				if priorityutil.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) {
 | 
			
		||||
					podsToMove = append(podsToMove, up)
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return podsToMove
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// WaitingPodsForNode returns pods that are nominated to run on the given node,
 | 
			
		||||
// but they are waiting for other pods to be removed from the node before they
 | 
			
		||||
// can be actually scheduled.
 | 
			
		||||
func (p *PriorityQueue) WaitingPodsForNode(nodeName string) []*v1.Pod {
 | 
			
		||||
	return p.unschedulableQ.GetPodsWaitingForNode(nodeName)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// UnschedulablePodsMap holds pods that cannot be scheduled. This data structure
 | 
			
		||||
// is used to implement unschedulableQ.
 | 
			
		||||
type UnschedulablePodsMap struct {
 | 
			
		||||
	// pods is a map keyed by a pod's full-name and the value is a pointer to the pod.
 | 
			
		||||
	pods map[string]*v1.Pod
 | 
			
		||||
	// nominatedPods is a map keyed by a node name and the value is a list of
 | 
			
		||||
	// pods' full-names which are nominated to run on the node.
 | 
			
		||||
	nominatedPods map[string][]string
 | 
			
		||||
	keyFunc       func(*v1.Pod) string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var _ = UnschedulablePods(&UnschedulablePodsMap{})
 | 
			
		||||
 | 
			
		||||
func NominatedNodeName(pod *v1.Pod) string {
 | 
			
		||||
	nominatedNodeName, ok := pod.Annotations[NominatedNodeAnnotationKey]
 | 
			
		||||
	if !ok {
 | 
			
		||||
		return ""
 | 
			
		||||
	}
 | 
			
		||||
	return nominatedNodeName
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Add adds a pod to the unschedulable pods.
 | 
			
		||||
func (u *UnschedulablePodsMap) Add(pod *v1.Pod) {
 | 
			
		||||
	podKey := u.keyFunc(pod)
 | 
			
		||||
	if _, exists := u.pods[podKey]; !exists {
 | 
			
		||||
		u.pods[podKey] = pod
 | 
			
		||||
		nominatedNodeName := NominatedNodeName(pod)
 | 
			
		||||
		if len(nominatedNodeName) > 0 {
 | 
			
		||||
			u.nominatedPods[nominatedNodeName] = append(u.nominatedPods[nominatedNodeName], podKey)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (u *UnschedulablePodsMap) deleteFromNominated(pod *v1.Pod) {
 | 
			
		||||
	nominatedNodeName := NominatedNodeName(pod)
 | 
			
		||||
	if len(nominatedNodeName) > 0 {
 | 
			
		||||
		podKey := u.keyFunc(pod)
 | 
			
		||||
		nps := u.nominatedPods[nominatedNodeName]
 | 
			
		||||
		for i, np := range nps {
 | 
			
		||||
			if np == podKey {
 | 
			
		||||
				u.nominatedPods[nominatedNodeName] = append(nps[:i], nps[i+1:]...)
 | 
			
		||||
				if len(u.nominatedPods[nominatedNodeName]) == 0 {
 | 
			
		||||
					delete(u.nominatedPods, nominatedNodeName)
 | 
			
		||||
				}
 | 
			
		||||
				break
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Delete deletes a pod from the unschedulable pods.
 | 
			
		||||
func (u *UnschedulablePodsMap) Delete(pod *v1.Pod) {
 | 
			
		||||
	podKey := u.keyFunc(pod)
 | 
			
		||||
	if p, exists := u.pods[podKey]; exists {
 | 
			
		||||
		u.deleteFromNominated(p)
 | 
			
		||||
		delete(u.pods, podKey)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Update updates a pod in the unschedulable pods.
 | 
			
		||||
func (u *UnschedulablePodsMap) Update(pod *v1.Pod) {
 | 
			
		||||
	podKey := u.keyFunc(pod)
 | 
			
		||||
	oldPod, exists := u.pods[podKey]
 | 
			
		||||
	if !exists {
 | 
			
		||||
		u.Add(pod)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	u.pods[podKey] = pod
 | 
			
		||||
	oldNominateNodeName := NominatedNodeName(oldPod)
 | 
			
		||||
	nominatedNodeName := NominatedNodeName(pod)
 | 
			
		||||
	if oldNominateNodeName != nominatedNodeName {
 | 
			
		||||
		u.deleteFromNominated(oldPod)
 | 
			
		||||
		if len(nominatedNodeName) > 0 {
 | 
			
		||||
			u.nominatedPods[nominatedNodeName] = append(u.nominatedPods[nominatedNodeName], podKey)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Get returns the pod if a pod with the same key as the key of the given "pod"
 | 
			
		||||
// is found in the map. It returns nil otherwise.
 | 
			
		||||
func (u *UnschedulablePodsMap) Get(pod *v1.Pod) *v1.Pod {
 | 
			
		||||
	podKey := u.keyFunc(pod)
 | 
			
		||||
	if p, exists := u.pods[podKey]; exists {
 | 
			
		||||
		return p
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetPodsWaitingForNode returns a list of unschedulable pods whose NominatedNodeNames
 | 
			
		||||
// are equal to the given nodeName.
 | 
			
		||||
func (u *UnschedulablePodsMap) GetPodsWaitingForNode(nodeName string) []*v1.Pod {
 | 
			
		||||
	var pods []*v1.Pod
 | 
			
		||||
	for _, key := range u.nominatedPods[nodeName] {
 | 
			
		||||
		pods = append(pods, u.pods[key])
 | 
			
		||||
	}
 | 
			
		||||
	return pods
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Clear removes all the entries from the unschedulable maps.
 | 
			
		||||
func (u *UnschedulablePodsMap) Clear() {
 | 
			
		||||
	u.pods = make(map[string]*v1.Pod)
 | 
			
		||||
	u.nominatedPods = make(map[string][]string)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// newUnschedulablePodsMap initializes a new object of UnschedulablePodsMap.
 | 
			
		||||
func newUnschedulablePodsMap() *UnschedulablePodsMap {
 | 
			
		||||
	return &UnschedulablePodsMap{
 | 
			
		||||
		pods:          make(map[string]*v1.Pod),
 | 
			
		||||
		nominatedPods: make(map[string][]string),
 | 
			
		||||
		keyFunc:       util.GetPodFullName,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Below is the implementation of a heap. The logic is pretty much the same
 | 
			
		||||
// as cache.heap, however, this heap does not perform synchronization. It leaves
 | 
			
		||||
// synchronization to the SchedulingQueue.
 | 
			
		||||
 | 
			
		||||
type LessFunc func(interface{}, interface{}) bool
 | 
			
		||||
type KeyFunc func(obj interface{}) (string, error)
 | 
			
		||||
 | 
			
		||||
type heapItem struct {
 | 
			
		||||
	obj   interface{} // The object which is stored in the heap.
 | 
			
		||||
	index int         // The index of the object's key in the Heap.queue.
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type itemKeyValue struct {
 | 
			
		||||
	key string
 | 
			
		||||
	obj interface{}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// heapData is an internal struct that implements the standard heap interface
 | 
			
		||||
// and keeps the data stored in the heap.
 | 
			
		||||
type heapData struct {
 | 
			
		||||
	// items is a map from key of the objects to the objects and their index.
 | 
			
		||||
	// We depend on the property that items in the map are in the queue and vice versa.
 | 
			
		||||
	items map[string]*heapItem
 | 
			
		||||
	// queue implements a heap data structure and keeps the order of elements
 | 
			
		||||
	// according to the heap invariant. The queue keeps the keys of objects stored
 | 
			
		||||
	// in "items".
 | 
			
		||||
	queue []string
 | 
			
		||||
 | 
			
		||||
	// keyFunc is used to make the key used for queued item insertion and retrieval, and
 | 
			
		||||
	// should be deterministic.
 | 
			
		||||
	keyFunc KeyFunc
 | 
			
		||||
	// lessFunc is used to compare two objects in the heap.
 | 
			
		||||
	lessFunc LessFunc
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
	_ = heap.Interface(&heapData{}) // heapData is a standard heap
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// Less compares two objects and returns true if the first one should go
 | 
			
		||||
// in front of the second one in the heap.
 | 
			
		||||
func (h *heapData) Less(i, j int) bool {
 | 
			
		||||
	if i > len(h.queue) || j > len(h.queue) {
 | 
			
		||||
		return false
 | 
			
		||||
	}
 | 
			
		||||
	itemi, ok := h.items[h.queue[i]]
 | 
			
		||||
	if !ok {
 | 
			
		||||
		return false
 | 
			
		||||
	}
 | 
			
		||||
	itemj, ok := h.items[h.queue[j]]
 | 
			
		||||
	if !ok {
 | 
			
		||||
		return false
 | 
			
		||||
	}
 | 
			
		||||
	return h.lessFunc(itemi.obj, itemj.obj)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Len returns the number of items in the Heap.
 | 
			
		||||
func (h *heapData) Len() int { return len(h.queue) }
 | 
			
		||||
 | 
			
		||||
// Swap implements swapping of two elements in the heap. This is a part of standard
 | 
			
		||||
// heap interface and should never be called directly.
 | 
			
		||||
func (h *heapData) Swap(i, j int) {
 | 
			
		||||
	h.queue[i], h.queue[j] = h.queue[j], h.queue[i]
 | 
			
		||||
	item := h.items[h.queue[i]]
 | 
			
		||||
	item.index = i
 | 
			
		||||
	item = h.items[h.queue[j]]
 | 
			
		||||
	item.index = j
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Push is supposed to be called by heap.Push only.
 | 
			
		||||
func (h *heapData) Push(kv interface{}) {
 | 
			
		||||
	keyValue := kv.(*itemKeyValue)
 | 
			
		||||
	n := len(h.queue)
 | 
			
		||||
	h.items[keyValue.key] = &heapItem{keyValue.obj, n}
 | 
			
		||||
	h.queue = append(h.queue, keyValue.key)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Pop is supposed to be called by heap.Pop only.
 | 
			
		||||
func (h *heapData) Pop() interface{} {
 | 
			
		||||
	key := h.queue[len(h.queue)-1]
 | 
			
		||||
	h.queue = h.queue[0 : len(h.queue)-1]
 | 
			
		||||
	item, ok := h.items[key]
 | 
			
		||||
	if !ok {
 | 
			
		||||
		// This is an error
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	delete(h.items, key)
 | 
			
		||||
	return item.obj
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Heap is a thread-safe producer/consumer queue that implements a heap data structure.
 | 
			
		||||
// It can be used to implement priority queues and similar data structures.
 | 
			
		||||
type Heap struct {
 | 
			
		||||
	// data stores objects and has a queue that keeps their ordering according
 | 
			
		||||
	// to the heap invariant.
 | 
			
		||||
	data *heapData
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Add inserts an item, and puts it in the queue. The item is updated if it
 | 
			
		||||
// already exists.
 | 
			
		||||
func (h *Heap) Add(obj interface{}) error {
 | 
			
		||||
	key, err := h.data.keyFunc(obj)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return cache.KeyError{Obj: obj, Err: err}
 | 
			
		||||
	}
 | 
			
		||||
	if _, exists := h.data.items[key]; exists {
 | 
			
		||||
		h.data.items[key].obj = obj
 | 
			
		||||
		heap.Fix(h.data, h.data.items[key].index)
 | 
			
		||||
	} else {
 | 
			
		||||
		heap.Push(h.data, &itemKeyValue{key, obj})
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// BulkAdd adds all the items in the list to the queue.
 | 
			
		||||
func (h *Heap) BulkAdd(list []interface{}) error {
 | 
			
		||||
	for _, obj := range list {
 | 
			
		||||
		key, err := h.data.keyFunc(obj)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return cache.KeyError{Obj: obj, Err: err}
 | 
			
		||||
		}
 | 
			
		||||
		if _, exists := h.data.items[key]; exists {
 | 
			
		||||
			h.data.items[key].obj = obj
 | 
			
		||||
			heap.Fix(h.data, h.data.items[key].index)
 | 
			
		||||
		} else {
 | 
			
		||||
			heap.Push(h.data, &itemKeyValue{key, obj})
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// AddIfNotPresent inserts an item, and puts it in the queue. If an item with
 | 
			
		||||
// the key is present in the map, no changes are made to the item.
 | 
			
		||||
func (h *Heap) AddIfNotPresent(obj interface{}) error {
 | 
			
		||||
	key, err := h.data.keyFunc(obj)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return cache.KeyError{Obj: obj, Err: err}
 | 
			
		||||
	}
 | 
			
		||||
	if _, exists := h.data.items[key]; !exists {
 | 
			
		||||
		heap.Push(h.data, &itemKeyValue{key, obj})
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Update is the same as Add in this implementation. When the item does not
 | 
			
		||||
// exist, it is added.
 | 
			
		||||
func (h *Heap) Update(obj interface{}) error {
 | 
			
		||||
	return h.Add(obj)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Delete removes an item.
 | 
			
		||||
func (h *Heap) Delete(obj interface{}) error {
 | 
			
		||||
	key, err := h.data.keyFunc(obj)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return cache.KeyError{Obj: obj, Err: err}
 | 
			
		||||
	}
 | 
			
		||||
	if item, ok := h.data.items[key]; ok {
 | 
			
		||||
		heap.Remove(h.data, item.index)
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	return fmt.Errorf("object not found")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Pop returns the head of the heap.
 | 
			
		||||
func (h *Heap) Pop() (interface{}, error) {
 | 
			
		||||
	obj := heap.Pop(h.data)
 | 
			
		||||
	if obj != nil {
 | 
			
		||||
		return obj, nil
 | 
			
		||||
	} else {
 | 
			
		||||
		return nil, fmt.Errorf("object was removed from heap data")
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Get returns the requested item, or sets exists=false.
 | 
			
		||||
func (h *Heap) Get(obj interface{}) (interface{}, bool, error) {
 | 
			
		||||
	key, err := h.data.keyFunc(obj)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, false, cache.KeyError{Obj: obj, Err: err}
 | 
			
		||||
	}
 | 
			
		||||
	return h.GetByKey(key)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetByKey returns the requested item, or sets exists=false.
 | 
			
		||||
func (h *Heap) GetByKey(key string) (interface{}, bool, error) {
 | 
			
		||||
	item, exists := h.data.items[key]
 | 
			
		||||
	if !exists {
 | 
			
		||||
		return nil, false, nil
 | 
			
		||||
	}
 | 
			
		||||
	return item.obj, true, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// List returns a list of all the items.
 | 
			
		||||
func (h *Heap) List() []interface{} {
 | 
			
		||||
	list := make([]interface{}, 0, len(h.data.items))
 | 
			
		||||
	for _, item := range h.data.items {
 | 
			
		||||
		list = append(list, item.obj)
 | 
			
		||||
	}
 | 
			
		||||
	return list
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// newHeap returns a Heap which can be used to queue up items to process.
 | 
			
		||||
func newHeap(keyFn KeyFunc, lessFn LessFunc) *Heap {
 | 
			
		||||
	return &Heap{
 | 
			
		||||
		data: &heapData{
 | 
			
		||||
			items:    map[string]*heapItem{},
 | 
			
		||||
			queue:    []string{},
 | 
			
		||||
			keyFunc:  keyFn,
 | 
			
		||||
			lessFunc: lessFn,
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
}
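The file above ends with the heap helpers backing activeQ. As a brief orientation before the tests, here is a minimal sketch of how a scheduler loop might drive the SchedulingQueue interface introduced in this file. It is illustrative only: the local interface mirrors just the subset of methods the sketch uses, and `scheduleLoop`/`scheduleOne` are hypothetical names; the real scheduler wires these calls through its informer event handlers (and calls MoveAllToActiveQueue on cluster events such as node addition).

```go
package example

import (
	"fmt"

	"k8s.io/api/core/v1"
)

// SchedulingQueue mirrors the subset of the interface used by this sketch.
type SchedulingQueue interface {
	Pop() (*v1.Pod, error)
	AddUnschedulableIfNotPresent(pod *v1.Pod) error
	MoveAllToActiveQueue()
}

// scheduleLoop pops the highest priority pending pod and tries to place it.
// If the attempt fails, the pod is parked in the unschedulable sub-queue until
// a cluster event moves it back to the active queue.
func scheduleLoop(q SchedulingQueue, scheduleOne func(*v1.Pod) error) {
	for {
		pod, err := q.Pop() // blocks until a pod is available
		if err != nil {
			fmt.Printf("error popping pod: %v\n", err)
			continue
		}
		if err := scheduleOne(pod); err != nil {
			// Could not place the pod; keep it around for a later retry.
			q.AddUnschedulableIfNotPresent(pod)
		}
	}
}
```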
plugin/pkg/scheduler/core/scheduling_queue_test.go (new file, 389 lines)
@@ -0,0 +1,389 @@
/*
 | 
			
		||||
Copyright 2017 The Kubernetes Authors.
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
package core
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"reflect"
 | 
			
		||||
	"testing"
 | 
			
		||||
 | 
			
		||||
	"k8s.io/api/core/v1"
 | 
			
		||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
			
		||||
	"k8s.io/kubernetes/plugin/pkg/scheduler/util"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var mediumPriority = (lowPriority + highPriority) / 2
 | 
			
		||||
var highPriorityPod, medPriorityPod, unschedulablePod = v1.Pod{
 | 
			
		||||
	ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
		Name:      "hpp",
 | 
			
		||||
		Namespace: "ns1",
 | 
			
		||||
	},
 | 
			
		||||
	Spec: v1.PodSpec{
 | 
			
		||||
		Priority: &highPriority,
 | 
			
		||||
	},
 | 
			
		||||
},
 | 
			
		||||
	v1.Pod{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
			Name:      "mpp",
 | 
			
		||||
			Namespace: "ns2",
 | 
			
		||||
			Annotations: map[string]string{
 | 
			
		||||
				NominatedNodeAnnotationKey: "node1", "annot2": "val2",
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		Spec: v1.PodSpec{
 | 
			
		||||
			Priority: &mediumPriority,
 | 
			
		||||
		},
 | 
			
		||||
	},
 | 
			
		||||
	v1.Pod{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
			Name:      "up",
 | 
			
		||||
			Namespace: "ns1",
 | 
			
		||||
			Annotations: map[string]string{
 | 
			
		||||
				NominatedNodeAnnotationKey: "node1", "annot2": "val2",
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		Spec: v1.PodSpec{
 | 
			
		||||
			Priority: &lowPriority,
 | 
			
		||||
		},
 | 
			
		||||
		Status: v1.PodStatus{
 | 
			
		||||
			Conditions: []v1.PodCondition{
 | 
			
		||||
				{
 | 
			
		||||
					Type:   v1.PodScheduled,
 | 
			
		||||
					Status: v1.ConditionFalse,
 | 
			
		||||
					Reason: v1.PodReasonUnschedulable,
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
func TestPriorityQueue_Add(t *testing.T) {
 | 
			
		||||
	q := NewPriorityQueue()
 | 
			
		||||
	q.Add(&medPriorityPod)
 | 
			
		||||
	q.Add(&unschedulablePod)
 | 
			
		||||
	q.Add(&highPriorityPod)
 | 
			
		||||
	if p, err := q.Pop(); err != nil || p != &highPriorityPod {
 | 
			
		||||
		t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Name)
 | 
			
		||||
	}
 | 
			
		||||
	if p, err := q.Pop(); err != nil || p != &medPriorityPod {
 | 
			
		||||
		t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Name)
 | 
			
		||||
	}
 | 
			
		||||
	if p, err := q.Pop(); err != nil || p != &unschedulablePod {
 | 
			
		||||
		t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePod.Name, p.Name)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestPriorityQueue_Pop(t *testing.T) {
 | 
			
		||||
	q := NewPriorityQueue()
 | 
			
		||||
	go func() {
 | 
			
		||||
		if p, err := q.Pop(); err != nil || p != &highPriorityPod {
 | 
			
		||||
			t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Name)
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
	q.Add(&highPriorityPod)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestPriorityQueue_Update(t *testing.T) {
 | 
			
		||||
	q := NewPriorityQueue()
 | 
			
		||||
	q.Update(&highPriorityPod)
 | 
			
		||||
	if _, exists, _ := q.activeQ.Get(&highPriorityPod); !exists {
 | 
			
		||||
		t.Errorf("Expected %v to be added to activeQ.", highPriorityPod.Name)
 | 
			
		||||
	}
 | 
			
		||||
	q.Update(&highPriorityPod)
 | 
			
		||||
	if q.activeQ.data.Len() != 1 {
 | 
			
		||||
		t.Error("Expected only one item in activeQ.")
 | 
			
		||||
	}
 | 
			
		||||
	// Updating an unschedulable pod which is not in any of the two queues, should
 | 
			
		||||
	// add the pod to activeQ.
 | 
			
		||||
	q.Update(&unschedulablePod)
 | 
			
		||||
	if _, exists, _ := q.activeQ.Get(&unschedulablePod); !exists {
 | 
			
		||||
		t.Errorf("Expected %v to be added to activeQ.", unschedulablePod.Name)
 | 
			
		||||
	}
 | 
			
		||||
	// Updating a pod that is already in unschedulableQ, should move the pod to
 | 
			
		||||
	// activeQ.
 | 
			
		||||
	q.Update(&unschedulablePod)
 | 
			
		||||
	if len(q.unschedulableQ.pods) != 0 {
 | 
			
		||||
		t.Error("Expected unschedulableQ to be empty.")
 | 
			
		||||
	}
 | 
			
		||||
	if _, exists, _ := q.activeQ.Get(&unschedulablePod); !exists {
 | 
			
		||||
		t.Errorf("Expected: %v to be added to activeQ.", unschedulablePod.Name)
 | 
			
		||||
	}
 | 
			
		||||
	if p, err := q.Pop(); err != nil || p != &highPriorityPod {
 | 
			
		||||
		t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Name)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestPriorityQueue_Delete(t *testing.T) {
 | 
			
		||||
	q := NewPriorityQueue()
 | 
			
		||||
	q.Update(&highPriorityPod)
 | 
			
		||||
	q.Add(&unschedulablePod)
 | 
			
		||||
	q.Delete(&highPriorityPod)
 | 
			
		||||
	if _, exists, _ := q.activeQ.Get(&unschedulablePod); !exists {
 | 
			
		||||
		t.Errorf("Expected %v to be in activeQ.", unschedulablePod.Name)
 | 
			
		||||
	}
 | 
			
		||||
	if _, exists, _ := q.activeQ.Get(&highPriorityPod); exists {
 | 
			
		||||
		t.Errorf("Didn't expect %v to be in activeQ.", highPriorityPod.Name)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestPriorityQueue_MoveAllToActiveQueue(t *testing.T) {
 | 
			
		||||
	q := NewPriorityQueue()
 | 
			
		||||
	q.Add(&medPriorityPod)
 | 
			
		||||
	q.unschedulableQ.Add(&unschedulablePod)
 | 
			
		||||
	q.unschedulableQ.Add(&highPriorityPod)
 | 
			
		||||
	q.MoveAllToActiveQueue()
 | 
			
		||||
	if q.activeQ.data.Len() != 3 {
 | 
			
		||||
		t.Error("Expected all items to be in activeQ.")
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// TestPriorityQueue_AssignedPodAdded tests AssignedPodAdded. It checks that
 | 
			
		||||
// when a pod with pod affinity is in unschedulableQ and another pod with a
 | 
			
		||||
// matching label is added, the unschedulable pod is moved to activeQ.
 | 
			
		||||
func TestPriorityQueue_AssignedPodAdded(t *testing.T) {
 | 
			
		||||
	affinityPod := unschedulablePod.DeepCopy()
 | 
			
		||||
	affinityPod.Name = "afp"
 | 
			
		||||
	affinityPod.Spec = v1.PodSpec{
 | 
			
		||||
		Affinity: &v1.Affinity{
 | 
			
		||||
			PodAffinity: &v1.PodAffinity{
 | 
			
		||||
				RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
 | 
			
		||||
					{
 | 
			
		||||
						LabelSelector: &metav1.LabelSelector{
 | 
			
		||||
							MatchExpressions: []metav1.LabelSelectorRequirement{
 | 
			
		||||
								{
 | 
			
		||||
									Key:      "service",
 | 
			
		||||
									Operator: metav1.LabelSelectorOpIn,
 | 
			
		||||
									Values:   []string{"securityscan", "value2"},
 | 
			
		||||
								},
 | 
			
		||||
							},
 | 
			
		||||
						},
 | 
			
		||||
						TopologyKey: "region",
 | 
			
		||||
					},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		Priority: &mediumPriority,
 | 
			
		||||
	}
 | 
			
		||||
	labelPod := v1.Pod{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
			Name:      "lbp",
 | 
			
		||||
			Namespace: affinityPod.Namespace,
 | 
			
		||||
			Labels:    map[string]string{"service": "securityscan"},
 | 
			
		||||
		},
 | 
			
		||||
		Spec: v1.PodSpec{NodeName: "machine1"},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	q := NewPriorityQueue()
 | 
			
		||||
	q.Add(&medPriorityPod)
 | 
			
		||||
	// Add a couple of pods to the unschedulableQ.
 | 
			
		||||
	q.unschedulableQ.Add(&unschedulablePod)
 | 
			
		||||
	q.unschedulableQ.Add(affinityPod)
 | 
			
		||||
	// Simulate addition of an assigned pod. The pod has matching labels for
 | 
			
		||||
	// affinityPod. So, affinityPod should go to activeQ.
 | 
			
		||||
	q.AssignedPodAdded(&labelPod)
 | 
			
		||||
	if q.unschedulableQ.Get(affinityPod) != nil {
 | 
			
		||||
		t.Error("affinityPod is still in the unschedulableQ.")
 | 
			
		||||
	}
 | 
			
		||||
	if _, exists, _ := q.activeQ.Get(affinityPod); !exists {
 | 
			
		||||
		t.Error("affinityPod is not moved to activeQ.")
 | 
			
		||||
	}
 | 
			
		||||
	// Check that the other pod is still in the unschedulableQ.
 | 
			
		||||
	if q.unschedulableQ.Get(&unschedulablePod) == nil {
 | 
			
		||||
		t.Error("unschedulablePod is not in the unschedulableQ.")
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestUnschedulablePodsMap(t *testing.T) {
 | 
			
		||||
	var pods = []*v1.Pod{
 | 
			
		||||
		{
 | 
			
		||||
			ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
				Name:      "p0",
 | 
			
		||||
				Namespace: "ns1",
 | 
			
		||||
				Annotations: map[string]string{
 | 
			
		||||
					NominatedNodeAnnotationKey: "node1", "annot2": "val2",
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
				Name:      "p1",
 | 
			
		||||
				Namespace: "ns1",
 | 
			
		||||
				Annotations: map[string]string{
 | 
			
		||||
					"annot": "val",
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
				Name:      "p2",
 | 
			
		||||
				Namespace: "ns2",
 | 
			
		||||
				Annotations: map[string]string{
 | 
			
		||||
					NominatedNodeAnnotationKey: "node3", "annot2": "val2", "annot3": "val3",
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
				Name:      "p3",
 | 
			
		||||
				Namespace: "ns4",
 | 
			
		||||
				Annotations: map[string]string{
 | 
			
		||||
					NominatedNodeAnnotationKey: "node1",
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	var updatedPods = make([]*v1.Pod, len(pods))
 | 
			
		||||
	updatedPods[0] = pods[0].DeepCopy()
 | 
			
		||||
	updatedPods[0].Annotations[NominatedNodeAnnotationKey] = "node3"
 | 
			
		||||
	updatedPods[1] = pods[1].DeepCopy()
	updatedPods[1].Annotations[NominatedNodeAnnotationKey] = "node3"
	updatedPods[3] = pods[3].DeepCopy()
	delete(updatedPods[3].Annotations, NominatedNodeAnnotationKey)

	tests := []struct {
		podsToAdd                    []*v1.Pod
		expectedMapAfterAdd          map[string]*v1.Pod
		expectedNominatedAfterAdd    map[string][]string
		podsToUpdate                 []*v1.Pod
		expectedMapAfterUpdate       map[string]*v1.Pod
		expectedNominatedAfterUpdate map[string][]string
		podsToDelete                 []*v1.Pod
		expectedMapAfterDelete       map[string]*v1.Pod
		expectedNominatedAfterDelete map[string][]string
	}{
		{
			podsToAdd: []*v1.Pod{pods[0], pods[1], pods[2], pods[3]},
			expectedMapAfterAdd: map[string]*v1.Pod{
				util.GetPodFullName(pods[0]): pods[0],
				util.GetPodFullName(pods[1]): pods[1],
				util.GetPodFullName(pods[2]): pods[2],
				util.GetPodFullName(pods[3]): pods[3],
			},
			expectedNominatedAfterAdd: map[string][]string{
				"node1": {util.GetPodFullName(pods[0]), util.GetPodFullName(pods[3])},
				"node3": {util.GetPodFullName(pods[2])},
			},
			podsToUpdate: []*v1.Pod{updatedPods[0]},
			expectedMapAfterUpdate: map[string]*v1.Pod{
				util.GetPodFullName(pods[0]): updatedPods[0],
				util.GetPodFullName(pods[1]): pods[1],
				util.GetPodFullName(pods[2]): pods[2],
				util.GetPodFullName(pods[3]): pods[3],
			},
			expectedNominatedAfterUpdate: map[string][]string{
				"node1": {util.GetPodFullName(pods[3])},
				"node3": {util.GetPodFullName(pods[2]), util.GetPodFullName(pods[0])},
			},
			podsToDelete: []*v1.Pod{pods[0], pods[1]},
			expectedMapAfterDelete: map[string]*v1.Pod{
				util.GetPodFullName(pods[2]): pods[2],
				util.GetPodFullName(pods[3]): pods[3],
			},
			expectedNominatedAfterDelete: map[string][]string{
				"node1": {util.GetPodFullName(pods[3])},
				"node3": {util.GetPodFullName(pods[2])},
			},
		},
		{
			podsToAdd: []*v1.Pod{pods[0], pods[3]},
			expectedMapAfterAdd: map[string]*v1.Pod{
				util.GetPodFullName(pods[0]): pods[0],
				util.GetPodFullName(pods[3]): pods[3],
			},
			expectedNominatedAfterAdd: map[string][]string{
				"node1": {util.GetPodFullName(pods[0]), util.GetPodFullName(pods[3])},
			},
			podsToUpdate: []*v1.Pod{updatedPods[3]},
			expectedMapAfterUpdate: map[string]*v1.Pod{
				util.GetPodFullName(pods[0]): pods[0],
				util.GetPodFullName(pods[3]): updatedPods[3],
			},
			expectedNominatedAfterUpdate: map[string][]string{
				"node1": {util.GetPodFullName(pods[0])},
			},
			podsToDelete:                 []*v1.Pod{pods[0], pods[3]},
			expectedMapAfterDelete:       map[string]*v1.Pod{},
			expectedNominatedAfterDelete: map[string][]string{},
		},
		{
			podsToAdd: []*v1.Pod{pods[1], pods[2]},
			expectedMapAfterAdd: map[string]*v1.Pod{
				util.GetPodFullName(pods[1]): pods[1],
				util.GetPodFullName(pods[2]): pods[2],
			},
			expectedNominatedAfterAdd: map[string][]string{
				"node3": {util.GetPodFullName(pods[2])},
			},
			podsToUpdate: []*v1.Pod{updatedPods[1]},
			expectedMapAfterUpdate: map[string]*v1.Pod{
				util.GetPodFullName(pods[1]): updatedPods[1],
				util.GetPodFullName(pods[2]): pods[2],
			},
			expectedNominatedAfterUpdate: map[string][]string{
				"node3": {util.GetPodFullName(pods[2]), util.GetPodFullName(updatedPods[1])},
			},
			podsToDelete: []*v1.Pod{pods[2], pods[3]},
			expectedMapAfterDelete: map[string]*v1.Pod{
				util.GetPodFullName(pods[1]): updatedPods[1],
			},
			expectedNominatedAfterDelete: map[string][]string{
				"node3": {util.GetPodFullName(updatedPods[1])},
			},
		},
	}

	for i, test := range tests {
		upm := newUnschedulablePodsMap()
		for _, p := range test.podsToAdd {
			upm.Add(p)
		}
		if !reflect.DeepEqual(upm.pods, test.expectedMapAfterAdd) {
			t.Errorf("#%d: Unexpected map after adding pods. Expected: %v, got: %v",
				i, test.expectedMapAfterAdd, upm.pods)
		}
		if !reflect.DeepEqual(upm.nominatedPods, test.expectedNominatedAfterAdd) {
			t.Errorf("#%d: Unexpected nominated map after adding pods. Expected: %v, got: %v",
				i, test.expectedNominatedAfterAdd, upm.nominatedPods)
		}
		if len(test.podsToUpdate) > 0 {
			for _, p := range test.podsToUpdate {
				upm.Update(p)
			}
			if !reflect.DeepEqual(upm.pods, test.expectedMapAfterUpdate) {
				t.Errorf("#%d: Unexpected map after updating pods. Expected: %v, got: %v",
					i, test.expectedMapAfterUpdate, upm.pods)
			}
			if !reflect.DeepEqual(upm.nominatedPods, test.expectedNominatedAfterUpdate) {
				t.Errorf("#%d: Unexpected nominated map after updating pods. Expected: %v, got: %v",
					i, test.expectedNominatedAfterUpdate, upm.nominatedPods)
			}
		}
		for _, p := range test.podsToDelete {
			upm.Delete(p)
		}
		if !reflect.DeepEqual(upm.pods, test.expectedMapAfterDelete) {
			t.Errorf("#%d: Unexpected map after deleting pods. Expected: %v, got: %v",
				i, test.expectedMapAfterDelete, upm.pods)
		}
		if !reflect.DeepEqual(upm.nominatedPods, test.expectedNominatedAfterDelete) {
			t.Errorf("#%d: Unexpected nominated map after deleting pods. Expected: %v, got: %v",
				i, test.expectedNominatedAfterDelete, upm.nominatedPods)
		}
		upm.Clear()
		if len(upm.pods) != 0 {
			t.Errorf("Expected the map to be empty, but has %v elements.", len(upm.pods))
		}
	}
}
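The test above only shows how the new unschedulable-pods bookkeeping is exercised, not how it is built. Below is a minimal sketch of a structure that would satisfy this test, inferred purely from the fields and methods the test touches (`pods`, `nominatedPods`, `Add`/`Update`/`Delete`/`Clear`); it assumes it lives in the same package as the queue (`core`), and `NominatedNodeAnnotationKey` is referenced as a constant defined elsewhere in the PR. The PR's real implementation may differ in locking and helper structure.

```go
package core

import (
	"k8s.io/api/core/v1"
	"k8s.io/kubernetes/plugin/pkg/scheduler/util"
)

// unschedulablePodsMapSketch mirrors the shape the test exercises: a map of
// unschedulable pods keyed by full pod name, plus an index of nominated pods
// keyed by node name. Illustrative only.
type unschedulablePodsMapSketch struct {
	pods          map[string]*v1.Pod
	nominatedPods map[string][]string
}

func newUnschedulablePodsMapSketch() *unschedulablePodsMapSketch {
	return &unschedulablePodsMapSketch{
		pods:          make(map[string]*v1.Pod),
		nominatedPods: make(map[string][]string),
	}
}

// Add stores the pod and records its nomination, if any.
func (u *unschedulablePodsMapSketch) Add(pod *v1.Pod) {
	name := util.GetPodFullName(pod)
	if _, exists := u.pods[name]; !exists {
		u.pods[name] = pod
		u.addNomination(pod, name)
	}
}

// Update replaces the stored pod and re-indexes its nomination, which may
// have moved to another node or been removed entirely.
func (u *unschedulablePodsMapSketch) Update(pod *v1.Pod) {
	name := util.GetPodFullName(pod)
	u.deleteNomination(name)
	u.pods[name] = pod
	u.addNomination(pod, name)
}

// Delete removes the pod and its nomination entry.
func (u *unschedulablePodsMapSketch) Delete(pod *v1.Pod) {
	name := util.GetPodFullName(pod)
	u.deleteNomination(name)
	delete(u.pods, name)
}

// Clear drops all entries.
func (u *unschedulablePodsMapSketch) Clear() {
	u.pods = make(map[string]*v1.Pod)
	u.nominatedPods = make(map[string][]string)
}

func (u *unschedulablePodsMapSketch) addNomination(pod *v1.Pod, name string) {
	// NominatedNodeAnnotationKey is assumed to be defined in this package by
	// the PR; the test relies on it to mark the node a preemptor is nominated for.
	if node, ok := pod.Annotations[NominatedNodeAnnotationKey]; ok {
		u.nominatedPods[node] = append(u.nominatedPods[node], name)
	}
}

func (u *unschedulablePodsMapSketch) deleteNomination(name string) {
	for node, names := range u.nominatedPods {
		for i, n := range names {
			if n == name {
				u.nominatedPods[node] = append(names[:i], names[i+1:]...)
				break
			}
		}
		// Drop empty node keys so the map compares equal to the test's
		// expected maps, which omit nodes with no nominated pods.
		if len(u.nominatedPods[node]) == 0 {
			delete(u.nominatedPods, node)
		}
	}
}
```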
@@ -142,11 +142,10 @@ func NewConfigFactory(
	stopEverything := make(chan struct{})
	schedulerCache := schedulercache.New(30*time.Second, stopEverything)

	schedulingQueue := &core.FIFO{FIFO: cache.NewFIFO(cache.MetaNamespaceKeyFunc)}
	c := &configFactory{
		client:                         client,
		podLister:                      schedulerCache,
		podQueue:                       schedulingQueue,
		podQueue:                       core.NewSchedulingQueue(),
		pVLister:                       pvInformer.Lister(),
		pVCLister:                      pvcInformer.Lister(),
		serviceLister:                  serviceInformer.Lister(),
@@ -195,20 +194,21 @@ func NewConfigFactory(
			},
			Handler: cache.ResourceEventHandlerFuncs{
				AddFunc: func(obj interface{}) {
					if err := c.podQueue.Add(obj); err != nil {
					if err := c.podQueue.Add(obj.(*v1.Pod)); err != nil {
						runtime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err))
					}
				},
				UpdateFunc: func(oldObj, newObj interface{}) {
					if c.skipPodUpdate(newObj.(*v1.Pod)) {
					pod := newObj.(*v1.Pod)
					if c.skipPodUpdate(pod) {
						return
					}
					if err := c.podQueue.Update(newObj); err != nil {
					if err := c.podQueue.Update(pod); err != nil {
						runtime.HandleError(fmt.Errorf("unable to update %T: %v", newObj, err))
					}
				},
				DeleteFunc: func(obj interface{}) {
					if err := c.podQueue.Delete(obj); err != nil {
					if err := c.podQueue.Delete(obj.(*v1.Pod)); err != nil {
						runtime.HandleError(fmt.Errorf("unable to dequeue %T: %v", obj, err))
					}
				},
@@ -377,6 +377,7 @@ func (c *configFactory) onPvcAdd(obj interface{}) {
		}
		c.invalidatePredicatesForPvc(pvc)
	}
	c.podQueue.MoveAllToActiveQueue()
}

func (c *configFactory) onPvcDelete(obj interface{}) {
@@ -398,6 +399,7 @@ func (c *configFactory) onPvcDelete(obj interface{}) {
		}
		c.invalidatePredicatesForPvc(pvc)
	}
	c.podQueue.MoveAllToActiveQueue()
}

func (c *configFactory) invalidatePredicatesForPvc(pvc *v1.PersistentVolumeClaim) {
@@ -410,6 +412,7 @@ func (c *configFactory) onServiceAdd(obj interface{}) {
	if c.enableEquivalenceClassCache {
		c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet)
	}
	c.podQueue.MoveAllToActiveQueue()
}

func (c *configFactory) onServiceUpdate(oldObj interface{}, newObj interface{}) {
@@ -421,12 +424,14 @@ func (c *configFactory) onServiceUpdate(oldObj interface{}, newObj interface{})
			c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet)
		}
	}
	c.podQueue.MoveAllToActiveQueue()
}

func (c *configFactory) onServiceDelete(obj interface{}) {
	if c.enableEquivalenceClassCache {
		c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet)
	}
	c.podQueue.MoveAllToActiveQueue()
}

// GetNodeStore provides the cache to the nodes, mostly internal use, but may also be called by mock-tests.
@@ -462,6 +467,8 @@ func (c *configFactory) addPodToCache(obj interface{}) {
	if err := c.schedulerCache.AddPod(pod); err != nil {
		glog.Errorf("scheduler cache AddPod failed: %v", err)
	}

	c.podQueue.AssignedPodAdded(pod)
	// NOTE: Updating equivalence cache of addPodToCache has been
	// handled optimistically in InvalidateCachedPredicateItemForPodAdd.
}
@@ -483,6 +490,7 @@ func (c *configFactory) updatePodInCache(oldObj, newObj interface{}) {
	}

	c.invalidateCachedPredicatesOnUpdatePod(newPod, oldPod)
	c.podQueue.AssignedPodUpdated(newPod)
}

func (c *configFactory) invalidateCachedPredicatesOnUpdatePod(newPod *v1.Pod, oldPod *v1.Pod) {
@@ -527,6 +535,7 @@ func (c *configFactory) deletePodFromCache(obj interface{}) {
	}

	c.invalidateCachedPredicatesOnDeletePod(pod)
	c.podQueue.MoveAllToActiveQueue()
}

func (c *configFactory) invalidateCachedPredicatesOnDeletePod(pod *v1.Pod) {
@@ -561,6 +570,7 @@ func (c *configFactory) addNodeToCache(obj interface{}) {
		glog.Errorf("scheduler cache AddNode failed: %v", err)
	}

	c.podQueue.MoveAllToActiveQueue()
	// NOTE: add a new node does not affect existing predicates in equivalence cache
}

@@ -581,6 +591,7 @@ func (c *configFactory) updateNodeInCache(oldObj, newObj interface{}) {
	}

	c.invalidateCachedPredicatesOnNodeUpdate(newNode, oldNode)
	c.podQueue.MoveAllToActiveQueue()
}

func (c *configFactory) invalidateCachedPredicatesOnNodeUpdate(newNode *v1.Node, oldNode *v1.Node) {
@@ -902,8 +913,7 @@ func (f *configFactory) getPluginArgs() (*PluginFactoryArgs, error) {
}

func (f *configFactory) getNextPod() *v1.Pod {
	if obj, err := f.podQueue.Pop(); err == nil {
		pod := obj.(*v1.Pod)
	if pod, err := f.podQueue.Pop(); err == nil {
		glog.V(4).Infof("About to try and schedule pod %v", pod.Name)
		return pod
	} else {

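With this change the config factory no longer builds a bare `cache.FIFO`; it asks `core.NewSchedulingQueue()` for a queue and programs against it. A rough sketch of the contract implied by the calls visible in this diff follows; the interface name and exact method set are inferred from those call sites, so the real definition in the PR may carry more methods.

```go
package core

import "k8s.io/api/core/v1"

// SchedulingQueue is the contract the config factory appears to rely on in
// this diff (inferred; illustrative only).
type SchedulingQueue interface {
	Add(pod *v1.Pod) error
	Update(pod *v1.Pod) error
	Delete(pod *v1.Pod) error
	// Pop blocks until a pending pod is available; the priority-queue
	// implementation is meant to return the highest priority pending pod first.
	Pop() (*v1.Pod, error)
	// MoveAllToActiveQueue is invoked on cluster events (node, PV, PVC and
	// service changes, pod deletions) that may make previously unschedulable
	// pods schedulable again.
	MoveAllToActiveQueue()
	// AssignedPodAdded and AssignedPodUpdated are invoked when an already
	// bound pod changes, which can unblock pods with (anti-)affinity to it.
	AssignedPodAdded(pod *v1.Pod)
	AssignedPodUpdated(pod *v1.Pod)
}
```

Presumably `core.NewSchedulingQueue()` chooses between the existing FIFO behaviour and the new priority queue based on `util.PodPriorityEnabled()`, which this PR adds below; the exact selection logic is not visible in this excerpt.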
@@ -23,11 +23,9 @@ import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	clientset "k8s.io/client-go/kubernetes"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/record"
	"k8s.io/kubernetes/pkg/features"
	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
	schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
	"k8s.io/kubernetes/plugin/pkg/scheduler/core"
@@ -196,7 +194,7 @@ func (sched *Scheduler) schedule(pod *v1.Pod) (string, error) {
// If it succeeds, it adds the name of the node where preemption has happened to the pod annotations.
// It returns the node name and an error if any.
func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, error) {
	if !utilfeature.DefaultFeatureGate.Enabled(features.PodPriority) {
	if !util.PodPriorityEnabled() {
		glog.V(3).Infof("Pod priority feature is not enabled. No preemption is performed.")
		return "", nil
	}

@@ -37,11 +37,13 @@ go_library(
        "//pkg/apis/core:go_default_library",
        "//pkg/apis/core/install:go_default_library",
        "//pkg/apis/scheduling:go_default_library",
        "//pkg/features:go_default_library",
        "//vendor/github.com/golang/glog:go_default_library",
        "//vendor/k8s.io/api/core/v1:go_default_library",
        "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
        "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
        "//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
        "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
    ],
)

@@ -21,7 +21,9 @@ import (
	"sort"

	"k8s.io/api/core/v1"
	"k8s.io/apiserver/pkg/util/feature"
	"k8s.io/kubernetes/pkg/apis/scheduling"
	"k8s.io/kubernetes/pkg/features"
)

const DefaultBindAllHostIP = "0.0.0.0"
@@ -59,6 +61,11 @@ func GetUsedPorts(pods ...*v1.Pod) map[string]bool {
	return ports
}

// PodPriorityEnabled indicates whether pod priority feature is enabled.
func PodPriorityEnabled() bool {
	return feature.DefaultFeatureGate.Enabled(features.PodPriority)
}

// GetPodFullName returns a name that uniquely identifies a pod.
func GetPodFullName(pod *v1.Pod) string {
	// Use underscore as the delimiter because it is not allowed in pod name

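`PodPriorityEnabled` is a thin wrapper over the `PodPriority` feature gate, so priority-aware queueing and preemption stay off unless the gate is switched on. A minimal usage sketch follows; the `Set` call and the import paths are assumptions about the feature-gate utility of this era, not code from this PR.

```go
package main

import (
	"fmt"

	utilfeature "k8s.io/apiserver/pkg/util/feature"
	"k8s.io/kubernetes/plugin/pkg/scheduler/util"
)

func main() {
	// Flipping the gate in-process mirrors starting the scheduler with
	// --feature-gates=PodPriority=true (assumed flag form for this release).
	if err := utilfeature.DefaultFeatureGate.Set("PodPriority=true"); err != nil {
		fmt.Println("failed to set feature gate:", err)
		return
	}

	if util.PodPriorityEnabled() {
		fmt.Println("priority-aware queueing and preemption paths are active")
	}
}
```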