mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 04:08:16 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			1209 lines
		
	
	
		
			42 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			1209 lines
		
	
	
		
			42 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
/*
 | 
						|
Copyright 2014 The Kubernetes Authors.
 | 
						|
 | 
						|
Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
you may not use this file except in compliance with the License.
 | 
						|
You may obtain a copy of the License at
 | 
						|
 | 
						|
    http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
 | 
						|
Unless required by applicable law or agreed to in writing, software
 | 
						|
distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
See the License for the specific language governing permissions and
 | 
						|
limitations under the License.
 | 
						|
*/
 | 
						|
 | 
						|
package scheduler
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"fmt"
 | 
						|
	"sort"
 | 
						|
	"strings"
 | 
						|
	"sync"
 | 
						|
	"testing"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/google/go-cmp/cmp"
 | 
						|
	v1 "k8s.io/api/core/v1"
 | 
						|
	eventsv1 "k8s.io/api/events/v1"
 | 
						|
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
						|
	"k8s.io/apimachinery/pkg/runtime"
 | 
						|
	"k8s.io/apimachinery/pkg/util/sets"
 | 
						|
	"k8s.io/apimachinery/pkg/util/wait"
 | 
						|
	utilfeature "k8s.io/apiserver/pkg/util/feature"
 | 
						|
	"k8s.io/client-go/informers"
 | 
						|
	"k8s.io/client-go/kubernetes"
 | 
						|
	"k8s.io/client-go/kubernetes/fake"
 | 
						|
	"k8s.io/client-go/kubernetes/scheme"
 | 
						|
	"k8s.io/client-go/tools/cache"
 | 
						|
	"k8s.io/client-go/tools/events"
 | 
						|
	featuregatetesting "k8s.io/component-base/featuregate/testing"
 | 
						|
	"k8s.io/klog/v2"
 | 
						|
	"k8s.io/klog/v2/ktesting"
 | 
						|
	"k8s.io/kubernetes/pkg/features"
 | 
						|
	schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
 | 
						|
	"k8s.io/kubernetes/pkg/scheduler/apis/config/testing/defaults"
 | 
						|
	"k8s.io/kubernetes/pkg/scheduler/framework"
 | 
						|
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins"
 | 
						|
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
 | 
						|
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
 | 
						|
	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
 | 
						|
	internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
 | 
						|
	internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
 | 
						|
	"k8s.io/kubernetes/pkg/scheduler/profile"
 | 
						|
	st "k8s.io/kubernetes/pkg/scheduler/testing"
 | 
						|
	tf "k8s.io/kubernetes/pkg/scheduler/testing/framework"
 | 
						|
	testingclock "k8s.io/utils/clock/testing"
 | 
						|
	"k8s.io/utils/ptr"
 | 
						|
)
 | 
						|
 | 
						|
func TestSchedulerCreation(t *testing.T) {
 | 
						|
	invalidRegistry := map[string]frameworkruntime.PluginFactory{
 | 
						|
		defaultbinder.Name: defaultbinder.New,
 | 
						|
	}
 | 
						|
	validRegistry := map[string]frameworkruntime.PluginFactory{
 | 
						|
		"Foo": defaultbinder.New,
 | 
						|
	}
 | 
						|
	cases := []struct {
 | 
						|
		name          string
 | 
						|
		opts          []Option
 | 
						|
		wantErr       string
 | 
						|
		wantProfiles  []string
 | 
						|
		wantExtenders []string
 | 
						|
	}{
 | 
						|
		{
 | 
						|
			name: "valid out-of-tree registry",
 | 
						|
			opts: []Option{
 | 
						|
				WithFrameworkOutOfTreeRegistry(validRegistry),
 | 
						|
				WithProfiles(
 | 
						|
					schedulerapi.KubeSchedulerProfile{
 | 
						|
						SchedulerName: "default-scheduler",
 | 
						|
						Plugins: &schedulerapi.Plugins{
 | 
						|
							QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
 | 
						|
							Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
 | 
						|
						},
 | 
						|
					},
 | 
						|
				)},
 | 
						|
			wantProfiles: []string{"default-scheduler"},
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name: "repeated plugin name in out-of-tree plugin",
 | 
						|
			opts: []Option{
 | 
						|
				WithFrameworkOutOfTreeRegistry(invalidRegistry),
 | 
						|
				WithProfiles(
 | 
						|
					schedulerapi.KubeSchedulerProfile{
 | 
						|
						SchedulerName: "default-scheduler",
 | 
						|
						Plugins: &schedulerapi.Plugins{
 | 
						|
							QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
 | 
						|
							Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
 | 
						|
						},
 | 
						|
					},
 | 
						|
				)},
 | 
						|
			wantProfiles: []string{"default-scheduler"},
 | 
						|
			wantErr:      "a plugin named DefaultBinder already exists",
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name: "multiple profiles",
 | 
						|
			opts: []Option{
 | 
						|
				WithProfiles(
 | 
						|
					schedulerapi.KubeSchedulerProfile{
 | 
						|
						SchedulerName: "foo",
 | 
						|
						Plugins: &schedulerapi.Plugins{
 | 
						|
							QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
 | 
						|
							Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
 | 
						|
						},
 | 
						|
					},
 | 
						|
					schedulerapi.KubeSchedulerProfile{
 | 
						|
						SchedulerName: "bar",
 | 
						|
						Plugins: &schedulerapi.Plugins{
 | 
						|
							QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
 | 
						|
							Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
 | 
						|
						},
 | 
						|
					},
 | 
						|
				)},
 | 
						|
			wantProfiles: []string{"bar", "foo"},
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name: "Repeated profiles",
 | 
						|
			opts: []Option{
 | 
						|
				WithProfiles(
 | 
						|
					schedulerapi.KubeSchedulerProfile{
 | 
						|
						SchedulerName: "foo",
 | 
						|
						Plugins: &schedulerapi.Plugins{
 | 
						|
							QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
 | 
						|
							Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
 | 
						|
						},
 | 
						|
					},
 | 
						|
					schedulerapi.KubeSchedulerProfile{
 | 
						|
						SchedulerName: "bar",
 | 
						|
						Plugins: &schedulerapi.Plugins{
 | 
						|
							QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
 | 
						|
							Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
 | 
						|
						},
 | 
						|
					},
 | 
						|
					schedulerapi.KubeSchedulerProfile{
 | 
						|
						SchedulerName: "foo",
 | 
						|
						Plugins: &schedulerapi.Plugins{
 | 
						|
							QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
 | 
						|
							Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
 | 
						|
						},
 | 
						|
					},
 | 
						|
				)},
 | 
						|
			wantErr: "duplicate profile with scheduler name \"foo\"",
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name: "With extenders",
 | 
						|
			opts: []Option{
 | 
						|
				WithProfiles(
 | 
						|
					schedulerapi.KubeSchedulerProfile{
 | 
						|
						SchedulerName: "default-scheduler",
 | 
						|
						Plugins: &schedulerapi.Plugins{
 | 
						|
							QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
 | 
						|
							Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
 | 
						|
						},
 | 
						|
					},
 | 
						|
				),
 | 
						|
				WithExtenders(
 | 
						|
					schedulerapi.Extender{
 | 
						|
						URLPrefix: "http://extender.kube-system/",
 | 
						|
					},
 | 
						|
				),
 | 
						|
			},
 | 
						|
			wantProfiles:  []string{"default-scheduler"},
 | 
						|
			wantExtenders: []string{"http://extender.kube-system/"},
 | 
						|
		},
 | 
						|
	}
 | 
						|
 | 
						|
	for _, tc := range cases {
 | 
						|
		t.Run(tc.name, func(t *testing.T) {
 | 
						|
			client := fake.NewSimpleClientset()
 | 
						|
			informerFactory := informers.NewSharedInformerFactory(client, 0)
 | 
						|
 | 
						|
			eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
 | 
						|
 | 
						|
			_, ctx := ktesting.NewTestContext(t)
 | 
						|
			ctx, cancel := context.WithCancel(ctx)
 | 
						|
			defer cancel()
 | 
						|
			s, err := New(
 | 
						|
				ctx,
 | 
						|
				client,
 | 
						|
				informerFactory,
 | 
						|
				nil,
 | 
						|
				profile.NewRecorderFactory(eventBroadcaster),
 | 
						|
				tc.opts...,
 | 
						|
			)
 | 
						|
 | 
						|
			// Errors
 | 
						|
			if len(tc.wantErr) != 0 {
 | 
						|
				if err == nil || !strings.Contains(err.Error(), tc.wantErr) {
 | 
						|
					t.Errorf("got error %q, want %q", err, tc.wantErr)
 | 
						|
				}
 | 
						|
				return
 | 
						|
			}
 | 
						|
			if err != nil {
 | 
						|
				t.Fatalf("Failed to create scheduler: %v", err)
 | 
						|
			}
 | 
						|
 | 
						|
			// Profiles
 | 
						|
			profiles := make([]string, 0, len(s.Profiles))
 | 
						|
			for name := range s.Profiles {
 | 
						|
				profiles = append(profiles, name)
 | 
						|
			}
 | 
						|
			sort.Strings(profiles)
 | 
						|
			if diff := cmp.Diff(tc.wantProfiles, profiles); diff != "" {
 | 
						|
				t.Errorf("unexpected profiles (-want, +got):\n%s", diff)
 | 
						|
			}
 | 
						|
 | 
						|
			// Extenders
 | 
						|
			if len(tc.wantExtenders) != 0 {
 | 
						|
				// Scheduler.Extenders
 | 
						|
				extenders := make([]string, 0, len(s.Extenders))
 | 
						|
				for _, e := range s.Extenders {
 | 
						|
					extenders = append(extenders, e.Name())
 | 
						|
				}
 | 
						|
				if diff := cmp.Diff(tc.wantExtenders, extenders); diff != "" {
 | 
						|
					t.Errorf("unexpected extenders (-want, +got):\n%s", diff)
 | 
						|
				}
 | 
						|
 | 
						|
				// framework.Handle.Extenders()
 | 
						|
				for _, p := range s.Profiles {
 | 
						|
					extenders := make([]string, 0, len(p.Extenders()))
 | 
						|
					for _, e := range p.Extenders() {
 | 
						|
						extenders = append(extenders, e.Name())
 | 
						|
					}
 | 
						|
					if diff := cmp.Diff(tc.wantExtenders, extenders); diff != "" {
 | 
						|
						t.Errorf("unexpected extenders (-want, +got):\n%s", diff)
 | 
						|
					}
 | 
						|
				}
 | 
						|
			}
 | 
						|
		})
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func TestFailureHandler(t *testing.T) {
 | 
						|
	testPod := st.MakePod().Name("test-pod").Namespace(v1.NamespaceDefault).Obj()
 | 
						|
	testPodUpdated := testPod.DeepCopy()
 | 
						|
	testPodUpdated.Labels = map[string]string{"foo": ""}
 | 
						|
 | 
						|
	tests := []struct {
 | 
						|
		name                       string
 | 
						|
		podUpdatedDuringScheduling bool // pod is updated during a scheduling cycle
 | 
						|
		podDeletedDuringScheduling bool // pod is deleted during a scheduling cycle
 | 
						|
		expect                     *v1.Pod
 | 
						|
	}{
 | 
						|
		{
 | 
						|
			name:                       "pod is updated during a scheduling cycle",
 | 
						|
			podUpdatedDuringScheduling: true,
 | 
						|
			expect:                     testPodUpdated,
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name:   "pod is not updated during a scheduling cycle",
 | 
						|
			expect: testPod,
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name:                       "pod is deleted during a scheduling cycle",
 | 
						|
			podDeletedDuringScheduling: true,
 | 
						|
			expect:                     nil,
 | 
						|
		},
 | 
						|
	}
 | 
						|
 | 
						|
	for _, tt := range tests {
 | 
						|
		t.Run(tt.name, func(t *testing.T) {
 | 
						|
			logger, ctx := ktesting.NewTestContext(t)
 | 
						|
			ctx, cancel := context.WithCancel(ctx)
 | 
						|
			defer cancel()
 | 
						|
 | 
						|
			client := fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testPod}})
 | 
						|
			informerFactory := informers.NewSharedInformerFactory(client, 0)
 | 
						|
			podInformer := informerFactory.Core().V1().Pods()
 | 
						|
			// Need to add/update/delete testPod to the store.
 | 
						|
			podInformer.Informer().GetStore().Add(testPod)
 | 
						|
 | 
						|
			queue := internalqueue.NewPriorityQueue(nil, informerFactory, internalqueue.WithClock(testingclock.NewFakeClock(time.Now())))
 | 
						|
			schedulerCache := internalcache.New(ctx, 30*time.Second)
 | 
						|
 | 
						|
			if err := queue.Add(logger, testPod); err != nil {
 | 
						|
				t.Fatalf("Add failed: %v", err)
 | 
						|
			}
 | 
						|
 | 
						|
			if _, err := queue.Pop(logger); err != nil {
 | 
						|
				t.Fatalf("Pop failed: %v", err)
 | 
						|
			}
 | 
						|
 | 
						|
			if tt.podUpdatedDuringScheduling {
 | 
						|
				podInformer.Informer().GetStore().Update(testPodUpdated)
 | 
						|
				queue.Update(logger, testPod, testPodUpdated)
 | 
						|
			}
 | 
						|
			if tt.podDeletedDuringScheduling {
 | 
						|
				podInformer.Informer().GetStore().Delete(testPod)
 | 
						|
				queue.Delete(testPod)
 | 
						|
			}
 | 
						|
 | 
						|
			s, fwk, err := initScheduler(ctx, schedulerCache, queue, client, informerFactory)
 | 
						|
			if err != nil {
 | 
						|
				t.Fatal(err)
 | 
						|
			}
 | 
						|
 | 
						|
			testPodInfo := &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(t, testPod)}
 | 
						|
			s.FailureHandler(ctx, fwk, testPodInfo, framework.NewStatus(framework.Unschedulable), nil, time.Now())
 | 
						|
 | 
						|
			var got *v1.Pod
 | 
						|
			if tt.podUpdatedDuringScheduling {
 | 
						|
				head, e := queue.Pop(logger)
 | 
						|
				if e != nil {
 | 
						|
					t.Fatalf("Cannot pop pod from the activeQ: %v", e)
 | 
						|
				}
 | 
						|
				got = head.Pod
 | 
						|
			} else {
 | 
						|
				got = getPodFromPriorityQueue(queue, testPod)
 | 
						|
			}
 | 
						|
 | 
						|
			if diff := cmp.Diff(tt.expect, got); diff != "" {
 | 
						|
				t.Errorf("Unexpected pod (-want, +got): %s", diff)
 | 
						|
			}
 | 
						|
		})
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func TestFailureHandler_PodAlreadyBound(t *testing.T) {
 | 
						|
	logger, ctx := ktesting.NewTestContext(t)
 | 
						|
	ctx, cancel := context.WithCancel(ctx)
 | 
						|
	defer cancel()
 | 
						|
 | 
						|
	nodeFoo := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
 | 
						|
	testPod := st.MakePod().Name("test-pod").Namespace(v1.NamespaceDefault).Node("foo").Obj()
 | 
						|
 | 
						|
	client := fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testPod}}, &v1.NodeList{Items: []v1.Node{nodeFoo}})
 | 
						|
	informerFactory := informers.NewSharedInformerFactory(client, 0)
 | 
						|
	podInformer := informerFactory.Core().V1().Pods()
 | 
						|
	// Need to add testPod to the store.
 | 
						|
	podInformer.Informer().GetStore().Add(testPod)
 | 
						|
 | 
						|
	queue := internalqueue.NewPriorityQueue(nil, informerFactory, internalqueue.WithClock(testingclock.NewFakeClock(time.Now())))
 | 
						|
	schedulerCache := internalcache.New(ctx, 30*time.Second)
 | 
						|
 | 
						|
	// Add node to schedulerCache no matter it's deleted in API server or not.
 | 
						|
	schedulerCache.AddNode(logger, &nodeFoo)
 | 
						|
 | 
						|
	s, fwk, err := initScheduler(ctx, schedulerCache, queue, client, informerFactory)
 | 
						|
	if err != nil {
 | 
						|
		t.Fatal(err)
 | 
						|
	}
 | 
						|
 | 
						|
	testPodInfo := &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(t, testPod)}
 | 
						|
	s.FailureHandler(ctx, fwk, testPodInfo, framework.NewStatus(framework.Unschedulable).WithError(fmt.Errorf("binding rejected: timeout")), nil, time.Now())
 | 
						|
 | 
						|
	pod := getPodFromPriorityQueue(queue, testPod)
 | 
						|
	if pod != nil {
 | 
						|
		t.Fatalf("Unexpected pod: %v should not be in PriorityQueue when the NodeName of pod is not empty", pod.Name)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// TestWithPercentageOfNodesToScore tests scheduler's PercentageOfNodesToScore is set correctly.
 | 
						|
func TestWithPercentageOfNodesToScore(t *testing.T) {
 | 
						|
	tests := []struct {
 | 
						|
		name                           string
 | 
						|
		percentageOfNodesToScoreConfig *int32
 | 
						|
		wantedPercentageOfNodesToScore int32
 | 
						|
	}{
 | 
						|
		{
 | 
						|
			name:                           "percentageOfNodesScore is nil",
 | 
						|
			percentageOfNodesToScoreConfig: nil,
 | 
						|
			wantedPercentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name:                           "percentageOfNodesScore is not nil",
 | 
						|
			percentageOfNodesToScoreConfig: ptr.To[int32](10),
 | 
						|
			wantedPercentageOfNodesToScore: 10,
 | 
						|
		},
 | 
						|
	}
 | 
						|
 | 
						|
	for _, tt := range tests {
 | 
						|
		t.Run(tt.name, func(t *testing.T) {
 | 
						|
			client := fake.NewSimpleClientset()
 | 
						|
			informerFactory := informers.NewSharedInformerFactory(client, 0)
 | 
						|
			eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
 | 
						|
			_, ctx := ktesting.NewTestContext(t)
 | 
						|
			ctx, cancel := context.WithCancel(ctx)
 | 
						|
			defer cancel()
 | 
						|
			sched, err := New(
 | 
						|
				ctx,
 | 
						|
				client,
 | 
						|
				informerFactory,
 | 
						|
				nil,
 | 
						|
				profile.NewRecorderFactory(eventBroadcaster),
 | 
						|
				WithPercentageOfNodesToScore(tt.percentageOfNodesToScoreConfig),
 | 
						|
			)
 | 
						|
			if err != nil {
 | 
						|
				t.Fatalf("Failed to create scheduler: %v", err)
 | 
						|
			}
 | 
						|
			if sched.percentageOfNodesToScore != tt.wantedPercentageOfNodesToScore {
 | 
						|
				t.Errorf("scheduler.percercentageOfNodesToScore = %v, want %v", sched.percentageOfNodesToScore, tt.wantedPercentageOfNodesToScore)
 | 
						|
			}
 | 
						|
		})
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// getPodFromPriorityQueue is the function used in the TestDefaultErrorFunc test to get
 | 
						|
// the specific pod from the given priority queue. It returns the found pod in the priority queue.
 | 
						|
func getPodFromPriorityQueue(queue *internalqueue.PriorityQueue, pod *v1.Pod) *v1.Pod {
 | 
						|
	podList, _ := queue.PendingPods()
 | 
						|
	if len(podList) == 0 {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	queryPodKey, err := cache.MetaNamespaceKeyFunc(pod)
 | 
						|
	if err != nil {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	for _, foundPod := range podList {
 | 
						|
		foundPodKey, err := cache.MetaNamespaceKeyFunc(foundPod)
 | 
						|
		if err != nil {
 | 
						|
			return nil
 | 
						|
		}
 | 
						|
 | 
						|
		if foundPodKey == queryPodKey {
 | 
						|
			return foundPod
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func initScheduler(ctx context.Context, cache internalcache.Cache, queue internalqueue.SchedulingQueue,
 | 
						|
	client kubernetes.Interface, informerFactory informers.SharedInformerFactory) (*Scheduler, framework.Framework, error) {
 | 
						|
	logger := klog.FromContext(ctx)
 | 
						|
	registerPluginFuncs := []tf.RegisterPluginFunc{
 | 
						|
		tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
 | 
						|
		tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
 | 
						|
	}
 | 
						|
	eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
 | 
						|
	waitingPods := frameworkruntime.NewWaitingPodsMap()
 | 
						|
	fwk, err := tf.NewFramework(ctx,
 | 
						|
		registerPluginFuncs,
 | 
						|
		testSchedulerName,
 | 
						|
		frameworkruntime.WithClientSet(client),
 | 
						|
		frameworkruntime.WithInformerFactory(informerFactory),
 | 
						|
		frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)),
 | 
						|
		frameworkruntime.WithWaitingPods(waitingPods),
 | 
						|
	)
 | 
						|
	if err != nil {
 | 
						|
		return nil, nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	s := &Scheduler{
 | 
						|
		Cache:           cache,
 | 
						|
		client:          client,
 | 
						|
		StopEverything:  ctx.Done(),
 | 
						|
		SchedulingQueue: queue,
 | 
						|
		Profiles:        profile.Map{testSchedulerName: fwk},
 | 
						|
		logger:          logger,
 | 
						|
	}
 | 
						|
	s.applyDefaultHandlers()
 | 
						|
 | 
						|
	return s, fwk, nil
 | 
						|
}
 | 
						|
 | 
						|
func TestInitPluginsWithIndexers(t *testing.T) {
 | 
						|
	tests := []struct {
 | 
						|
		name string
 | 
						|
		// the plugin registration ordering must not matter, being map traversal random
 | 
						|
		entrypoints map[string]frameworkruntime.PluginFactory
 | 
						|
		wantErr     string
 | 
						|
	}{
 | 
						|
		{
 | 
						|
			name: "register indexer, no conflicts",
 | 
						|
			entrypoints: map[string]frameworkruntime.PluginFactory{
 | 
						|
				"AddIndexer": func(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) {
 | 
						|
					podInformer := handle.SharedInformerFactory().Core().V1().Pods()
 | 
						|
					err := podInformer.Informer().AddIndexers(cache.Indexers{
 | 
						|
						"nodeName": indexByPodSpecNodeName,
 | 
						|
					})
 | 
						|
					return &TestPlugin{name: "AddIndexer"}, err
 | 
						|
				},
 | 
						|
			},
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name: "register the same indexer name multiple times, conflict",
 | 
						|
			// order of registration doesn't matter
 | 
						|
			entrypoints: map[string]frameworkruntime.PluginFactory{
 | 
						|
				"AddIndexer1": func(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) {
 | 
						|
					podInformer := handle.SharedInformerFactory().Core().V1().Pods()
 | 
						|
					err := podInformer.Informer().AddIndexers(cache.Indexers{
 | 
						|
						"nodeName": indexByPodSpecNodeName,
 | 
						|
					})
 | 
						|
					return &TestPlugin{name: "AddIndexer1"}, err
 | 
						|
				},
 | 
						|
				"AddIndexer2": func(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) {
 | 
						|
					podInformer := handle.SharedInformerFactory().Core().V1().Pods()
 | 
						|
					err := podInformer.Informer().AddIndexers(cache.Indexers{
 | 
						|
						"nodeName": indexByPodAnnotationNodeName,
 | 
						|
					})
 | 
						|
					return &TestPlugin{name: "AddIndexer1"}, err
 | 
						|
				},
 | 
						|
			},
 | 
						|
			wantErr: "indexer conflict",
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name: "register the same indexer body with different names, no conflicts",
 | 
						|
			// order of registration doesn't matter
 | 
						|
			entrypoints: map[string]frameworkruntime.PluginFactory{
 | 
						|
				"AddIndexer1": func(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) {
 | 
						|
					podInformer := handle.SharedInformerFactory().Core().V1().Pods()
 | 
						|
					err := podInformer.Informer().AddIndexers(cache.Indexers{
 | 
						|
						"nodeName1": indexByPodSpecNodeName,
 | 
						|
					})
 | 
						|
					return &TestPlugin{name: "AddIndexer1"}, err
 | 
						|
				},
 | 
						|
				"AddIndexer2": func(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) {
 | 
						|
					podInformer := handle.SharedInformerFactory().Core().V1().Pods()
 | 
						|
					err := podInformer.Informer().AddIndexers(cache.Indexers{
 | 
						|
						"nodeName2": indexByPodAnnotationNodeName,
 | 
						|
					})
 | 
						|
					return &TestPlugin{name: "AddIndexer2"}, err
 | 
						|
				},
 | 
						|
			},
 | 
						|
		},
 | 
						|
	}
 | 
						|
 | 
						|
	for _, tt := range tests {
 | 
						|
		t.Run(tt.name, func(t *testing.T) {
 | 
						|
			fakeInformerFactory := NewInformerFactory(&fake.Clientset{}, 0*time.Second)
 | 
						|
 | 
						|
			var registerPluginFuncs []tf.RegisterPluginFunc
 | 
						|
			for name, entrypoint := range tt.entrypoints {
 | 
						|
				registerPluginFuncs = append(registerPluginFuncs,
 | 
						|
					// anything supported by TestPlugin is fine
 | 
						|
					tf.RegisterFilterPlugin(name, entrypoint),
 | 
						|
				)
 | 
						|
			}
 | 
						|
			// we always need this
 | 
						|
			registerPluginFuncs = append(registerPluginFuncs,
 | 
						|
				tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
 | 
						|
				tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
 | 
						|
			)
 | 
						|
			_, ctx := ktesting.NewTestContext(t)
 | 
						|
			ctx, cancel := context.WithCancel(ctx)
 | 
						|
			defer cancel()
 | 
						|
			_, err := tf.NewFramework(ctx, registerPluginFuncs, "test", frameworkruntime.WithInformerFactory(fakeInformerFactory))
 | 
						|
 | 
						|
			if len(tt.wantErr) > 0 {
 | 
						|
				if err == nil || !strings.Contains(err.Error(), tt.wantErr) {
 | 
						|
					t.Errorf("got error %q, want %q", err, tt.wantErr)
 | 
						|
				}
 | 
						|
				return
 | 
						|
			}
 | 
						|
			if err != nil {
 | 
						|
				t.Fatalf("Failed to create scheduler: %v", err)
 | 
						|
			}
 | 
						|
		})
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func indexByPodSpecNodeName(obj interface{}) ([]string, error) {
 | 
						|
	pod, ok := obj.(*v1.Pod)
 | 
						|
	if !ok {
 | 
						|
		return []string{}, nil
 | 
						|
	}
 | 
						|
	if len(pod.Spec.NodeName) == 0 {
 | 
						|
		return []string{}, nil
 | 
						|
	}
 | 
						|
	return []string{pod.Spec.NodeName}, nil
 | 
						|
}
 | 
						|
 | 
						|
func indexByPodAnnotationNodeName(obj interface{}) ([]string, error) {
 | 
						|
	pod, ok := obj.(*v1.Pod)
 | 
						|
	if !ok {
 | 
						|
		return []string{}, nil
 | 
						|
	}
 | 
						|
	if len(pod.Annotations) == 0 {
 | 
						|
		return []string{}, nil
 | 
						|
	}
 | 
						|
	nodeName, ok := pod.Annotations["node-name"]
 | 
						|
	if !ok {
 | 
						|
		return []string{}, nil
 | 
						|
	}
 | 
						|
	return []string{nodeName}, nil
 | 
						|
}
 | 
						|
 | 
						|
// String identifiers shared by the tests in this file.
// NOTE(review): most of these appear to be names of fake plugins defined
// elsewhere in the file — confirm against their users.
const (
	// Used as the PluginName in the queueing-hint expectations of
	// Test_buildQueueingHintMap.
	filterWithoutEnqueueExtensions = "filterWithoutEnqueueExtensions"

	fakeNode = "fakeNode"
	fakePod  = "fakePod"

	emptyEventsToRegister = "emptyEventsToRegister"
	emptyEventExtensions  = "emptyEventExtensions"

	queueSort  = "no-op-queue-sort-plugin"
	fakeBind   = "bind-plugin"
	fakePermit = "fakePermit"
)
 | 
						|
 | 
						|
func Test_buildQueueingHintMap(t *testing.T) {
 | 
						|
	tests := []struct {
 | 
						|
		name                string
 | 
						|
		plugins             []framework.Plugin
 | 
						|
		want                map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction
 | 
						|
		featuregateDisabled bool
 | 
						|
	}{
 | 
						|
		{
 | 
						|
			name:    "filter without EnqueueExtensions plugin",
 | 
						|
			plugins: []framework.Plugin{&filterWithoutEnqueueExtensionsPlugin{}},
 | 
						|
			want: map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction{
 | 
						|
				{Resource: framework.Pod, ActionType: framework.All}: {
 | 
						|
					{PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
 | 
						|
				},
 | 
						|
				{Resource: framework.Node, ActionType: framework.All}: {
 | 
						|
					{PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
 | 
						|
				},
 | 
						|
				{Resource: framework.CSINode, ActionType: framework.All}: {
 | 
						|
					{PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
 | 
						|
				},
 | 
						|
				{Resource: framework.CSIDriver, ActionType: framework.All}: {
 | 
						|
					{PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
 | 
						|
				},
 | 
						|
				{Resource: framework.CSIStorageCapacity, ActionType: framework.All}: {
 | 
						|
					{PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
 | 
						|
				},
 | 
						|
				{Resource: framework.PersistentVolume, ActionType: framework.All}: {
 | 
						|
					{PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
 | 
						|
				},
 | 
						|
				{Resource: framework.StorageClass, ActionType: framework.All}: {
 | 
						|
					{PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
 | 
						|
				},
 | 
						|
				{Resource: framework.PersistentVolumeClaim, ActionType: framework.All}: {
 | 
						|
					{PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
 | 
						|
				},
 | 
						|
				{Resource: framework.PodSchedulingContext, ActionType: framework.All}: {
 | 
						|
					{PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
 | 
						|
				},
 | 
						|
				{Resource: framework.ResourceClaim, ActionType: framework.All}: {
 | 
						|
					{PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
 | 
						|
				},
 | 
						|
				{Resource: framework.ResourceClass, ActionType: framework.All}: {
 | 
						|
					{PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
 | 
						|
				},
 | 
						|
				{Resource: framework.ResourceClaimParameters, ActionType: framework.All}: {
 | 
						|
					{PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
 | 
						|
				},
 | 
						|
				{Resource: framework.ResourceClassParameters, ActionType: framework.All}: {
 | 
						|
					{PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
 | 
						|
				},
 | 
						|
			},
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name:    "node and pod plugin",
 | 
						|
			plugins: []framework.Plugin{&fakeNodePlugin{}, &fakePodPlugin{}},
 | 
						|
			want: map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction{
 | 
						|
				{Resource: framework.Pod, ActionType: framework.Add}: {
 | 
						|
					{PluginName: fakePod, QueueingHintFn: fakePodPluginQueueingFn},
 | 
						|
				},
 | 
						|
				{Resource: framework.Node, ActionType: framework.Add}: {
 | 
						|
					{PluginName: fakeNode, QueueingHintFn: fakeNodePluginQueueingFn},
 | 
						|
				},
 | 
						|
				{Resource: framework.Node, ActionType: framework.UpdateNodeTaint}: {
 | 
						|
					{PluginName: fakeNode, QueueingHintFn: defaultQueueingHintFn}, // When Node/Add is registered, Node/UpdateNodeTaint is automatically registered.
 | 
						|
				},
 | 
						|
			},
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name:                "node and pod plugin (featuregate is disabled)",
 | 
						|
			plugins:             []framework.Plugin{&fakeNodePlugin{}, &fakePodPlugin{}},
 | 
						|
			featuregateDisabled: true,
 | 
						|
			want: map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction{
 | 
						|
				{Resource: framework.Pod, ActionType: framework.Add}: {
 | 
						|
					{PluginName: fakePod, QueueingHintFn: defaultQueueingHintFn}, // default queueing hint due to disabled feature gate.
 | 
						|
				},
 | 
						|
				{Resource: framework.Node, ActionType: framework.Add}: {
 | 
						|
					{PluginName: fakeNode, QueueingHintFn: defaultQueueingHintFn}, // default queueing hint due to disabled feature gate.
 | 
						|
				},
 | 
						|
				{Resource: framework.Node, ActionType: framework.UpdateNodeTaint}: {
 | 
						|
					{PluginName: fakeNode, QueueingHintFn: defaultQueueingHintFn}, // When Node/Add is registered, Node/UpdateNodeTaint is automatically registered.
 | 
						|
				},
 | 
						|
			},
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name:    "register plugin with empty event",
 | 
						|
			plugins: []framework.Plugin{&emptyEventPlugin{}},
 | 
						|
			want:    map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction{},
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name:    "register plugins including emptyEventPlugin",
 | 
						|
			plugins: []framework.Plugin{&emptyEventPlugin{}, &fakeNodePlugin{}},
 | 
						|
			want: map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction{
 | 
						|
				{Resource: framework.Pod, ActionType: framework.Add}: {
 | 
						|
					{PluginName: fakePod, QueueingHintFn: fakePodPluginQueueingFn},
 | 
						|
				},
 | 
						|
				{Resource: framework.Node, ActionType: framework.Add}: {
 | 
						|
					{PluginName: fakeNode, QueueingHintFn: fakeNodePluginQueueingFn},
 | 
						|
				},
 | 
						|
				{Resource: framework.Node, ActionType: framework.UpdateNodeTaint}: {
 | 
						|
					{PluginName: fakeNode, QueueingHintFn: defaultQueueingHintFn}, // When Node/Add is registered, Node/UpdateNodeTaint is automatically registered.
 | 
						|
				},
 | 
						|
			},
 | 
						|
		},
 | 
						|
	}
 | 
						|
 | 
						|
	for _, tt := range tests {
 | 
						|
		t.Run(tt.name, func(t *testing.T) {
 | 
						|
			defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, !tt.featuregateDisabled)()
 | 
						|
			logger, ctx := ktesting.NewTestContext(t)
 | 
						|
			ctx, cancel := context.WithCancel(ctx)
 | 
						|
			defer cancel()
 | 
						|
			registry := frameworkruntime.Registry{}
 | 
						|
			cfgPls := &schedulerapi.Plugins{}
 | 
						|
			plugins := append(tt.plugins, &fakebindPlugin{}, &fakeQueueSortPlugin{})
 | 
						|
			for _, pl := range plugins {
 | 
						|
				tmpPl := pl
 | 
						|
				if err := registry.Register(pl.Name(), func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
 | 
						|
					return tmpPl, nil
 | 
						|
				}); err != nil {
 | 
						|
					t.Fatalf("fail to register filter plugin (%s)", pl.Name())
 | 
						|
				}
 | 
						|
				cfgPls.MultiPoint.Enabled = append(cfgPls.MultiPoint.Enabled, schedulerapi.Plugin{Name: pl.Name()})
 | 
						|
			}
 | 
						|
 | 
						|
			profile := schedulerapi.KubeSchedulerProfile{Plugins: cfgPls}
 | 
						|
			fwk, err := newFramework(ctx, registry, profile)
 | 
						|
			if err != nil {
 | 
						|
				t.Fatal(err)
 | 
						|
			}
 | 
						|
 | 
						|
			exts := fwk.EnqueueExtensions()
 | 
						|
			// need to sort to make the test result stable.
 | 
						|
			sort.Slice(exts, func(i, j int) bool {
 | 
						|
				return exts[i].Name() < exts[j].Name()
 | 
						|
			})
 | 
						|
 | 
						|
			got := buildQueueingHintMap(exts)
 | 
						|
 | 
						|
			for e, fns := range got {
 | 
						|
				wantfns, ok := tt.want[e]
 | 
						|
				if !ok {
 | 
						|
					t.Errorf("got unexpected event %v", e)
 | 
						|
					continue
 | 
						|
				}
 | 
						|
				if len(fns) != len(wantfns) {
 | 
						|
					t.Errorf("got %v queueing hint functions, want %v", len(fns), len(wantfns))
 | 
						|
					continue
 | 
						|
				}
 | 
						|
				for i, fn := range fns {
 | 
						|
					if fn.PluginName != wantfns[i].PluginName {
 | 
						|
						t.Errorf("got plugin name %v, want %v", fn.PluginName, wantfns[i].PluginName)
 | 
						|
						continue
 | 
						|
					}
 | 
						|
					got, gotErr := fn.QueueingHintFn(logger, nil, nil, nil)
 | 
						|
					want, wantErr := wantfns[i].QueueingHintFn(logger, nil, nil, nil)
 | 
						|
					if got != want || gotErr != wantErr {
 | 
						|
						t.Errorf("got queueing hint function (%v) returning (%v, %v), expect it to return (%v, %v)", fn.PluginName, got, gotErr, want, wantErr)
 | 
						|
						continue
 | 
						|
					}
 | 
						|
				}
 | 
						|
			}
 | 
						|
		})
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Test_UnionedGVKs tests UnionedGVKs worked with buildQueueingHintMap.
 | 
						|
func Test_UnionedGVKs(t *testing.T) {
 | 
						|
	tests := []struct {
 | 
						|
		name    string
 | 
						|
		plugins schedulerapi.PluginSet
 | 
						|
		want    map[framework.GVK]framework.ActionType
 | 
						|
	}{
 | 
						|
		{
 | 
						|
			name: "filter without EnqueueExtensions plugin",
 | 
						|
			plugins: schedulerapi.PluginSet{
 | 
						|
				Enabled: []schedulerapi.Plugin{
 | 
						|
					{Name: filterWithoutEnqueueExtensions},
 | 
						|
					{Name: queueSort},
 | 
						|
					{Name: fakeBind},
 | 
						|
				},
 | 
						|
				Disabled: []schedulerapi.Plugin{{Name: "*"}}, // disable default plugins
 | 
						|
			},
 | 
						|
			want: map[framework.GVK]framework.ActionType{
 | 
						|
				framework.Pod:                     framework.All,
 | 
						|
				framework.Node:                    framework.All,
 | 
						|
				framework.CSINode:                 framework.All,
 | 
						|
				framework.CSIDriver:               framework.All,
 | 
						|
				framework.CSIStorageCapacity:      framework.All,
 | 
						|
				framework.PersistentVolume:        framework.All,
 | 
						|
				framework.PersistentVolumeClaim:   framework.All,
 | 
						|
				framework.StorageClass:            framework.All,
 | 
						|
				framework.PodSchedulingContext:    framework.All,
 | 
						|
				framework.ResourceClaim:           framework.All,
 | 
						|
				framework.ResourceClass:           framework.All,
 | 
						|
				framework.ResourceClaimParameters: framework.All,
 | 
						|
				framework.ResourceClassParameters: framework.All,
 | 
						|
			},
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name: "node plugin",
 | 
						|
			plugins: schedulerapi.PluginSet{
 | 
						|
				Enabled: []schedulerapi.Plugin{
 | 
						|
					{Name: fakeNode},
 | 
						|
					{Name: queueSort},
 | 
						|
					{Name: fakeBind},
 | 
						|
				},
 | 
						|
				Disabled: []schedulerapi.Plugin{{Name: "*"}}, // disable default plugins
 | 
						|
			},
 | 
						|
			want: map[framework.GVK]framework.ActionType{
 | 
						|
				framework.Node: framework.Add | framework.UpdateNodeTaint, // When Node/Add is registered, Node/UpdateNodeTaint is automatically registered.
 | 
						|
			},
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name: "pod plugin",
 | 
						|
			plugins: schedulerapi.PluginSet{
 | 
						|
				Enabled: []schedulerapi.Plugin{
 | 
						|
					{Name: fakePod},
 | 
						|
					{Name: queueSort},
 | 
						|
					{Name: fakeBind},
 | 
						|
				},
 | 
						|
				Disabled: []schedulerapi.Plugin{{Name: "*"}}, // disable default plugins
 | 
						|
			},
 | 
						|
			want: map[framework.GVK]framework.ActionType{
 | 
						|
				framework.Pod: framework.Add,
 | 
						|
			},
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name: "node and pod plugin",
 | 
						|
			plugins: schedulerapi.PluginSet{
 | 
						|
				Enabled: []schedulerapi.Plugin{
 | 
						|
					{Name: fakePod},
 | 
						|
					{Name: fakeNode},
 | 
						|
					{Name: queueSort},
 | 
						|
					{Name: fakeBind},
 | 
						|
				},
 | 
						|
				Disabled: []schedulerapi.Plugin{{Name: "*"}}, // disable default plugins
 | 
						|
			},
 | 
						|
			want: map[framework.GVK]framework.ActionType{
 | 
						|
				framework.Pod:  framework.Add,
 | 
						|
				framework.Node: framework.Add | framework.UpdateNodeTaint, // When Node/Add is registered, Node/UpdateNodeTaint is automatically registered.
 | 
						|
			},
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name: "empty EventsToRegister plugin",
 | 
						|
			plugins: schedulerapi.PluginSet{
 | 
						|
				Enabled: []schedulerapi.Plugin{
 | 
						|
					{Name: emptyEventsToRegister},
 | 
						|
					{Name: queueSort},
 | 
						|
					{Name: fakeBind},
 | 
						|
				},
 | 
						|
				Disabled: []schedulerapi.Plugin{{Name: "*"}}, // disable default plugins
 | 
						|
			},
 | 
						|
			want: map[framework.GVK]framework.ActionType{},
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name:    "plugins with default profile",
 | 
						|
			plugins: schedulerapi.PluginSet{Enabled: defaults.PluginsV1.MultiPoint.Enabled},
 | 
						|
			want: map[framework.GVK]framework.ActionType{
 | 
						|
				framework.Pod:                   framework.All,
 | 
						|
				framework.Node:                  framework.All,
 | 
						|
				framework.CSINode:               framework.All - framework.Delete,
 | 
						|
				framework.CSIDriver:             framework.All - framework.Delete,
 | 
						|
				framework.CSIStorageCapacity:    framework.All - framework.Delete,
 | 
						|
				framework.PersistentVolume:      framework.All - framework.Delete,
 | 
						|
				framework.PersistentVolumeClaim: framework.All - framework.Delete,
 | 
						|
				framework.StorageClass:          framework.All - framework.Delete,
 | 
						|
			},
 | 
						|
		},
 | 
						|
	}
 | 
						|
	for _, tt := range tests {
 | 
						|
		t.Run(tt.name, func(t *testing.T) {
 | 
						|
			_, ctx := ktesting.NewTestContext(t)
 | 
						|
			ctx, cancel := context.WithCancel(ctx)
 | 
						|
			defer cancel()
 | 
						|
			registry := plugins.NewInTreeRegistry()
 | 
						|
 | 
						|
			cfgPls := &schedulerapi.Plugins{MultiPoint: tt.plugins}
 | 
						|
			plugins := []framework.Plugin{&fakeNodePlugin{}, &fakePodPlugin{}, &filterWithoutEnqueueExtensionsPlugin{}, &emptyEventsToRegisterPlugin{}, &fakeQueueSortPlugin{}, &fakebindPlugin{}}
 | 
						|
			for _, pl := range plugins {
 | 
						|
				tmpPl := pl
 | 
						|
				if err := registry.Register(pl.Name(), func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
 | 
						|
					return tmpPl, nil
 | 
						|
				}); err != nil {
 | 
						|
					t.Fatalf("fail to register filter plugin (%s)", pl.Name())
 | 
						|
				}
 | 
						|
			}
 | 
						|
 | 
						|
			profile := schedulerapi.KubeSchedulerProfile{Plugins: cfgPls, PluginConfig: defaults.PluginConfigsV1}
 | 
						|
			fwk, err := newFramework(ctx, registry, profile)
 | 
						|
			if err != nil {
 | 
						|
				t.Fatal(err)
 | 
						|
			}
 | 
						|
 | 
						|
			queueingHintsPerProfile := internalqueue.QueueingHintMapPerProfile{
 | 
						|
				"default": buildQueueingHintMap(fwk.EnqueueExtensions()),
 | 
						|
			}
 | 
						|
			got := unionedGVKs(queueingHintsPerProfile)
 | 
						|
 | 
						|
			if diff := cmp.Diff(tt.want, got); diff != "" {
 | 
						|
				t.Errorf("Unexpected eventToPlugin map (-want,+got):%s", diff)
 | 
						|
			}
 | 
						|
		})
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func newFramework(ctx context.Context, r frameworkruntime.Registry, profile schedulerapi.KubeSchedulerProfile) (framework.Framework, error) {
 | 
						|
	return frameworkruntime.NewFramework(ctx, r, &profile,
 | 
						|
		frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(nil, nil)),
 | 
						|
		frameworkruntime.WithInformerFactory(informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0)),
 | 
						|
	)
 | 
						|
}
 | 
						|
 | 
						|
func TestFrameworkHandler_IterateOverWaitingPods(t *testing.T) {
 | 
						|
	const (
 | 
						|
		testSchedulerProfile1 = "test-scheduler-profile-1"
 | 
						|
		testSchedulerProfile2 = "test-scheduler-profile-2"
 | 
						|
		testSchedulerProfile3 = "test-scheduler-profile-3"
 | 
						|
	)
 | 
						|
 | 
						|
	nodes := []runtime.Object{
 | 
						|
		st.MakeNode().Name("node1").UID("node1").Obj(),
 | 
						|
		st.MakeNode().Name("node2").UID("node2").Obj(),
 | 
						|
		st.MakeNode().Name("node3").UID("node3").Obj(),
 | 
						|
	}
 | 
						|
 | 
						|
	cases := []struct {
 | 
						|
		name                        string
 | 
						|
		profiles                    []schedulerapi.KubeSchedulerProfile
 | 
						|
		waitSchedulingPods          []*v1.Pod
 | 
						|
		expectPodNamesInWaitingPods []string
 | 
						|
	}{
 | 
						|
		{
 | 
						|
			name: "pods with same profile are waiting on permit stage",
 | 
						|
			profiles: []schedulerapi.KubeSchedulerProfile{
 | 
						|
				{
 | 
						|
					SchedulerName: testSchedulerProfile1,
 | 
						|
					Plugins: &schedulerapi.Plugins{
 | 
						|
						QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
 | 
						|
						Permit:    schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: fakePermit}}},
 | 
						|
						Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
 | 
						|
					},
 | 
						|
				},
 | 
						|
			},
 | 
						|
			waitSchedulingPods: []*v1.Pod{
 | 
						|
				st.MakePod().Name("pod1").UID("pod1").SchedulerName(testSchedulerProfile1).Obj(),
 | 
						|
				st.MakePod().Name("pod2").UID("pod2").SchedulerName(testSchedulerProfile1).Obj(),
 | 
						|
				st.MakePod().Name("pod3").UID("pod3").SchedulerName(testSchedulerProfile1).Obj(),
 | 
						|
			},
 | 
						|
			expectPodNamesInWaitingPods: []string{"pod1", "pod2", "pod3"},
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name: "pods with different profiles are waiting on permit stage",
 | 
						|
			profiles: []schedulerapi.KubeSchedulerProfile{
 | 
						|
				{
 | 
						|
					SchedulerName: testSchedulerProfile1,
 | 
						|
					Plugins: &schedulerapi.Plugins{
 | 
						|
						QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
 | 
						|
						Permit:    schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: fakePermit}}},
 | 
						|
						Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
 | 
						|
					},
 | 
						|
				},
 | 
						|
				{
 | 
						|
					SchedulerName: testSchedulerProfile2,
 | 
						|
					Plugins: &schedulerapi.Plugins{
 | 
						|
						QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
 | 
						|
						Permit:    schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: fakePermit}}},
 | 
						|
						Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
 | 
						|
					},
 | 
						|
				},
 | 
						|
				{
 | 
						|
					SchedulerName: testSchedulerProfile3,
 | 
						|
					Plugins: &schedulerapi.Plugins{
 | 
						|
						QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
 | 
						|
						Permit:    schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: fakePermit}}},
 | 
						|
						Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
 | 
						|
					},
 | 
						|
				},
 | 
						|
			},
 | 
						|
			waitSchedulingPods: []*v1.Pod{
 | 
						|
				st.MakePod().Name("pod1").UID("pod1").SchedulerName(testSchedulerProfile1).Obj(),
 | 
						|
				st.MakePod().Name("pod2").UID("pod2").SchedulerName(testSchedulerProfile1).Obj(),
 | 
						|
				st.MakePod().Name("pod3").UID("pod3").SchedulerName(testSchedulerProfile2).Obj(),
 | 
						|
				st.MakePod().Name("pod4").UID("pod4").SchedulerName(testSchedulerProfile3).Obj(),
 | 
						|
			},
 | 
						|
			expectPodNamesInWaitingPods: []string{"pod1", "pod2", "pod3", "pod4"},
 | 
						|
		},
 | 
						|
	}
 | 
						|
 | 
						|
	for _, tc := range cases {
 | 
						|
		t.Run(tc.name, func(t *testing.T) {
 | 
						|
			// Set up scheduler for the 3 nodes.
 | 
						|
			objs := append([]runtime.Object{&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ""}}}, nodes...)
 | 
						|
			fakeClient := fake.NewSimpleClientset(objs...)
 | 
						|
			informerFactory := informers.NewSharedInformerFactory(fakeClient, 0)
 | 
						|
			eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: fakeClient.EventsV1()})
 | 
						|
			defer eventBroadcaster.Shutdown()
 | 
						|
			eventRecorder := eventBroadcaster.NewRecorder(scheme.Scheme, fakePermit)
 | 
						|
 | 
						|
			outOfTreeRegistry := frameworkruntime.Registry{
 | 
						|
				fakePermit: newFakePermitPlugin(eventRecorder),
 | 
						|
			}
 | 
						|
 | 
						|
			_, ctx := ktesting.NewTestContext(t)
 | 
						|
			// timeout equals to the permit plugin waiting time.
 | 
						|
			ctx, cancel := context.WithTimeout(ctx, 100*time.Second)
 | 
						|
			defer cancel()
 | 
						|
 | 
						|
			scheduler, err := New(
 | 
						|
				ctx,
 | 
						|
				fakeClient,
 | 
						|
				informerFactory,
 | 
						|
				nil,
 | 
						|
				profile.NewRecorderFactory(eventBroadcaster),
 | 
						|
				WithProfiles(tc.profiles...),
 | 
						|
				WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
 | 
						|
			)
 | 
						|
 | 
						|
			if err != nil {
 | 
						|
				t.Fatalf("Failed to create scheduler: %v", err)
 | 
						|
			}
 | 
						|
 | 
						|
			var wg sync.WaitGroup
 | 
						|
			waitSchedulingPodNumber := len(tc.waitSchedulingPods)
 | 
						|
			wg.Add(waitSchedulingPodNumber)
 | 
						|
			stopFn, err := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) {
 | 
						|
				e, ok := obj.(*eventsv1.Event)
 | 
						|
				if !ok || (e.Reason != podWaitingReason) {
 | 
						|
					return
 | 
						|
				}
 | 
						|
				wg.Done()
 | 
						|
			})
 | 
						|
			if err != nil {
 | 
						|
				t.Fatal(err)
 | 
						|
			}
 | 
						|
			defer stopFn()
 | 
						|
 | 
						|
			// Run scheduler.
 | 
						|
			informerFactory.Start(ctx.Done())
 | 
						|
			informerFactory.WaitForCacheSync(ctx.Done())
 | 
						|
			go scheduler.Run(ctx)
 | 
						|
 | 
						|
			// Send pods to be scheduled.
 | 
						|
			for _, p := range tc.waitSchedulingPods {
 | 
						|
				_, err = fakeClient.CoreV1().Pods("").Create(ctx, p, metav1.CreateOptions{})
 | 
						|
				if err != nil {
 | 
						|
					t.Fatal(err)
 | 
						|
				}
 | 
						|
			}
 | 
						|
 | 
						|
			// Wait all pods in waitSchedulingPods to be scheduled.
 | 
						|
			wg.Wait()
 | 
						|
 | 
						|
			// When permit plugin emits the event, pod may not been added to the waitingPods pool yet, so we use pollUntil here.
 | 
						|
			if err := wait.PollUntilContextCancel(ctx, 100*time.Microsecond, true, func(context.Context) (done bool, err error) {
 | 
						|
				// Ensure that all waitingPods in scheduler can be obtained from any profiles.
 | 
						|
				for _, fwk := range scheduler.Profiles {
 | 
						|
					actualPodNamesInWaitingPods := sets.NewString()
 | 
						|
					fwk.IterateOverWaitingPods(func(pod framework.WaitingPod) {
 | 
						|
						actualPodNamesInWaitingPods.Insert(pod.GetPod().Name)
 | 
						|
					})
 | 
						|
					// Validate the name of pods in waitingPods matches expectations.
 | 
						|
					if actualPodNamesInWaitingPods.Len() != len(tc.expectPodNamesInWaitingPods) ||
 | 
						|
						!actualPodNamesInWaitingPods.HasAll(tc.expectPodNamesInWaitingPods...) {
 | 
						|
						return false, fmt.Errorf("Unexpected waitingPods in scheduler profile %s, expect: %#v, got: %#v", fwk.ProfileName(), tc.expectPodNamesInWaitingPods, actualPodNamesInWaitingPods.List())
 | 
						|
					}
 | 
						|
				}
 | 
						|
				return true, nil
 | 
						|
			}); err != nil {
 | 
						|
				t.Fatal("got unexpected result")
 | 
						|
			}
 | 
						|
		})
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
var _ framework.QueueSortPlugin = &fakeQueueSortPlugin{}
 | 
						|
 | 
						|
// fakeQueueSortPlugin is a no-op implementation for QueueSort extension point.
 | 
						|
type fakeQueueSortPlugin struct{}
 | 
						|
 | 
						|
func (pl *fakeQueueSortPlugin) Name() string {
 | 
						|
	return queueSort
 | 
						|
}
 | 
						|
 | 
						|
func (pl *fakeQueueSortPlugin) Less(_, _ *framework.QueuedPodInfo) bool {
 | 
						|
	return false
 | 
						|
}
 | 
						|
 | 
						|
var _ framework.BindPlugin = &fakebindPlugin{}
 | 
						|
 | 
						|
// fakebindPlugin is a no-op implementation for Bind extension point.
 | 
						|
type fakebindPlugin struct{}
 | 
						|
 | 
						|
func (t *fakebindPlugin) Name() string {
 | 
						|
	return fakeBind
 | 
						|
}
 | 
						|
 | 
						|
func (t *fakebindPlugin) Bind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status {
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// filterWithoutEnqueueExtensionsPlugin implements Filter, but doesn't implement EnqueueExtensions.
 | 
						|
type filterWithoutEnqueueExtensionsPlugin struct{}
 | 
						|
 | 
						|
func (*filterWithoutEnqueueExtensionsPlugin) Name() string { return filterWithoutEnqueueExtensions }
 | 
						|
 | 
						|
func (*filterWithoutEnqueueExtensionsPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status {
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
var hintFromFakeNode = framework.QueueingHint(100)
 | 
						|
 | 
						|
type fakeNodePlugin struct{}
 | 
						|
 | 
						|
var fakeNodePluginQueueingFn = func(_ klog.Logger, _ *v1.Pod, _, _ interface{}) (framework.QueueingHint, error) {
 | 
						|
	return hintFromFakeNode, nil
 | 
						|
}
 | 
						|
 | 
						|
func (*fakeNodePlugin) Name() string { return fakeNode }
 | 
						|
 | 
						|
func (*fakeNodePlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status {
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (pl *fakeNodePlugin) EventsToRegister() []framework.ClusterEventWithHint {
 | 
						|
	return []framework.ClusterEventWithHint{
 | 
						|
		{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}, QueueingHintFn: fakeNodePluginQueueingFn},
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
var hintFromFakePod = framework.QueueingHint(101)
 | 
						|
 | 
						|
type fakePodPlugin struct{}
 | 
						|
 | 
						|
var fakePodPluginQueueingFn = func(_ klog.Logger, _ *v1.Pod, _, _ interface{}) (framework.QueueingHint, error) {
 | 
						|
	return hintFromFakePod, nil
 | 
						|
}
 | 
						|
 | 
						|
func (*fakePodPlugin) Name() string { return fakePod }
 | 
						|
 | 
						|
func (*fakePodPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status {
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (pl *fakePodPlugin) EventsToRegister() []framework.ClusterEventWithHint {
 | 
						|
	return []framework.ClusterEventWithHint{
 | 
						|
		{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Add}, QueueingHintFn: fakePodPluginQueueingFn},
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
type emptyEventPlugin struct{}
 | 
						|
 | 
						|
func (*emptyEventPlugin) Name() string { return emptyEventExtensions }
 | 
						|
 | 
						|
func (*emptyEventPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status {
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (pl *emptyEventPlugin) EventsToRegister() []framework.ClusterEventWithHint {
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// emptyEventsToRegisterPlugin implement interface framework.EnqueueExtensions, but returns nil from EventsToRegister.
 | 
						|
// This can simulate a plugin registered at scheduler setup, but does nothing
 | 
						|
// due to some disabled feature gate.
 | 
						|
type emptyEventsToRegisterPlugin struct{}
 | 
						|
 | 
						|
func (*emptyEventsToRegisterPlugin) Name() string { return emptyEventsToRegister }
 | 
						|
 | 
						|
func (*emptyEventsToRegisterPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status {
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (*emptyEventsToRegisterPlugin) EventsToRegister() []framework.ClusterEventWithHint { return nil }
 | 
						|
 | 
						|
// fakePermitPlugin only implements PermitPlugin interface.
 | 
						|
type fakePermitPlugin struct {
 | 
						|
	eventRecorder events.EventRecorder
 | 
						|
}
 | 
						|
 | 
						|
func newFakePermitPlugin(eventRecorder events.EventRecorder) frameworkruntime.PluginFactory {
 | 
						|
	return func(ctx context.Context, configuration runtime.Object, f framework.Handle) (framework.Plugin, error) {
 | 
						|
		pl := &fakePermitPlugin{
 | 
						|
			eventRecorder: eventRecorder,
 | 
						|
		}
 | 
						|
		return pl, nil
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (f fakePermitPlugin) Name() string {
 | 
						|
	return fakePermit
 | 
						|
}
 | 
						|
 | 
						|
const (
 | 
						|
	podWaitingReason = "podWaiting"
 | 
						|
)
 | 
						|
 | 
						|
func (f fakePermitPlugin) Permit(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
 | 
						|
	defer func() {
 | 
						|
		// Send event with podWaiting reason to broadcast this pod is already waiting in the permit stage.
 | 
						|
		f.eventRecorder.Eventf(p, nil, v1.EventTypeWarning, podWaitingReason, "", "")
 | 
						|
	}()
 | 
						|
 | 
						|
	return framework.NewStatus(framework.Wait), 100 * time.Second
 | 
						|
}
 | 
						|
 | 
						|
var _ framework.PermitPlugin = &fakePermitPlugin{}
 |