refactor scheduler extender-related API

- move extender related API from pkg/scheduler/api to pkg/scheduler/apis/extender/v1

- alias pkg/scheduler/apis/extender/v1 as extenderv1

- use NodeScore and NodeScoreList in non-extender logic (rough shape sketched below)
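
For reference, the shapes implied by the diff are roughly the following. This is an illustrative sketch only; the authoritative definitions live in pkg/scheduler/framework/v1alpha1 and pkg/scheduler/apis/extender/v1 and may differ in detail:

// Illustrative sketch, not code from this commit.
package sketch

import v1 "k8s.io/api/core/v1"

// NodeScore replaces schedulerapi.HostPriority in non-extender code paths;
// the Host field becomes Name.
type NodeScore struct {
	Name  string // node name (formerly HostPriority.Host)
	Score int64  // integer score; the exact width in the real package may differ
}

// NodeScoreList replaces schedulerapi.HostPriorityList.
type NodeScoreList []NodeScore

// Victims stays an extender-facing type and moves with the rest of the
// extender API; its fields match how selectNodesForPreemption fills them in.
type Victims struct {
	Pods             []*v1.Pod
	NumPDBViolations int64
}
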
Author: Wei Huang
Date: 2019-09-27 11:23:29 -07:00
parent 2ebcd2509c
commit cbdb4e3fdb
46 changed files with 703 additions and 1390 deletions

@@ -41,6 +41,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
extenderv1 "k8s.io/kubernetes/pkg/scheduler/apis/extender/v1"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
@@ -290,23 +291,23 @@ func (g *genericScheduler) Extenders() []algorithm.SchedulerExtender {
 // selectHost takes a prioritized list of nodes and then picks one
 // in a reservoir sampling manner from the nodes that had the highest score.
-func (g *genericScheduler) selectHost(priorityList schedulerapi.HostPriorityList) (string, error) {
-	if len(priorityList) == 0 {
+func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (string, error) {
+	if len(nodeScoreList) == 0 {
 		return "", fmt.Errorf("empty priorityList")
 	}
-	maxScore := priorityList[0].Score
-	selected := priorityList[0].Host
+	maxScore := nodeScoreList[0].Score
+	selected := nodeScoreList[0].Name
 	cntOfMaxScore := 1
-	for _, hp := range priorityList[1:] {
-		if hp.Score > maxScore {
-			maxScore = hp.Score
-			selected = hp.Host
+	for _, ns := range nodeScoreList[1:] {
+		if ns.Score > maxScore {
+			maxScore = ns.Score
+			selected = ns.Name
 			cntOfMaxScore = 1
-		} else if hp.Score == maxScore {
+		} else if ns.Score == maxScore {
 			cntOfMaxScore++
 			if rand.Intn(cntOfMaxScore) == 0 {
 				// Replace the candidate with probability of 1/cntOfMaxScore
-				selected = hp.Host
+				selected = ns.Name
 			}
 		}
 	}
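
The rename does not change the selection logic: when k nodes share the top score, replacing the running pick with probability 1/i at the i-th tie leaves each of the k nodes selected with probability 1/k. A self-contained simulation of that tie-break (local stand-in types and names, not code from this commit):

// Illustrative simulation of selectHost's reservoir-sampling tie-break.
package main

import (
	"fmt"
	"math/rand"
)

// nodeScore is a local stand-in for framework.NodeScore.
type nodeScore struct {
	name  string
	score int64
}

// pick mirrors selectHost's tie-breaking loop.
func pick(list []nodeScore) string {
	selected, maxScore, ties := list[0].name, list[0].score, 1
	for _, ns := range list[1:] {
		if ns.score > maxScore {
			selected, maxScore, ties = ns.name, ns.score, 1
		} else if ns.score == maxScore {
			ties++
			if rand.Intn(ties) == 0 {
				selected = ns.name // keep every tied node at an equal chance
			}
		}
	}
	return selected
}

func main() {
	tied := []nodeScore{{"node-a", 10}, {"node-b", 10}, {"node-c", 10}, {"node-d", 3}}
	counts := map[string]int{}
	for i := 0; i < 30000; i++ {
		counts[pick(tied)]++
	}
	fmt.Println(counts) // node-a, node-b and node-c each land near 10000; node-d never wins
}
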
@@ -386,8 +387,8 @@ func (g *genericScheduler) Preempt(pluginContext *framework.PluginContext, pod *
 // processPreemptionWithExtenders processes preemption with extenders
 func (g *genericScheduler) processPreemptionWithExtenders(
 	pod *v1.Pod,
-	nodeToVictims map[*v1.Node]*schedulerapi.Victims,
-) (map[*v1.Node]*schedulerapi.Victims, error) {
+	nodeToVictims map[*v1.Node]*extenderv1.Victims,
+) (map[*v1.Node]*extenderv1.Victims, error) {
 	if len(nodeToVictims) > 0 {
 		for _, extender := range g.extenders {
 			if extender.SupportsPreemption() && extender.IsInterested(pod) {
@@ -705,12 +706,12 @@ func PrioritizeNodes(
 	priorityConfigs []priorities.PriorityConfig,
 	nodes []*v1.Node,
 	extenders []algorithm.SchedulerExtender,
-	framework framework.Framework,
-	pluginContext *framework.PluginContext) (schedulerapi.HostPriorityList, error) {
+	fwk framework.Framework,
+	pluginContext *framework.PluginContext) (framework.NodeScoreList, error) {
 	// If no priority configs are provided, then the EqualPriority function is applied
 	// This is required to generate the priority list in the required format
 	if len(priorityConfigs) == 0 && len(extenders) == 0 {
-		result := make(schedulerapi.HostPriorityList, 0, len(nodes))
+		result := make(framework.NodeScoreList, 0, len(nodes))
 		for i := range nodes {
 			hostPriority, err := EqualPriorityMap(pod, meta, nodeNameToInfo[nodes[i].Name])
 			if err != nil {
@@ -732,7 +733,7 @@ func PrioritizeNodes(
 		errs = append(errs, err)
 	}
-	results := make([]schedulerapi.HostPriorityList, len(priorityConfigs), len(priorityConfigs))
+	results := make([]framework.NodeScoreList, len(priorityConfigs), len(priorityConfigs))
 	// DEPRECATED: we can remove this when all priorityConfigs implement the
 	// Map-Reduce pattern.
@@ -748,7 +749,7 @@ func PrioritizeNodes(
 				}
 			}(i)
 		} else {
-			results[i] = make(schedulerapi.HostPriorityList, len(nodes))
+			results[i] = make(framework.NodeScoreList, len(nodes))
 		}
 	}
@@ -763,7 +764,7 @@ func PrioritizeNodes(
 			results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
 			if err != nil {
 				appendError(err)
-				results[i][index].Host = nodes[index].Name
+				results[i][index].Name = nodes[index].Name
 			}
 		}
 	})
@@ -780,7 +781,7 @@ func PrioritizeNodes(
 			}
 			if klog.V(10) {
 				for _, hostPriority := range results[index] {
-					klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), hostPriority.Host, priorityConfigs[index].Name, hostPriority.Score)
+					klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), hostPriority.Name, priorityConfigs[index].Name, hostPriority.Score)
 				}
 			}
 		}(i)
@@ -788,26 +789,26 @@ func PrioritizeNodes(
 	// Wait for all computations to be finished.
 	wg.Wait()
 	if len(errs) != 0 {
-		return schedulerapi.HostPriorityList{}, errors.NewAggregate(errs)
+		return framework.NodeScoreList{}, errors.NewAggregate(errs)
 	}
 	// Run the Score plugins.
-	scoresMap, scoreStatus := framework.RunScorePlugins(pluginContext, pod, nodes)
+	scoresMap, scoreStatus := fwk.RunScorePlugins(pluginContext, pod, nodes)
 	if !scoreStatus.IsSuccess() {
-		return schedulerapi.HostPriorityList{}, scoreStatus.AsError()
+		return framework.NodeScoreList{}, scoreStatus.AsError()
 	}
 	// Summarize all scores.
-	result := make(schedulerapi.HostPriorityList, 0, len(nodes))
+	result := make(framework.NodeScoreList, 0, len(nodes))
 	for i := range nodes {
-		result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
+		result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0})
 		for j := range priorityConfigs {
 			result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
 		}
 		for j := range scoresMap {
-			result[i].Score += int64(scoresMap[j][i].Score)
+			result[i].Score += scoresMap[j][i].Score
 		}
 	}
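
The loop above is the entire aggregation model: per node, each legacy priority's map/reduce score is multiplied by its configured weight, each score plugin's score is added as-is, and extender scores are folded into the same total further down via combinedScores. A minimal sketch of the arithmetic with made-up numbers (illustrative only):

package main

import "fmt"

func main() {
	// Hypothetical per-node inputs: two legacy priorities and one score plugin.
	mapScores := []int64{10, 5} // corresponds to results[j][i].Score
	weights := []int64{1, 2}    // corresponds to priorityConfigs[j].Weight
	pluginScores := []int64{80} // corresponds to scoresMap[p][i].Score

	var total int64
	for j := range mapScores {
		total += mapScores[j] * weights[j]
	}
	for _, s := range pluginScores {
		total += s
	}
	fmt.Println(total) // 10*1 + 5*2 + 80 = 100
}
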
@@ -839,26 +840,26 @@ func PrioritizeNodes(
 		// wait for all go routines to finish
 		wg.Wait()
 		for i := range result {
-			result[i].Score += combinedScores[result[i].Host]
+			result[i].Score += combinedScores[result[i].Name]
 		}
 	}
 	if klog.V(10) {
 		for i := range result {
-			klog.Infof("Host %s => Score %d", result[i].Host, result[i].Score)
+			klog.Infof("Host %s => Score %d", result[i].Name, result[i].Score)
 		}
 	}
 	return result, nil
 }
 // EqualPriorityMap is a prioritizer function that gives an equal weight of one to all nodes
-func EqualPriorityMap(_ *v1.Pod, _ interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (schedulerapi.HostPriority, error) {
+func EqualPriorityMap(_ *v1.Pod, _ interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
 	node := nodeInfo.Node()
 	if node == nil {
-		return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
+		return framework.NodeScore{}, fmt.Errorf("node not found")
 	}
-	return schedulerapi.HostPriority{
-		Host: node.Name,
+	return framework.NodeScore{
+		Name: node.Name,
 		Score: 1,
 	}, nil
 }
@@ -874,7 +875,7 @@ func EqualPriorityMap(_ *v1.Pod, _ interface{}, nodeInfo *schedulernodeinfo.Node
 // 6. If there are still ties, the first such node is picked (sort of randomly).
 // The 'minNodes1' and 'minNodes2' are being reused here to save the memory
 // allocation and garbage collection time.
-func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*schedulerapi.Victims) *v1.Node {
+func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*extenderv1.Victims) *v1.Node {
 	if len(nodesToVictims) == 0 {
 		return nil
 	}
@@ -1012,8 +1013,8 @@ func (g *genericScheduler) selectNodesForPreemption(
 	metadataProducer predicates.PredicateMetadataProducer,
 	queue internalqueue.SchedulingQueue,
 	pdbs []*policy.PodDisruptionBudget,
-) (map[*v1.Node]*schedulerapi.Victims, error) {
-	nodeToVictims := map[*v1.Node]*schedulerapi.Victims{}
+) (map[*v1.Node]*extenderv1.Victims, error) {
+	nodeToVictims := map[*v1.Node]*extenderv1.Victims{}
 	var resultLock sync.Mutex
 	// We can use the same metadata producer for all nodes.
@@ -1029,7 +1030,7 @@ func (g *genericScheduler) selectNodesForPreemption(
 			pluginContextClone, pod, metaCopy, nodeNameToInfo[nodeName], fitPredicates, queue, pdbs)
 		if fits {
 			resultLock.Lock()
-			victims := schedulerapi.Victims{
+			victims := extenderv1.Victims{
 				Pods:             pods,
 				NumPDBViolations: int64(numPDBViolations),
 			}
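
For orientation, the nodeToVictims map being filled above is the value that both pickOneNodeForPreemption and processPreemptionWithExtenders consume. A small illustrative fragment (not code from this commit) that walks that shape:

// Summarize the collected preemption candidates; purely illustrative.
for node, victims := range nodeToVictims {
	klog.Infof("preemption candidate %s: %d victim pod(s), %d PDB violation(s)",
		node.Name, len(victims.Pods), victims.NumPDBViolations)
}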