mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 19:58:17 +00:00 
			
		
		
		
	Merge pull request #112637 from sanposhiho/pre-filter-skip
feature(scheduler): won't run Filter if PreFilter returned a Skip status
This commit is contained in:
		@@ -19,6 +19,8 @@ package framework
 | 
			
		||||
import (
 | 
			
		||||
	"errors"
 | 
			
		||||
	"sync"
 | 
			
		||||
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/sets"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
@@ -48,6 +50,8 @@ type CycleState struct {
 | 
			
		||||
	storage sync.Map
 | 
			
		||||
	// if recordPluginMetrics is true, PluginExecutionDuration will be recorded for this cycle.
 | 
			
		||||
	recordPluginMetrics bool
 | 
			
		||||
	// SkipFilterPlugins are plugins that will be skipped in the Filter extension point.
 | 
			
		||||
	SkipFilterPlugins sets.String
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewCycleState initializes a new CycleState and returns its pointer.
 | 
			
		||||
@@ -83,6 +87,7 @@ func (c *CycleState) Clone() *CycleState {
 | 
			
		||||
		return true
 | 
			
		||||
	})
 | 
			
		||||
	copy.recordPluginMetrics = c.recordPluginMetrics
 | 
			
		||||
	copy.SkipFilterPlugins = c.SkipFilterPlugins
 | 
			
		||||
 | 
			
		||||
	return copy
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -91,6 +91,7 @@ const (
 | 
			
		||||
	// Wait is used when a Permit plugin finds a pod scheduling should wait.
 | 
			
		||||
	Wait
 | 
			
		||||
	// Skip is used when a Bind plugin chooses to skip binding.
 | 
			
		||||
	// Also, if a PreFilter plugin returns Skip, coupled Filter plugin will be skipped.
 | 
			
		||||
	Skip
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -599,8 +599,10 @@ func (f *frameworkImpl) QueueSortFunc() framework.LessFunc {
 | 
			
		||||
 | 
			
		||||
// RunPreFilterPlugins runs the set of configured PreFilter plugins. It returns
 | 
			
		||||
// *Status and its code is set to non-success if any of the plugins returns
 | 
			
		||||
// anything but Success. If a non-success status is returned, then the scheduling
 | 
			
		||||
// cycle is aborted.
 | 
			
		||||
// anything but Success/Skip.
 | 
			
		||||
// Plugins that returned Skip status are recorded in the cyclestate,
 | 
			
		||||
// and they are skipped in the Filter extension point.
 | 
			
		||||
// If a non-success status is returned, then the scheduling cycle is aborted.
 | 
			
		||||
func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (_ *framework.PreFilterResult, status *framework.Status) {
 | 
			
		||||
	startTime := time.Now()
 | 
			
		||||
	defer func() {
 | 
			
		||||
@@ -608,15 +610,19 @@ func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framewor
 | 
			
		||||
	}()
 | 
			
		||||
	var result *framework.PreFilterResult
 | 
			
		||||
	var pluginsWithNodes []string
 | 
			
		||||
	skipPlugins := sets.NewString()
 | 
			
		||||
	for _, pl := range f.preFilterPlugins {
 | 
			
		||||
		r, s := f.runPreFilterPlugin(ctx, pl, state, pod)
 | 
			
		||||
		if !s.IsSuccess() {
 | 
			
		||||
		if !s.IsSuccess() && !s.IsSkip() {
 | 
			
		||||
			s.SetFailedPlugin(pl.Name())
 | 
			
		||||
			if s.IsUnschedulable() {
 | 
			
		||||
				return nil, s
 | 
			
		||||
			}
 | 
			
		||||
			return nil, framework.AsStatus(fmt.Errorf("running PreFilter plugin %q: %w", pl.Name(), s.AsError())).WithFailedPlugin(pl.Name())
 | 
			
		||||
		}
 | 
			
		||||
		if s.IsSkip() {
 | 
			
		||||
			skipPlugins.Insert(pl.Name())
 | 
			
		||||
		}
 | 
			
		||||
		if !r.AllNodes() {
 | 
			
		||||
			pluginsWithNodes = append(pluginsWithNodes, pl.Name())
 | 
			
		||||
		}
 | 
			
		||||
@@ -628,8 +634,8 @@ func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framewor
 | 
			
		||||
			}
 | 
			
		||||
			return nil, framework.NewStatus(framework.Unschedulable, msg)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
	}
 | 
			
		||||
	state.SkipFilterPlugins = skipPlugins
 | 
			
		||||
	return result, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -723,8 +729,12 @@ func (f *frameworkImpl) RunFilterPlugins(
 | 
			
		||||
	pod *v1.Pod,
 | 
			
		||||
	nodeInfo *framework.NodeInfo,
 | 
			
		||||
) framework.PluginToStatus {
 | 
			
		||||
	skippedPlugins := state.SkipFilterPlugins
 | 
			
		||||
	statuses := make(framework.PluginToStatus)
 | 
			
		||||
	for _, pl := range f.filterPlugins {
 | 
			
		||||
		if skippedPlugins.Has(pl.Name()) {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		pluginStatus := f.runFilterPlugin(ctx, pl, state, pod, nodeInfo)
 | 
			
		||||
		if !pluginStatus.IsSuccess() {
 | 
			
		||||
			if !pluginStatus.IsUnschedulable() {
 | 
			
		||||
 
 | 
			
		||||
@@ -1393,7 +1393,7 @@ func TestPreFilterPlugins(t *testing.T) {
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			t.Fatalf("Failed to create framework for testing: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
		f.RunPreFilterPlugins(ctx, nil, nil)
 | 
			
		||||
		f.RunPreFilterPlugins(ctx, framework.NewCycleState(), nil)
 | 
			
		||||
		f.RunPreFilterExtensionAddPod(ctx, nil, nil, nil, nil)
 | 
			
		||||
		f.RunPreFilterExtensionRemovePod(ctx, nil, nil, nil, nil)
 | 
			
		||||
 | 
			
		||||
@@ -1412,40 +1412,152 @@ func TestPreFilterPlugins(t *testing.T) {
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestRunPreFilterPluginsStatus(t *testing.T) {
 | 
			
		||||
	preFilter := &TestPlugin{
 | 
			
		||||
		name: preFilterPluginName,
 | 
			
		||||
		inj:  injectedResult{PreFilterStatus: int(framework.Error)},
 | 
			
		||||
func TestRunPreFilterPlugins(t *testing.T) {
 | 
			
		||||
	tests := []struct {
 | 
			
		||||
		name                string
 | 
			
		||||
		plugins             []*TestPlugin
 | 
			
		||||
		wantPreFilterResult *framework.PreFilterResult
 | 
			
		||||
		wantSkippedPlugins  sets.String
 | 
			
		||||
		wantStatus          *framework.Status
 | 
			
		||||
	}{
 | 
			
		||||
		{
 | 
			
		||||
			name: "all PreFilter returned success",
 | 
			
		||||
			plugins: []*TestPlugin{
 | 
			
		||||
				{
 | 
			
		||||
					name: "success1",
 | 
			
		||||
				},
 | 
			
		||||
				{
 | 
			
		||||
					name: "success2",
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
			wantPreFilterResult: nil,
 | 
			
		||||
			wantStatus:          nil,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "one PreFilter plugin returned success, but another PreFilter plugin returned non-success",
 | 
			
		||||
			plugins: []*TestPlugin{
 | 
			
		||||
				{
 | 
			
		||||
					name: "success",
 | 
			
		||||
				},
 | 
			
		||||
				{
 | 
			
		||||
					name: "error",
 | 
			
		||||
					inj:  injectedResult{PreFilterStatus: int(framework.Error)},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
			wantPreFilterResult: nil,
 | 
			
		||||
			wantStatus:          framework.AsStatus(fmt.Errorf("running PreFilter plugin %q: %w", "error", errInjectedStatus)).WithFailedPlugin("error"),
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "one PreFilter plugin returned skip, but another PreFilter plugin returned non-success",
 | 
			
		||||
			plugins: []*TestPlugin{
 | 
			
		||||
				{
 | 
			
		||||
					name: "skip",
 | 
			
		||||
					inj:  injectedResult{PreFilterStatus: int(framework.Skip)},
 | 
			
		||||
				},
 | 
			
		||||
				{
 | 
			
		||||
					name: "error",
 | 
			
		||||
					inj:  injectedResult{PreFilterStatus: int(framework.Error)},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
			wantPreFilterResult: nil,
 | 
			
		||||
			wantStatus:          framework.AsStatus(fmt.Errorf("running PreFilter plugin %q: %w", "error", errInjectedStatus)).WithFailedPlugin("error"),
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "all PreFilter plugins returned skip",
 | 
			
		||||
			plugins: []*TestPlugin{
 | 
			
		||||
				{
 | 
			
		||||
					name: "skip1",
 | 
			
		||||
					inj:  injectedResult{PreFilterStatus: int(framework.Skip)},
 | 
			
		||||
				},
 | 
			
		||||
				{
 | 
			
		||||
					name: "skip2",
 | 
			
		||||
					inj:  injectedResult{PreFilterStatus: int(framework.Skip)},
 | 
			
		||||
				},
 | 
			
		||||
				{
 | 
			
		||||
					name: "skip3",
 | 
			
		||||
					inj:  injectedResult{PreFilterStatus: int(framework.Skip)},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
			wantPreFilterResult: nil,
 | 
			
		||||
			wantSkippedPlugins:  sets.NewString("skip1", "skip2", "skip3"),
 | 
			
		||||
			wantStatus:          nil,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "some PreFilter plugins returned skip",
 | 
			
		||||
			plugins: []*TestPlugin{
 | 
			
		||||
				{
 | 
			
		||||
					name: "skip1",
 | 
			
		||||
					inj:  injectedResult{PreFilterStatus: int(framework.Skip)},
 | 
			
		||||
				},
 | 
			
		||||
				{
 | 
			
		||||
					name: "success1",
 | 
			
		||||
				},
 | 
			
		||||
				{
 | 
			
		||||
					name: "skip2",
 | 
			
		||||
					inj:  injectedResult{PreFilterStatus: int(framework.Skip)},
 | 
			
		||||
				},
 | 
			
		||||
				{
 | 
			
		||||
					name: "success2",
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
			wantPreFilterResult: nil,
 | 
			
		||||
			wantSkippedPlugins:  sets.NewString("skip1", "skip2"),
 | 
			
		||||
			wantStatus:          nil,
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	r := make(Registry)
 | 
			
		||||
	r.Register(preFilterPluginName,
 | 
			
		||||
		func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) {
 | 
			
		||||
			return preFilter, nil
 | 
			
		||||
	for _, tt := range tests {
 | 
			
		||||
		t.Run(tt.name, func(t *testing.T) {
 | 
			
		||||
			r := make(Registry)
 | 
			
		||||
			enabled := make([]config.Plugin, len(tt.plugins))
 | 
			
		||||
			for i, p := range tt.plugins {
 | 
			
		||||
				p := p
 | 
			
		||||
				enabled[i].Name = p.name
 | 
			
		||||
				r.Register(p.name, func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) {
 | 
			
		||||
					return p, nil
 | 
			
		||||
				})
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			ctx, cancel := context.WithCancel(context.Background())
 | 
			
		||||
			defer cancel()
 | 
			
		||||
 | 
			
		||||
			f, err := newFrameworkWithQueueSortAndBind(
 | 
			
		||||
				r,
 | 
			
		||||
				config.KubeSchedulerProfile{Plugins: &config.Plugins{PreFilter: config.PluginSet{Enabled: enabled}}},
 | 
			
		||||
				ctx.Done(),
 | 
			
		||||
			)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				t.Fatalf("Failed to create framework for testing: %v", err)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			state := framework.NewCycleState()
 | 
			
		||||
			result, status := f.RunPreFilterPlugins(ctx, state, nil)
 | 
			
		||||
			if d := cmp.Diff(result, tt.wantPreFilterResult); d != "" {
 | 
			
		||||
				t.Errorf("wrong status. got: %v, want: %v, diff: %s", result, tt.wantPreFilterResult, d)
 | 
			
		||||
			}
 | 
			
		||||
			if d := cmp.Diff(status, tt.wantStatus, cmp.Comparer(func(a, b *framework.Status) bool {
 | 
			
		||||
				if a.Code() == framework.Error && b.Code() == framework.Error {
 | 
			
		||||
					// we assume two error status is equal to each other if both contain the same reasons.
 | 
			
		||||
					return cmp.Equal(a.Reasons(), b.Reasons())
 | 
			
		||||
				}
 | 
			
		||||
				return a.Equal(b)
 | 
			
		||||
			})); d != "" {
 | 
			
		||||
				t.Errorf("wrong status. got: %v, want: %v, diff: %s", status, tt.wantStatus, d)
 | 
			
		||||
			}
 | 
			
		||||
			skipped := state.SkipFilterPlugins
 | 
			
		||||
			if d := cmp.Diff(skipped, tt.wantSkippedPlugins); d != "" {
 | 
			
		||||
				t.Errorf("wrong skip filter plugins. got: %v, want: %v, diff: %s", skipped, tt.wantSkippedPlugins, d)
 | 
			
		||||
			}
 | 
			
		||||
		})
 | 
			
		||||
 | 
			
		||||
	plugins := &config.Plugins{PreFilter: config.PluginSet{Enabled: []config.Plugin{{Name: preFilterPluginName}}}}
 | 
			
		||||
 | 
			
		||||
	profile := config.KubeSchedulerProfile{Plugins: plugins}
 | 
			
		||||
	ctx, cancel := context.WithCancel(context.Background())
 | 
			
		||||
	defer cancel()
 | 
			
		||||
 | 
			
		||||
	f, err := newFrameworkWithQueueSortAndBind(r, profile, ctx.Done())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("Failed to create framework for testing: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	_, status := f.RunPreFilterPlugins(ctx, nil, nil)
 | 
			
		||||
	wantStatus := framework.AsStatus(fmt.Errorf("running PreFilter plugin %q: %w", preFilter.Name(), errInjectedStatus)).WithFailedPlugin(preFilter.Name())
 | 
			
		||||
	if !reflect.DeepEqual(status, wantStatus) {
 | 
			
		||||
		t.Errorf("wrong status. got: %v, want:%v", status, wantStatus)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestFilterPlugins(t *testing.T) {
 | 
			
		||||
	tests := []struct {
 | 
			
		||||
		name          string
 | 
			
		||||
		plugins       []*TestPlugin
 | 
			
		||||
		wantStatus    *framework.Status
 | 
			
		||||
		wantStatusMap framework.PluginToStatus
 | 
			
		||||
		name           string
 | 
			
		||||
		plugins        []*TestPlugin
 | 
			
		||||
		skippedPlugins sets.String
 | 
			
		||||
		wantStatus     *framework.Status
 | 
			
		||||
		wantStatusMap  framework.PluginToStatus
 | 
			
		||||
	}{
 | 
			
		||||
		{
 | 
			
		||||
			name: "SuccessFilter",
 | 
			
		||||
@@ -1455,7 +1567,6 @@ func TestFilterPlugins(t *testing.T) {
 | 
			
		||||
					inj:  injectedResult{FilterStatus: int(framework.Success)},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
			wantStatus:    nil,
 | 
			
		||||
			wantStatusMap: framework.PluginToStatus{},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
@@ -1533,6 +1644,23 @@ func TestFilterPlugins(t *testing.T) {
 | 
			
		||||
			wantStatus:    nil,
 | 
			
		||||
			wantStatusMap: framework.PluginToStatus{},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "SuccessAndSkipFilters",
 | 
			
		||||
			plugins: []*TestPlugin{
 | 
			
		||||
				{
 | 
			
		||||
					name: "TestPlugin1",
 | 
			
		||||
					inj:  injectedResult{FilterStatus: int(framework.Success)},
 | 
			
		||||
				},
 | 
			
		||||
 | 
			
		||||
				{
 | 
			
		||||
					name: "TestPlugin2",
 | 
			
		||||
					inj:  injectedResult{FilterStatus: int(framework.Error)}, // To make sure this plugins isn't called, set error as an injected result.
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
			wantStatus:     nil,
 | 
			
		||||
			skippedPlugins: sets.NewString("TestPlugin2"),
 | 
			
		||||
			wantStatusMap:  framework.PluginToStatus{},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "ErrorAndSuccessFilters",
 | 
			
		||||
			plugins: []*TestPlugin{
 | 
			
		||||
@@ -1613,7 +1741,9 @@ func TestFilterPlugins(t *testing.T) {
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				t.Fatalf("fail to create framework: %s", err)
 | 
			
		||||
			}
 | 
			
		||||
			gotStatusMap := f.RunFilterPlugins(ctx, nil, pod, nil)
 | 
			
		||||
			state := framework.NewCycleState()
 | 
			
		||||
			state.SkipFilterPlugins = tt.skippedPlugins
 | 
			
		||||
			gotStatusMap := f.RunFilterPlugins(ctx, state, pod, nil)
 | 
			
		||||
			gotStatus := gotStatusMap.Merge()
 | 
			
		||||
			if !reflect.DeepEqual(gotStatus, tt.wantStatus) {
 | 
			
		||||
				t.Errorf("wrong status code. got: %v, want:%v", gotStatus, tt.wantStatus)
 | 
			
		||||
@@ -1847,7 +1977,7 @@ func TestFilterPluginsWithNominatedPods(t *testing.T) {
 | 
			
		||||
				t.Fatalf("fail to create framework: %s", err)
 | 
			
		||||
			}
 | 
			
		||||
			tt.nodeInfo.SetNode(tt.node)
 | 
			
		||||
			gotStatus := f.RunFilterPluginsWithNominatedPods(ctx, nil, tt.pod, tt.nodeInfo)
 | 
			
		||||
			gotStatus := f.RunFilterPluginsWithNominatedPods(ctx, framework.NewCycleState(), tt.pod, tt.nodeInfo)
 | 
			
		||||
			if !reflect.DeepEqual(gotStatus, tt.wantStatus) {
 | 
			
		||||
				t.Errorf("Unexpected status. got: %v, want: %v", gotStatus, tt.wantStatus)
 | 
			
		||||
			}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user