Move RunNormalizeScorePlugins and ApplyScoreWeights into RunScorePlugins; Also add unit tests for RunScorePlugins.

This commit is contained in:
Cong Liu
2019-08-19 16:08:14 -04:00
parent 8f0d626228
commit e50a24d64c
6 changed files with 202 additions and 602 deletions

View File

@@ -360,6 +360,8 @@ func (f *framework) RunScorePlugins(pc *PluginContext, pod *v1.Pod, nodes []*v1.
}
ctx, cancel := context.WithCancel(context.Background())
errCh := schedutil.NewErrorChannel()
// Run Score method for each node in parallel.
workqueue.ParallelizeUntil(ctx, 16, len(nodes), func(index int) {
for _, pl := range f.scorePlugins {
nodeName := nodes[index].Name
@@ -374,31 +376,16 @@ func (f *framework) RunScorePlugins(pc *PluginContext, pod *v1.Pod, nodes []*v1.
}
}
})
if err := errCh.ReceiveError(); err != nil {
msg := fmt.Sprintf("error while running score plugin for pod %q: %v", pod.Name, err)
klog.Error(msg)
return nil, NewStatus(Error, msg)
}
return pluginToNodeScores, nil
}
// RunNormalizeScorePlugins runs the NormalizeScore function of Score plugins.
// It should be called after RunScorePlugins with the PluginToNodeScores result.
// It then modifies the list with normalized scores. It returns a non-success Status
// if any of the NormalizeScore functions returns a non-success status.
func (f *framework) RunNormalizeScorePlugins(pc *PluginContext, pod *v1.Pod, scores PluginToNodeScores) *Status {
ctx, cancel := context.WithCancel(context.Background())
errCh := schedutil.NewErrorChannel()
// Run NormalizeScore method for each ScoreWithNormalizePlugin in parallel.
workqueue.ParallelizeUntil(ctx, 16, len(f.scoreWithNormalizePlugins), func(index int) {
pl := f.scoreWithNormalizePlugins[index]
nodeScoreList, ok := scores[pl.Name()]
if !ok {
err := fmt.Errorf("normalize score plugin %q has no corresponding scores in the PluginToNodeScores", pl.Name())
errCh.SendErrorWithCancel(err, cancel)
return
}
nodeScoreList := pluginToNodeScores[pl.Name()]
status := pl.NormalizeScore(pc, pod, nodeScoreList)
if !status.IsSuccess() {
err := fmt.Errorf("normalize score plugin %q failed with error %v", pl.Name(), status.Message())
@@ -406,51 +393,36 @@ func (f *framework) RunNormalizeScorePlugins(pc *PluginContext, pod *v1.Pod, sco
return
}
})
if err := errCh.ReceiveError(); err != nil {
msg := fmt.Sprintf("error while running normalize score plugin for pod %q: %v", pod.Name, err)
klog.Error(msg)
return NewStatus(Error, msg)
return nil, NewStatus(Error, msg)
}
return nil
}
// ApplyScoreWeights applies weights to the score results. It should be called after
// RunNormalizeScorePlugins.
func (f *framework) ApplyScoreWeights(pc *PluginContext, pod *v1.Pod, scores PluginToNodeScores) *Status {
ctx, cancel := context.WithCancel(context.Background())
errCh := schedutil.NewErrorChannel()
// Apply score defaultWeights for each ScorePlugin in parallel.
workqueue.ParallelizeUntil(ctx, 16, len(f.scorePlugins), func(index int) {
pl := f.scorePlugins[index]
// Score plugins' weight has been checked when they are initialized.
weight := f.pluginNameToWeightMap[pl.Name()]
nodeScoreList, ok := scores[pl.Name()]
if !ok {
err := fmt.Errorf("score plugin %q has no corresponding scores in the PluginToNodeScores", pl.Name())
errCh.SendErrorWithCancel(err, cancel)
return
}
nodeScoreList := pluginToNodeScores[pl.Name()]
for i, nodeScore := range nodeScoreList {
// return error if score plugin returns invalid score.
if nodeScore.Score > MaxNodeScore || nodeScore.Score < MinNodeScore {
err := fmt.Errorf("score plugin %q returns an invalid score %q, it should in the range of [MinNodeScore, MaxNodeScore] after normalizing", pl.Name(), nodeScore.Score)
err := fmt.Errorf("score plugin %q returns an invalid score %v, it should in the range of [%v, %v] after normalizing", pl.Name(), nodeScore.Score, MinNodeScore, MaxNodeScore)
errCh.SendErrorWithCancel(err, cancel)
return
}
nodeScoreList[i].Score = nodeScore.Score * weight
}
})
if err := errCh.ReceiveError(); err != nil {
msg := fmt.Sprintf("error while applying score weights for pod %q: %v", pod.Name, err)
msg := fmt.Sprintf("error while applying score defaultWeights for pod %q: %v", pod.Name, err)
klog.Error(msg)
return NewStatus(Error, msg)
return nil, NewStatus(Error, msg)
}
return nil
return pluginToNodeScores, nil
}
// RunPrebindPlugins runs the set of configured prebind plugins. It returns a