Move ServiceAntiAffinityPriority to score plugin

This commit is contained in:
danielqsj
2019-12-26 15:23:06 +08:00
parent 13133857af
commit f53c81f4a2
6 changed files with 110 additions and 347 deletions

View File

@@ -26,7 +26,6 @@ import (
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
@@ -80,27 +79,20 @@ func New(plArgs *runtime.Unknown, handle framework.FrameworkHandle) (framework.P
return nil, err
}
informerFactory := handle.SharedInformerFactory()
podLister := handle.SnapshotSharedLister().Pods()
serviceLister := informerFactory.Core().V1().Services().Lister()
priorityMapFunction, priorityReduceFunction := priorities.NewServiceAntiAffinityPriority(podLister, serviceLister, args.AntiAffinityLabelsPreference)
return &ServiceAffinity{
sharedLister: handle.SnapshotSharedLister(),
serviceLister: serviceLister,
priorityMapFunction: priorityMapFunction,
priorityReduceFunction: priorityReduceFunction,
args: args,
sharedLister: handle.SnapshotSharedLister(),
serviceLister: serviceLister,
args: args,
}, nil
}
// ServiceAffinity is a plugin that checks service affinity.
type ServiceAffinity struct {
args Args
sharedLister schedulerlisters.SharedLister
serviceLister corelisters.ServiceLister
priorityMapFunction priorities.PriorityMapFunction
priorityReduceFunction priorities.PriorityReduceFunction
args Args
sharedLister schedulerlisters.SharedLister
serviceLister corelisters.ServiceLister
}
var _ framework.PreFilterPlugin = &ServiceAffinity{}
@@ -293,16 +285,105 @@ func (pl *ServiceAffinity) Score(ctx context.Context, state *framework.CycleStat
if err != nil {
return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
}
meta := migration.PriorityMetadata(state)
s, err := pl.priorityMapFunction(pod, meta, nodeInfo)
return s.Score, migration.ErrorToFrameworkStatus(err)
node := nodeInfo.Node()
if node == nil {
return 0, framework.NewStatus(framework.Error, fmt.Sprintf("node not found"))
}
// Pods matched namespace,selector on current node.
var selector labels.Selector
if services, err := schedulerlisters.GetPodServices(pl.serviceLister, pod); err == nil && len(services) > 0 {
selector = labels.SelectorFromSet(services[0].Spec.Selector)
} else {
selector = labels.NewSelector()
}
if len(nodeInfo.Pods()) == 0 || selector.Empty() {
return 0, nil
}
var score int64
for _, existingPod := range nodeInfo.Pods() {
// Ignore pods being deleted for spreading purposes
// Similar to how it is done for SelectorSpreadPriority
if pod.Namespace == existingPod.Namespace && existingPod.DeletionTimestamp == nil {
if selector.Matches(labels.Set(existingPod.Labels)) {
score++
}
}
}
return score, nil
}
// NormalizeScore invoked after scoring all nodes.
func (pl *ServiceAffinity) NormalizeScore(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
// Note that priorityReduceFunction doesn't use priority metadata, hence passing nil here.
err := pl.priorityReduceFunction(pod, nil, pl.sharedLister, scores)
return migration.ErrorToFrameworkStatus(err)
reduceResult := make([]float64, len(scores))
for _, label := range pl.args.AntiAffinityLabelsPreference {
if err := pl.updateNodeScoresForLabel(pl.sharedLister, scores, reduceResult, label); err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
}
// Update the result after all labels have been evaluated.
for i, nodeScore := range reduceResult {
scores[i].Score = int64(nodeScore)
}
return nil
}
// updateNodeScoresForLabel updates the node scores for a single label. Note it does not update the
// original result from the map phase directly, but instead updates the reduceResult, which is used
// to update the original result finally. This makes sure that each call to updateNodeScoresForLabel
// receives the same mapResult to work with.
// Why are doing this? This is a workaround for the migration from priorities to score plugins.
// Historically the priority is designed to handle only one label, and multiple priorities are configured
// to work with multiple labels. Using multiple plugins is not allowed in the new framework. Therefore
// we need to modify the old priority to be able to handle multiple labels so that it can be mapped
// to a single plugin.
// TODO: This will be deprecated soon.
func (pl *ServiceAffinity) updateNodeScoresForLabel(sharedLister schedulerlisters.SharedLister, mapResult framework.NodeScoreList, reduceResult []float64, label string) error {
var numServicePods int64
var labelValue string
podCounts := map[string]int64{}
labelNodesStatus := map[string]string{}
maxPriorityFloat64 := float64(framework.MaxNodeScore)
for _, nodePriority := range mapResult {
numServicePods += nodePriority.Score
nodeInfo, err := sharedLister.NodeInfos().Get(nodePriority.Name)
if err != nil {
return err
}
if !labels.Set(nodeInfo.Node().Labels).Has(label) {
continue
}
labelValue = labels.Set(nodeInfo.Node().Labels).Get(label)
labelNodesStatus[nodePriority.Name] = labelValue
podCounts[labelValue] += nodePriority.Score
}
//score int - scale of 0-maxPriority
// 0 being the lowest priority and maxPriority being the highest
for i, nodePriority := range mapResult {
labelValue, ok := labelNodesStatus[nodePriority.Name]
if !ok {
continue
}
// initializing to the default/max node score of maxPriority
fScore := maxPriorityFloat64
if numServicePods > 0 {
fScore = maxPriorityFloat64 * (float64(numServicePods-podCounts[labelValue]) / float64(numServicePods))
}
// The score of current label only accounts for 1/len(s.labels) of the total score.
// The policy API definition only allows a single label to be configured, associated with a weight.
// This is compensated by the fact that the total weight is the sum of all weights configured
// in each policy config.
reduceResult[i] += fScore / float64(len(pl.args.AntiAffinityLabelsPreference))
}
return nil
}
// ScoreExtensions of the Score plugin.