Merge pull request #41119 from sarat-k/master
Automatic merge from submit-queue (batch tested with PRs 41701, 41818, 41897, 41119, 41562)

Optimization of on-wire information sent to scheduler extender interfaces that are stateful

These changes address the issue described in https://github.com/kubernetes/kubernetes/issues/40991

```release-note
Added support to minimize sending verbose node information to scheduler extender by sending only node names and expecting extenders to cache the rest of the node information
```

cc @ravigadde
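For context, the wire contract this change relies on looks roughly as follows. This is a minimal sketch with the field set inferred from the diff below; json tags are omitted, and the `k8s.io/kubernetes/pkg/api/v1` import path reflects the 1.6-era package layout.

```go
// Sketch of the extender request/response shapes exercised by this change.
// Field set inferred from the diff below; json tags omitted.
package sketch

import "k8s.io/kubernetes/pkg/api/v1"

// FailedNodesMap maps a node name to the reason the extender rejected it.
type FailedNodesMap map[string]string

// ExtenderArgs is what the scheduler POSTs to an extender's filter and
// prioritize verbs. For a nodeCacheCapable extender only NodeNames is
// populated, so verbose v1.Node objects never cross the wire.
type ExtenderArgs struct {
	Pod       v1.Pod       // pod being scheduled
	Nodes     *v1.NodeList // full node objects; nil when only names are sent
	NodeNames *[]string    // node names only; nil when full objects are sent
}

// ExtenderFilterResult mirrors the Nodes/NodeNames split on the way back:
// a cache-capable extender answers with names, everyone else with objects.
type ExtenderFilterResult struct {
	Nodes       *v1.NodeList
	NodeNames   *[]string
	FailedNodes FailedNodesMap
	Error       string
}
```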
```diff
@@ -62,9 +62,10 @@ type priorityConfig struct {
 }
 
 type Extender struct {
-	name         string
-	predicates   []fitPredicate
-	prioritizers []priorityConfig
+	name             string
+	predicates       []fitPredicate
+	prioritizers     []priorityConfig
+	nodeCacheCapable bool
 }
 
 func (e *Extender) serveHTTP(t *testing.T, w http.ResponseWriter, req *http.Request) {
@@ -82,12 +83,9 @@ func (e *Extender) serveHTTP(t *testing.T, w http.ResponseWriter, req *http.Requ
 
 	if strings.Contains(req.URL.Path, filter) {
 		resp := &schedulerapi.ExtenderFilterResult{}
-		nodes, failedNodes, err := e.Filter(&args.Pod, &args.Nodes)
+		resp, err := e.Filter(&args)
 		if err != nil {
 			resp.Error = err.Error()
-		} else {
-			resp.Nodes = *nodes
-			resp.FailedNodes = failedNodes
 		}
 
 		if err := encoder.Encode(resp); err != nil {
@@ -96,7 +94,7 @@ func (e *Extender) serveHTTP(t *testing.T, w http.ResponseWriter, req *http.Requ
 	} else if strings.Contains(req.URL.Path, prioritize) {
 		// Prioritize errors are ignored. Default k8s priorities or another extender's
 		// priorities may be applied.
-		priorities, _ := e.Prioritize(&args.Pod, &args.Nodes)
+		priorities, _ := e.Prioritize(&args)
 
 		if err := encoder.Encode(priorities); err != nil {
 			t.Fatalf("Failed to encode %+v", priorities)
@@ -106,15 +104,21 @@ func (e *Extender) serveHTTP(t *testing.T, w http.ResponseWriter, req *http.Requ
 	}
 }
 
-func (e *Extender) Filter(pod *v1.Pod, nodes *v1.NodeList) (*v1.NodeList, schedulerapi.FailedNodesMap, error) {
-	filtered := []v1.Node{}
+func (e *Extender) filterUsingNodeCache(args *schedulerapi.ExtenderArgs) (*schedulerapi.ExtenderFilterResult, error) {
+	nodeSlice := make([]string, 0)
 	failedNodesMap := schedulerapi.FailedNodesMap{}
-	for _, node := range nodes.Items {
+	for _, nodeName := range *args.NodeNames {
 		fits := true
 		for _, predicate := range e.predicates {
-			fit, err := predicate(pod, &node)
+			fit, err := predicate(&args.Pod,
+				&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}})
 			if err != nil {
-				return &v1.NodeList{}, schedulerapi.FailedNodesMap{}, err
+				return &schedulerapi.ExtenderFilterResult{
+					Nodes:       nil,
+					NodeNames:   nil,
+					FailedNodes: schedulerapi.FailedNodesMap{},
+					Error:       err.Error(),
+				}, err
 			}
 			if !fit {
 				fits = false
@@ -122,24 +126,78 @@ func (e *Extender) Filter(pod *v1.Pod, nodes *v1.NodeList) (*v1.NodeList, schedu
 			}
 		}
 		if fits {
-			filtered = append(filtered, node)
+			nodeSlice = append(nodeSlice, nodeName)
 		} else {
-			failedNodesMap[node.Name] = fmt.Sprintf("extender failed: %s", e.name)
+			failedNodesMap[nodeName] = fmt.Sprintf("extender failed: %s", e.name)
 		}
 	}
-	return &v1.NodeList{Items: filtered}, failedNodesMap, nil
+
+	return &schedulerapi.ExtenderFilterResult{
+		Nodes:       nil,
+		NodeNames:   &nodeSlice,
+		FailedNodes: failedNodesMap,
+	}, nil
 }
 
-func (e *Extender) Prioritize(pod *v1.Pod, nodes *v1.NodeList) (*schedulerapi.HostPriorityList, error) {
+func (e *Extender) Filter(args *schedulerapi.ExtenderArgs) (*schedulerapi.ExtenderFilterResult, error) {
+	filtered := []v1.Node{}
+	failedNodesMap := schedulerapi.FailedNodesMap{}
+
+	if e.nodeCacheCapable {
+		return e.filterUsingNodeCache(args)
+	} else {
+		for _, node := range args.Nodes.Items {
+			fits := true
+			for _, predicate := range e.predicates {
+				fit, err := predicate(&args.Pod, &node)
+				if err != nil {
+					return &schedulerapi.ExtenderFilterResult{
+						Nodes:       &v1.NodeList{},
+						NodeNames:   nil,
+						FailedNodes: schedulerapi.FailedNodesMap{},
+						Error:       err.Error(),
+					}, err
+				}
+				if !fit {
+					fits = false
+					break
+				}
+			}
+			if fits {
+				filtered = append(filtered, node)
+			} else {
+				failedNodesMap[node.Name] = fmt.Sprintf("extender failed: %s", e.name)
+			}
+		}
+
+		return &schedulerapi.ExtenderFilterResult{
+			Nodes:       &v1.NodeList{Items: filtered},
+			NodeNames:   nil,
+			FailedNodes: failedNodesMap,
+		}, nil
+	}
+}
+
+func (e *Extender) Prioritize(args *schedulerapi.ExtenderArgs) (*schedulerapi.HostPriorityList, error) {
 	result := schedulerapi.HostPriorityList{}
 	combinedScores := map[string]int{}
+	var nodes = &v1.NodeList{Items: []v1.Node{}}
+
+	if e.nodeCacheCapable {
+		for _, nodeName := range *args.NodeNames {
+			nodes.Items = append(nodes.Items, v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}})
+		}
+	} else {
+		nodes = args.Nodes
+	}
+
 	for _, prioritizer := range e.prioritizers {
 		weight := prioritizer.weight
 		if weight == 0 {
 			continue
 		}
 		priorityFunc := prioritizer.function
-		prioritizedList, err := priorityFunc(pod, nodes)
+		prioritizedList, err := priorityFunc(&args.Pod, nodes)
 		if err != nil {
 			return &schedulerapi.HostPriorityList{}, err
 		}
@@ -220,6 +278,17 @@ func TestSchedulerExtender(t *testing.T) {
 	}))
 	defer es2.Close()
 
+	extender3 := &Extender{
+		name:             "extender3",
+		predicates:       []fitPredicate{machine_1_2_3_Predicate},
+		prioritizers:     []priorityConfig{{machine_2_Prioritizer, 5}},
+		nodeCacheCapable: true,
+	}
+	es3 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+		extender3.serveHTTP(t, w, req)
+	}))
+	defer es3.Close()
+
 	policy := schedulerapi.Policy{
 		ExtenderConfigs: []schedulerapi.ExtenderConfig{
 			{
@@ -236,6 +305,14 @@ func TestSchedulerExtender(t *testing.T) {
 				Weight:         4,
 				EnableHttps:    false,
 			},
+			{
+				URLPrefix:        es3.URL,
+				FilterVerb:       filter,
+				PrioritizeVerb:   prioritize,
+				Weight:           10,
+				EnableHttps:      false,
+				NodeCacheCapable: true,
+			},
 		},
 	}
 	policy.APIVersion = api.Registry.GroupOrDie(v1.GroupName).GroupVersion.String()
@@ -313,10 +390,11 @@ func DoTestPodScheduling(ns *v1.Namespace, t *testing.T, cs clientset.Interface)
 		t.Fatalf("Failed to schedule pod: %v", err)
 	}
 
-	if myPod, err := cs.Core().Pods(ns.Name).Get(myPod.Name, metav1.GetOptions{}); err != nil {
+	myPod, err = cs.Core().Pods(ns.Name).Get(myPod.Name, metav1.GetOptions{})
+	if err != nil {
 		t.Fatalf("Failed to get pod: %v", err)
-	} else if myPod.Spec.NodeName != "machine3" {
-		t.Fatalf("Failed to schedule using extender, expected machine3, got %v", myPod.Spec.NodeName)
+	} else if myPod.Spec.NodeName != "machine2" {
+		t.Fatalf("Failed to schedule using extender, expected machine2, got %v", myPod.Spec.NodeName)
 	}
 	t.Logf("Scheduled pod using extenders")
 }
```
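For the extender side of the contract, here is a hedged sketch of what a nodeCacheCapable filter endpoint might look like: it works purely from the node names in the request plus state it caches itself. Everything here (the trimmed local types, cache contents, port, and rejection reason) is illustrative; only the names-in/names-out shape comes from this PR.

```go
// Hypothetical nodeCacheCapable extender filter endpoint. The types are
// local, trimmed copies of the wire shapes sketched earlier (json tags
// omitted); nodeCache and the rejection reason are placeholders.
package main

import (
	"encoding/json"
	"log"
	"net/http"
)

type extenderArgs struct {
	NodeNames *[]string // names only; full v1.Node objects stay home
}

type extenderFilterResult struct {
	NodeNames   *[]string
	FailedNodes map[string]string
	Error       string
}

// nodeCache stands in for node state a real extender would keep current
// on its own (e.g. via a watch) instead of receiving it on every request.
var nodeCache = map[string]bool{
	"machine2": true,
	"machine3": true,
}

func filterHandler(w http.ResponseWriter, req *http.Request) {
	var args extenderArgs
	if err := json.NewDecoder(req.Body).Decode(&args); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	if args.NodeNames == nil {
		http.Error(w, "expected node names; is NodeCacheCapable set?", http.StatusBadRequest)
		return
	}

	filtered := make([]string, 0)
	failed := map[string]string{}
	for _, name := range *args.NodeNames {
		if nodeCache[name] {
			filtered = append(filtered, name)
		} else {
			failed[name] = "rejected by extender cache"
		}
	}

	if err := json.NewEncoder(w).Encode(&extenderFilterResult{
		NodeNames:   &filtered,
		FailedNodes: failed,
	}); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
	}
}

func main() {
	http.HandleFunc("/filter", filterHandler)
	log.Fatal(http.ListenAndServe(":8888", nil))
}
```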