/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package noderesources

import (
	"context"
	"fmt"
	"strings"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/kubernetes/pkg/api/v1/resource"
	v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
	"k8s.io/kubernetes/pkg/scheduler/apis/config"
	"k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
)

var _ framework.PreFilterPlugin = &Fit{}
var _ framework.FilterPlugin = &Fit{}
var _ framework.EnqueueExtensions = &Fit{}
var _ framework.PreScorePlugin = &Fit{}
var _ framework.ScorePlugin = &Fit{}

const (
	// Name is the name of the plugin used in the plugin registry and configurations.
	Name = names.NodeResourcesFit

	// preFilterStateKey is the key in CycleState to NodeResourcesFit pre-computed data.
	// Using the name of the plugin will likely help us avoid collisions with other plugins.
	preFilterStateKey = "PreFilter" + Name

	// preScoreStateKey is the key in CycleState to NodeResourcesFit pre-computed data for Scoring.
	preScoreStateKey = "PreScore" + Name
)

// nodeResourceStrategyTypeMap maps strategy to scorer implementation
var nodeResourceStrategyTypeMap = map[config.ScoringStrategyType]scorer{
	config.LeastAllocated: func(args *config.NodeResourcesFitArgs) *resourceAllocationScorer {
		resources := args.ScoringStrategy.Resources
		return &resourceAllocationScorer{
			Name:      string(config.LeastAllocated),
			scorer:    leastResourceScorer(resources),
			resources: resources,
		}
	},
	config.MostAllocated: func(args *config.NodeResourcesFitArgs) *resourceAllocationScorer {
		resources := args.ScoringStrategy.Resources
		return &resourceAllocationScorer{
			Name:      string(config.MostAllocated),
			scorer:    mostResourceScorer(resources),
			resources: resources,
		}
	},
	config.RequestedToCapacityRatio: func(args *config.NodeResourcesFitArgs) *resourceAllocationScorer {
		resources := args.ScoringStrategy.Resources
		return &resourceAllocationScorer{
			Name:      string(config.RequestedToCapacityRatio),
			scorer:    requestedToCapacityRatioScorer(resources, args.ScoringStrategy.RequestedToCapacityRatio.Shape),
			resources: resources,
		}
	},
}
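
// Note: NewFit (below) looks up the constructor for the configured
// ScoringStrategy.Type in this map and embeds the returned
// resourceAllocationScorer into the Fit plugin instance, so an
// unsupported strategy is rejected at plugin construction time.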

// Fit is a plugin that checks if a node has sufficient resources.
type Fit struct {
	ignoredResources                sets.Set[string]
	ignoredResourceGroups           sets.Set[string]
	enableInPlacePodVerticalScaling bool
	enableSidecarContainers         bool
	handle                          framework.Handle
	resourceAllocationScorer
}

// ScoreExtensions of the Score plugin.
func (f *Fit) ScoreExtensions() framework.ScoreExtensions {
	return nil
}

// preFilterState computed at PreFilter and used at Filter.
type preFilterState struct {
	framework.Resource
}

// Clone the prefilter state.
func (s *preFilterState) Clone() framework.StateData {
	return s
}

// preScoreState computed at PreScore and used at Score.
type preScoreState struct {
	// podRequests have the same order as the resources defined in NodeResourcesBalancedAllocationArgs.Resources,
	// same for other place we store a list like that.
	podRequests []int64
}

// Clone implements the mandatory Clone interface. We don't really copy the data since
// there is no need for that.
func (s *preScoreState) Clone() framework.StateData {
	return s
}

// PreScore calculates incoming pod's resource requests and writes them to the cycle state used.
func (f *Fit) PreScore(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) *framework.Status {
	state := &preScoreState{
		podRequests: f.calculatePodResourceRequestList(pod, f.resources),
	}
	cycleState.Write(preScoreStateKey, state)
	return nil
}

func getPreScoreState(cycleState *framework.CycleState) (*preScoreState, error) {
	c, err := cycleState.Read(preScoreStateKey)
	if err != nil {
		return nil, fmt.Errorf("reading %q from cycleState: %w", preScoreStateKey, err)
	}

	s, ok := c.(*preScoreState)
	if !ok {
		return nil, fmt.Errorf("invalid PreScore state, got type %T", c)
	}
	return s, nil
}

// Name returns name of the plugin. It is used in logs, etc.
func (f *Fit) Name() string {
	return Name
}

// NewFit initializes a new plugin and returns it.
func NewFit(_ context.Context, plArgs runtime.Object, h framework.Handle, fts feature.Features) (framework.Plugin, error) {
	args, ok := plArgs.(*config.NodeResourcesFitArgs)
	if !ok {
		return nil, fmt.Errorf("want args to be of type NodeResourcesFitArgs, got %T", plArgs)
	}
	if err := validation.ValidateNodeResourcesFitArgs(nil, args); err != nil {
		return nil, err
	}

	if args.ScoringStrategy == nil {
		return nil, fmt.Errorf("scoring strategy not specified")
	}

	strategy := args.ScoringStrategy.Type
	scorePlugin, exists := nodeResourceStrategyTypeMap[strategy]
	if !exists {
		return nil, fmt.Errorf("scoring strategy %s is not supported", strategy)
	}

	return &Fit{
		ignoredResources:                sets.New(args.IgnoredResources...),
		ignoredResourceGroups:           sets.New(args.IgnoredResourceGroups...),
		enableInPlacePodVerticalScaling: fts.EnableInPlacePodVerticalScaling,
		enableSidecarContainers:         fts.EnableSidecarContainers,
		handle:                          h,
		resourceAllocationScorer:        *scorePlugin(args),
	}, nil
}
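
// As an illustration only (not part of this file's wiring), the args a scheduler
// profile passes to NewFit correspond to a structure along these lines; ctx,
// handle, and fts stand in for the caller's context, framework handle, and
// feature gates:
//
//	args := &config.NodeResourcesFitArgs{
//		ScoringStrategy: &config.ScoringStrategy{
//			Type: config.LeastAllocated,
//			Resources: []config.ResourceSpec{
//				{Name: "cpu", Weight: 1},
//				{Name: "memory", Weight: 1},
//			},
//		},
//	}
//	plugin, err := NewFit(ctx, args, handle, fts)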

// computePodResourceRequest returns a framework.Resource that covers the largest
// width in each resource dimension. Because init-containers run sequentially, we collect
// the max in each dimension iteratively. In contrast, we sum the resource vectors for
// regular containers since they run simultaneously.
//
// # The resources defined for Overhead should be added to the calculated Resource request sum
//
// Example:
//
// Pod:
//
//	InitContainers
//	  IC1:
//	    CPU: 2
//	    Memory: 1G
//	  IC2:
//	    CPU: 2
//	    Memory: 3G
//	Containers
//	  C1:
//	    CPU: 2
//	    Memory: 1G
//	  C2:
//	    CPU: 1
//	    Memory: 1G
//
// Result: CPU: 3, Memory: 3G
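// (Worked out: the regular containers sum to CPU 2+1 = 3 and Memory 1G+1G = 2G; taking the
// per-dimension max against IC1 and IC2 gives CPU max(2, 2, 3) = 3 and Memory max(1G, 3G, 2G) = 3G.)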
func computePodResourceRequest(pod *v1.Pod) *preFilterState {
	// pod hasn't scheduled yet so we don't need to worry about InPlacePodVerticalScalingEnabled
	reqs := resource.PodRequests(pod, resource.PodResourcesOptions{})
	result := &preFilterState{}
	result.SetMaxResource(reqs)
	return result
}

// PreFilter invoked at the prefilter extension point.
func (f *Fit) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
	if !f.enableSidecarContainers && hasRestartableInitContainer(pod) {
		// Scheduler will calculate resources usage for a Pod containing
		// restartable init containers that will be equal or more than kubelet will
		// require to run the Pod. So there will be no overbooking. However, to
		// avoid the inconsistency in resource calculation between the scheduler
		// and the older (before v1.28) kubelet, make the Pod unschedulable.
		return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, "Pod has a restartable init container and the SidecarContainers feature is disabled")
	}
	cycleState.Write(preFilterStateKey, computePodResourceRequest(pod))
	return nil, nil
}

// PreFilterExtensions returns prefilter extensions, pod add and remove.
func (f *Fit) PreFilterExtensions() framework.PreFilterExtensions {
	return nil
}

func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error) {
	c, err := cycleState.Read(preFilterStateKey)
	if err != nil {
		// preFilterState doesn't exist, likely PreFilter wasn't invoked.
		return nil, fmt.Errorf("error reading %q from cycleState: %w", preFilterStateKey, err)
	}

	s, ok := c.(*preFilterState)
	if !ok {
		return nil, fmt.Errorf("%+v  convert to NodeResourcesFit.preFilterState error", c)
	}
	return s, nil
}

// EventsToRegister returns the possible events that may make a Pod
// failed by this plugin schedulable.
func (f *Fit) EventsToRegister() []framework.ClusterEventWithHint {
	podActionType := framework.Delete
	if f.enableInPlacePodVerticalScaling {
		// If InPlacePodVerticalScaling (KEP 1287) is enabled, then PodUpdate event should be registered
		// for this plugin since a Pod update may free up resources that make other Pods schedulable.
		podActionType |= framework.Update
	}
	return []framework.ClusterEventWithHint{
		{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: podActionType}},
		{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}},
	}
}

// Filter invoked at the filter extension point.
// Checks if a node has sufficient resources, such as cpu, memory, gpu, opaque int resources etc to run a pod.
// It returns a list of insufficient resources, if empty, then the node has all the resources requested by the pod.
func (f *Fit) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
	s, err := getPreFilterState(cycleState)
	if err != nil {
		return framework.AsStatus(err)
	}

	insufficientResources := fitsRequest(s, nodeInfo, f.ignoredResources, f.ignoredResourceGroups)

	if len(insufficientResources) != 0 {
		// We will keep all failure reasons.
		failureReasons := make([]string, 0, len(insufficientResources))
		for i := range insufficientResources {
			failureReasons = append(failureReasons, insufficientResources[i].Reason)
		}
		return framework.NewStatus(framework.Unschedulable, failureReasons...)
	}
	return nil
}

func hasRestartableInitContainer(pod *v1.Pod) bool {
	for _, c := range pod.Spec.InitContainers {
		if c.RestartPolicy != nil && *c.RestartPolicy == v1.ContainerRestartPolicyAlways {
			return true
		}
	}
	return false
}

// InsufficientResource describes what kind of resource limit is hit and caused the pod to not fit the node.
type InsufficientResource struct {
	ResourceName v1.ResourceName
	// We explicitly have a parameter for reason to avoid formatting a message on the fly
	// for common resources, which is expensive for cluster autoscaler simulations.
	Reason    string
	Requested int64
	Used      int64
	Capacity  int64
}

// Fits checks if a node has enough resources to host the pod.
func Fits(pod *v1.Pod, nodeInfo *framework.NodeInfo) []InsufficientResource {
	return fitsRequest(computePodResourceRequest(pod), nodeInfo, nil, nil)
}
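
// A minimal usage sketch for callers outside the plugin (pod and nodeInfo are
// assumed to be supplied by the caller, e.g. from a scheduling simulation);
// an empty result means the pod fits on the node:
//
//	for _, r := range Fits(pod, nodeInfo) {
//		fmt.Printf("%v: requested=%d used=%d capacity=%d\n",
//			r.ResourceName, r.Requested, r.Used, r.Capacity)
//	}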

func fitsRequest(podRequest *preFilterState, nodeInfo *framework.NodeInfo, ignoredExtendedResources, ignoredResourceGroups sets.Set[string]) []InsufficientResource {
	insufficientResources := make([]InsufficientResource, 0, 4)

	allowedPodNumber := nodeInfo.Allocatable.AllowedPodNumber
	if len(nodeInfo.Pods)+1 > allowedPodNumber {
		insufficientResources = append(insufficientResources, InsufficientResource{
			ResourceName: v1.ResourcePods,
			Reason:       "Too many pods",
			Requested:    1,
			Used:         int64(len(nodeInfo.Pods)),
			Capacity:     int64(allowedPodNumber),
		})
	}

	if podRequest.MilliCPU == 0 &&
		podRequest.Memory == 0 &&
		podRequest.EphemeralStorage == 0 &&
		len(podRequest.ScalarResources) == 0 {
		return insufficientResources
	}

	if podRequest.MilliCPU > 0 && podRequest.MilliCPU > (nodeInfo.Allocatable.MilliCPU-nodeInfo.Requested.MilliCPU) {
		insufficientResources = append(insufficientResources, InsufficientResource{
			ResourceName: v1.ResourceCPU,
			Reason:       "Insufficient cpu",
			Requested:    podRequest.MilliCPU,
			Used:         nodeInfo.Requested.MilliCPU,
			Capacity:     nodeInfo.Allocatable.MilliCPU,
		})
	}
	if podRequest.Memory > 0 && podRequest.Memory > (nodeInfo.Allocatable.Memory-nodeInfo.Requested.Memory) {
		insufficientResources = append(insufficientResources, InsufficientResource{
			ResourceName: v1.ResourceMemory,
			Reason:       "Insufficient memory",
			Requested:    podRequest.Memory,
			Used:         nodeInfo.Requested.Memory,
			Capacity:     nodeInfo.Allocatable.Memory,
		})
	}
	if podRequest.EphemeralStorage > 0 &&
		podRequest.EphemeralStorage > (nodeInfo.Allocatable.EphemeralStorage-nodeInfo.Requested.EphemeralStorage) {
		insufficientResources = append(insufficientResources, InsufficientResource{
			ResourceName: v1.ResourceEphemeralStorage,
			Reason:       "Insufficient ephemeral-storage",
			Requested:    podRequest.EphemeralStorage,
			Used:         nodeInfo.Requested.EphemeralStorage,
			Capacity:     nodeInfo.Allocatable.EphemeralStorage,
		})
	}

	for rName, rQuant := range podRequest.ScalarResources {
		// Skip in case request quantity is zero
		if rQuant == 0 {
			continue
		}

		if v1helper.IsExtendedResourceName(rName) {
			// If this resource is one of the extended resources that should be ignored, we will skip checking it.
			// rName is guaranteed to have a slash due to API validation.
			var rNamePrefix string
			if ignoredResourceGroups.Len() > 0 {
				rNamePrefix = strings.Split(string(rName), "/")[0]
			}
			if ignoredExtendedResources.Has(string(rName)) || ignoredResourceGroups.Has(rNamePrefix) {
				continue
			}
		}

		if rQuant > (nodeInfo.Allocatable.ScalarResources[rName] - nodeInfo.Requested.ScalarResources[rName]) {
			insufficientResources = append(insufficientResources, InsufficientResource{
				ResourceName: rName,
				Reason:       fmt.Sprintf("Insufficient %v", rName),
				Requested:    podRequest.ScalarResources[rName],
				Used:         nodeInfo.Requested.ScalarResources[rName],
				Capacity:     nodeInfo.Allocatable.ScalarResources[rName],
			})
		}
	}

	return insufficientResources
}

// Score invoked at the Score extension point.
func (f *Fit) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
	nodeInfo, err := f.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
	if err != nil {
		return 0, framework.AsStatus(fmt.Errorf("getting node %q from Snapshot: %w", nodeName, err))
	}

	s, err := getPreScoreState(state)
	if err != nil {
		s = &preScoreState{
			podRequests: f.calculatePodResourceRequestList(pod, f.resources),
		}
	}

	return f.score(ctx, pod, nodeInfo, s.podRequests)
}