mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 19:58:17 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			407 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			407 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
/*
 | 
						|
Copyright 2014 The Kubernetes Authors.
 | 
						|
 | 
						|
Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
you may not use this file except in compliance with the License.
 | 
						|
You may obtain a copy of the License at
 | 
						|
 | 
						|
    http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
 | 
						|
Unless required by applicable law or agreed to in writing, software
 | 
						|
distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
See the License for the specific language governing permissions and
 | 
						|
limitations under the License.
 | 
						|
*/
 | 
						|
 | 
						|
package scheduler
 | 
						|
 | 
						|
import (
 | 
						|
	"bytes"
 | 
						|
	"fmt"
 | 
						|
	"sort"
 | 
						|
	"strings"
 | 
						|
	"sync"
 | 
						|
	"sync/atomic"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/golang/glog"
 | 
						|
	"k8s.io/kubernetes/pkg/api/v1"
 | 
						|
	"k8s.io/kubernetes/pkg/util"
 | 
						|
	"k8s.io/kubernetes/pkg/util/errors"
 | 
						|
	"k8s.io/kubernetes/pkg/util/workqueue"
 | 
						|
	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
 | 
						|
	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
 | 
						|
	schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
 | 
						|
	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
 | 
						|
)
 | 
						|
 | 
						|
type FailedPredicateMap map[string][]algorithm.PredicateFailureReason
 | 
						|
 | 
						|
type FitError struct {
 | 
						|
	Pod              *v1.Pod
 | 
						|
	FailedPredicates FailedPredicateMap
 | 
						|
}
 | 
						|
 | 
						|
var ErrNoNodesAvailable = fmt.Errorf("no nodes available to schedule pods")
 | 
						|
 | 
						|
// Error returns detailed information of why the pod failed to fit on each node
 | 
						|
func (f *FitError) Error() string {
 | 
						|
	var buf bytes.Buffer
 | 
						|
	buf.WriteString(fmt.Sprintf("pod (%s) failed to fit in any node\n", f.Pod.Name))
 | 
						|
	reasons := make(map[string]int)
 | 
						|
	for _, predicates := range f.FailedPredicates {
 | 
						|
		for _, pred := range predicates {
 | 
						|
			reasons[pred.GetReason()] += 1
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	sortReasonsHistogram := func() []string {
 | 
						|
		reasonStrings := []string{}
 | 
						|
		for k, v := range reasons {
 | 
						|
			reasonStrings = append(reasonStrings, fmt.Sprintf("%v (%v)", k, v))
 | 
						|
		}
 | 
						|
		sort.Strings(reasonStrings)
 | 
						|
		return reasonStrings
 | 
						|
	}
 | 
						|
 | 
						|
	reasonMsg := fmt.Sprintf("fit failure summary on nodes : %v", strings.Join(sortReasonsHistogram(), ", "))
 | 
						|
	buf.WriteString(reasonMsg)
 | 
						|
	return buf.String()
 | 
						|
}
 | 
						|
 | 
						|
type genericScheduler struct {
 | 
						|
	cache                 schedulercache.Cache
 | 
						|
	predicates            map[string]algorithm.FitPredicate
 | 
						|
	priorityMetaProducer  algorithm.MetadataProducer
 | 
						|
	predicateMetaProducer algorithm.MetadataProducer
 | 
						|
	prioritizers          []algorithm.PriorityConfig
 | 
						|
	extenders             []algorithm.SchedulerExtender
 | 
						|
	pods                  algorithm.PodLister
 | 
						|
	lastNodeIndexLock     sync.Mutex
 | 
						|
	lastNodeIndex         uint64
 | 
						|
 | 
						|
	cachedNodeInfoMap map[string]*schedulercache.NodeInfo
 | 
						|
 | 
						|
	equivalenceCache *EquivalenceCache
 | 
						|
}
 | 
						|
 | 
						|
// Schedule tries to schedule the given pod to one of node in the node list.
 | 
						|
// If it succeeds, it will return the name of the node.
 | 
						|
// If it fails, it will return a Fiterror error with reasons.
 | 
						|
func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (string, error) {
 | 
						|
	trace := util.NewTrace(fmt.Sprintf("Scheduling %s/%s", pod.Namespace, pod.Name))
 | 
						|
	defer trace.LogIfLong(100 * time.Millisecond)
 | 
						|
 | 
						|
	nodes, err := nodeLister.List()
 | 
						|
	if err != nil {
 | 
						|
		return "", err
 | 
						|
	}
 | 
						|
	if len(nodes) == 0 {
 | 
						|
		return "", ErrNoNodesAvailable
 | 
						|
	}
 | 
						|
 | 
						|
	// Used for all fit and priority funcs.
 | 
						|
	err = g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap)
 | 
						|
	if err != nil {
 | 
						|
		return "", err
 | 
						|
	}
 | 
						|
 | 
						|
	// TODO(harryz) Check if equivalenceCache is enabled and call scheduleWithEquivalenceClass here
 | 
						|
 | 
						|
	trace.Step("Computing predicates")
 | 
						|
	filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders, g.predicateMetaProducer)
 | 
						|
	if err != nil {
 | 
						|
		return "", err
 | 
						|
	}
 | 
						|
 | 
						|
	if len(filteredNodes) == 0 {
 | 
						|
		return "", &FitError{
 | 
						|
			Pod:              pod,
 | 
						|
			FailedPredicates: failedPredicateMap,
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	trace.Step("Prioritizing")
 | 
						|
	metaPrioritiesInterface := g.priorityMetaProducer(pod, g.cachedNodeInfoMap)
 | 
						|
	priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
 | 
						|
	if err != nil {
 | 
						|
		return "", err
 | 
						|
	}
 | 
						|
 | 
						|
	trace.Step("Selecting host")
 | 
						|
	return g.selectHost(priorityList)
 | 
						|
}
 | 
						|
 | 
						|
// selectHost takes a prioritized list of nodes and then picks one
 | 
						|
// in a round-robin manner from the nodes that had the highest score.
 | 
						|
func (g *genericScheduler) selectHost(priorityList schedulerapi.HostPriorityList) (string, error) {
 | 
						|
	if len(priorityList) == 0 {
 | 
						|
		return "", fmt.Errorf("empty priorityList")
 | 
						|
	}
 | 
						|
 | 
						|
	sort.Sort(sort.Reverse(priorityList))
 | 
						|
	maxScore := priorityList[0].Score
 | 
						|
	firstAfterMaxScore := sort.Search(len(priorityList), func(i int) bool { return priorityList[i].Score < maxScore })
 | 
						|
 | 
						|
	g.lastNodeIndexLock.Lock()
 | 
						|
	ix := int(g.lastNodeIndex % uint64(firstAfterMaxScore))
 | 
						|
	g.lastNodeIndex++
 | 
						|
	g.lastNodeIndexLock.Unlock()
 | 
						|
 | 
						|
	return priorityList[ix].Host, nil
 | 
						|
}
 | 
						|
 | 
						|
// Filters the nodes to find the ones that fit based on the given predicate functions
 | 
						|
// Each node is passed through the predicate functions to determine if it is a fit
 | 
						|
func findNodesThatFit(
 | 
						|
	pod *v1.Pod,
 | 
						|
	nodeNameToInfo map[string]*schedulercache.NodeInfo,
 | 
						|
	nodes []*v1.Node,
 | 
						|
	predicateFuncs map[string]algorithm.FitPredicate,
 | 
						|
	extenders []algorithm.SchedulerExtender,
 | 
						|
	metadataProducer algorithm.MetadataProducer,
 | 
						|
) ([]*v1.Node, FailedPredicateMap, error) {
 | 
						|
	var filtered []*v1.Node
 | 
						|
	failedPredicateMap := FailedPredicateMap{}
 | 
						|
 | 
						|
	if len(predicateFuncs) == 0 {
 | 
						|
		filtered = nodes
 | 
						|
	} else {
 | 
						|
		// Create filtered list with enough space to avoid growing it
 | 
						|
		// and allow assigning.
 | 
						|
		filtered = make([]*v1.Node, len(nodes))
 | 
						|
		errs := []error{}
 | 
						|
		var predicateResultLock sync.Mutex
 | 
						|
		var filteredLen int32
 | 
						|
 | 
						|
		// We can use the same metadata producer for all nodes.
 | 
						|
		meta := metadataProducer(pod, nodeNameToInfo)
 | 
						|
		checkNode := func(i int) {
 | 
						|
			nodeName := nodes[i].Name
 | 
						|
			fits, failedPredicates, err := podFitsOnNode(pod, meta, nodeNameToInfo[nodeName], predicateFuncs)
 | 
						|
			if err != nil {
 | 
						|
				predicateResultLock.Lock()
 | 
						|
				errs = append(errs, err)
 | 
						|
				predicateResultLock.Unlock()
 | 
						|
				return
 | 
						|
			}
 | 
						|
			if fits {
 | 
						|
				filtered[atomic.AddInt32(&filteredLen, 1)-1] = nodes[i]
 | 
						|
			} else {
 | 
						|
				predicateResultLock.Lock()
 | 
						|
				failedPredicateMap[nodeName] = failedPredicates
 | 
						|
				predicateResultLock.Unlock()
 | 
						|
			}
 | 
						|
		}
 | 
						|
		workqueue.Parallelize(16, len(nodes), checkNode)
 | 
						|
		filtered = filtered[:filteredLen]
 | 
						|
		if len(errs) > 0 {
 | 
						|
			return []*v1.Node{}, FailedPredicateMap{}, errors.NewAggregate(errs)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	if len(filtered) > 0 && len(extenders) != 0 {
 | 
						|
		for _, extender := range extenders {
 | 
						|
			filteredList, failedMap, err := extender.Filter(pod, filtered)
 | 
						|
			if err != nil {
 | 
						|
				return []*v1.Node{}, FailedPredicateMap{}, err
 | 
						|
			}
 | 
						|
 | 
						|
			for failedNodeName, failedMsg := range failedMap {
 | 
						|
				if _, found := failedPredicateMap[failedNodeName]; !found {
 | 
						|
					failedPredicateMap[failedNodeName] = []algorithm.PredicateFailureReason{}
 | 
						|
				}
 | 
						|
				failedPredicateMap[failedNodeName] = append(failedPredicateMap[failedNodeName], predicates.NewFailureReason(failedMsg))
 | 
						|
			}
 | 
						|
			filtered = filteredList
 | 
						|
			if len(filtered) == 0 {
 | 
						|
				break
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return filtered, failedPredicateMap, nil
 | 
						|
}
 | 
						|
 | 
						|
// Checks whether node with a given name and NodeInfo satisfies all predicateFuncs.
 | 
						|
func podFitsOnNode(pod *v1.Pod, meta interface{}, info *schedulercache.NodeInfo, predicateFuncs map[string]algorithm.FitPredicate) (bool, []algorithm.PredicateFailureReason, error) {
 | 
						|
	var failedPredicates []algorithm.PredicateFailureReason
 | 
						|
	for _, predicate := range predicateFuncs {
 | 
						|
		fit, reasons, err := predicate(pod, meta, info)
 | 
						|
		if err != nil {
 | 
						|
			err := fmt.Errorf("SchedulerPredicates failed due to %v, which is unexpected.", err)
 | 
						|
			return false, []algorithm.PredicateFailureReason{}, err
 | 
						|
		}
 | 
						|
		if !fit {
 | 
						|
			failedPredicates = append(failedPredicates, reasons...)
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return len(failedPredicates) == 0, failedPredicates, nil
 | 
						|
}
 | 
						|
 | 
						|
// Prioritizes the nodes by running the individual priority functions in parallel.
 | 
						|
// Each priority function is expected to set a score of 0-10
 | 
						|
// 0 is the lowest priority score (least preferred node) and 10 is the highest
 | 
						|
// Each priority function can also have its own weight
 | 
						|
// The node scores returned by the priority function are multiplied by the weights to get weighted scores
 | 
						|
// All scores are finally combined (added) to get the total weighted scores of all nodes
 | 
						|
func PrioritizeNodes(
 | 
						|
	pod *v1.Pod,
 | 
						|
	nodeNameToInfo map[string]*schedulercache.NodeInfo,
 | 
						|
	meta interface{},
 | 
						|
	priorityConfigs []algorithm.PriorityConfig,
 | 
						|
	nodes []*v1.Node,
 | 
						|
	extenders []algorithm.SchedulerExtender,
 | 
						|
) (schedulerapi.HostPriorityList, error) {
 | 
						|
	// If no priority configs are provided, then the EqualPriority function is applied
 | 
						|
	// This is required to generate the priority list in the required format
 | 
						|
	if len(priorityConfigs) == 0 && len(extenders) == 0 {
 | 
						|
		result := make(schedulerapi.HostPriorityList, 0, len(nodes))
 | 
						|
		for i := range nodes {
 | 
						|
			hostPriority, err := EqualPriorityMap(pod, meta, nodeNameToInfo[nodes[i].Name])
 | 
						|
			if err != nil {
 | 
						|
				return nil, err
 | 
						|
			}
 | 
						|
			result = append(result, hostPriority)
 | 
						|
		}
 | 
						|
		return result, nil
 | 
						|
	}
 | 
						|
 | 
						|
	var (
 | 
						|
		mu   = sync.Mutex{}
 | 
						|
		wg   = sync.WaitGroup{}
 | 
						|
		errs []error
 | 
						|
	)
 | 
						|
	appendError := func(err error) {
 | 
						|
		mu.Lock()
 | 
						|
		defer mu.Unlock()
 | 
						|
		errs = append(errs, err)
 | 
						|
	}
 | 
						|
 | 
						|
	results := make([]schedulerapi.HostPriorityList, 0, len(priorityConfigs))
 | 
						|
	for range priorityConfigs {
 | 
						|
		results = append(results, nil)
 | 
						|
	}
 | 
						|
	for i, priorityConfig := range priorityConfigs {
 | 
						|
		if priorityConfig.Function != nil {
 | 
						|
			// DEPRECATED
 | 
						|
			wg.Add(1)
 | 
						|
			go func(index int, config algorithm.PriorityConfig) {
 | 
						|
				defer wg.Done()
 | 
						|
				var err error
 | 
						|
				results[index], err = config.Function(pod, nodeNameToInfo, nodes)
 | 
						|
				if err != nil {
 | 
						|
					appendError(err)
 | 
						|
				}
 | 
						|
			}(i, priorityConfig)
 | 
						|
		} else {
 | 
						|
			results[i] = make(schedulerapi.HostPriorityList, len(nodes))
 | 
						|
		}
 | 
						|
	}
 | 
						|
	processNode := func(index int) {
 | 
						|
		nodeInfo := nodeNameToInfo[nodes[index].Name]
 | 
						|
		var err error
 | 
						|
		for i := range priorityConfigs {
 | 
						|
			if priorityConfigs[i].Function != nil {
 | 
						|
				continue
 | 
						|
			}
 | 
						|
			results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
 | 
						|
			if err != nil {
 | 
						|
				appendError(err)
 | 
						|
				return
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
	workqueue.Parallelize(16, len(nodes), processNode)
 | 
						|
	for i, priorityConfig := range priorityConfigs {
 | 
						|
		if priorityConfig.Reduce == nil {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		wg.Add(1)
 | 
						|
		go func(index int, config algorithm.PriorityConfig) {
 | 
						|
			defer wg.Done()
 | 
						|
			if err := config.Reduce(pod, meta, nodeNameToInfo, results[index]); err != nil {
 | 
						|
				appendError(err)
 | 
						|
			}
 | 
						|
		}(i, priorityConfig)
 | 
						|
	}
 | 
						|
	// Wait for all computations to be finished.
 | 
						|
	wg.Wait()
 | 
						|
	if len(errs) != 0 {
 | 
						|
		return schedulerapi.HostPriorityList{}, errors.NewAggregate(errs)
 | 
						|
	}
 | 
						|
 | 
						|
	// Summarize all scores.
 | 
						|
	result := make(schedulerapi.HostPriorityList, 0, len(nodes))
 | 
						|
	// TODO: Consider parallelizing it.
 | 
						|
	for i := range nodes {
 | 
						|
		result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
 | 
						|
		for j := range priorityConfigs {
 | 
						|
			result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	if len(extenders) != 0 && nodes != nil {
 | 
						|
		combinedScores := make(map[string]int, len(nodeNameToInfo))
 | 
						|
		for _, extender := range extenders {
 | 
						|
			wg.Add(1)
 | 
						|
			go func(ext algorithm.SchedulerExtender) {
 | 
						|
				defer wg.Done()
 | 
						|
				prioritizedList, weight, err := ext.Prioritize(pod, nodes)
 | 
						|
				if err != nil {
 | 
						|
					// Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
 | 
						|
					return
 | 
						|
				}
 | 
						|
				mu.Lock()
 | 
						|
				for i := range *prioritizedList {
 | 
						|
					host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
 | 
						|
					combinedScores[host] += score * weight
 | 
						|
				}
 | 
						|
				mu.Unlock()
 | 
						|
			}(extender)
 | 
						|
		}
 | 
						|
		// wait for all go routines to finish
 | 
						|
		wg.Wait()
 | 
						|
		for i := range result {
 | 
						|
			result[i].Score += combinedScores[result[i].Host]
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	if glog.V(10) {
 | 
						|
		for i := range result {
 | 
						|
			glog.V(10).Infof("Host %s => Score %d", result[i].Host, result[i].Score)
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return result, nil
 | 
						|
}
 | 
						|
 | 
						|
// EqualPriority is a prioritizer function that gives an equal weight of one to all nodes
 | 
						|
func EqualPriorityMap(_ *v1.Pod, _ interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
 | 
						|
	node := nodeInfo.Node()
 | 
						|
	if node == nil {
 | 
						|
		return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
 | 
						|
	}
 | 
						|
	return schedulerapi.HostPriority{
 | 
						|
		Host:  node.Name,
 | 
						|
		Score: 1,
 | 
						|
	}, nil
 | 
						|
}
 | 
						|
 | 
						|
func NewGenericScheduler(
 | 
						|
	cache schedulercache.Cache,
 | 
						|
	predicates map[string]algorithm.FitPredicate,
 | 
						|
	predicateMetaProducer algorithm.MetadataProducer,
 | 
						|
	prioritizers []algorithm.PriorityConfig,
 | 
						|
	priorityMetaProducer algorithm.MetadataProducer,
 | 
						|
	extenders []algorithm.SchedulerExtender) algorithm.ScheduleAlgorithm {
 | 
						|
	return &genericScheduler{
 | 
						|
		cache:                 cache,
 | 
						|
		predicates:            predicates,
 | 
						|
		predicateMetaProducer: predicateMetaProducer,
 | 
						|
		prioritizers:          prioritizers,
 | 
						|
		priorityMetaProducer:  priorityMetaProducer,
 | 
						|
		extenders:             extenders,
 | 
						|
		cachedNodeInfoMap:     make(map[string]*schedulercache.NodeInfo),
 | 
						|
	}
 | 
						|
}
 |