/*
Copyright 2022 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package dynamicresources

import (
	"context"
	"errors"
	"fmt"
	"slices"
	"strings"
	"sync"
	"time"

	v1 "k8s.io/api/core/v1"
	resourceapi "k8s.io/api/resource/v1"
	apiequality "k8s.io/apimachinery/pkg/api/equality"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	apimeta "k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/diff"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/uuid"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/util/retry"
	resourcehelper "k8s.io/component-helpers/resource"
	"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
	"k8s.io/dynamic-resource-allocation/cel"
	"k8s.io/dynamic-resource-allocation/resourceclaim"
	"k8s.io/dynamic-resource-allocation/structured"
	"k8s.io/klog/v2"
	fwk "k8s.io/kube-scheduler/framework"
	v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
	"k8s.io/kubernetes/pkg/scheduler/apis/config"
	"k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/dynamicresources/extended"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
	schedutil "k8s.io/kubernetes/pkg/scheduler/util"
	"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
	"k8s.io/kubernetes/pkg/util/slice"
	"k8s.io/utils/ptr"
)

const (
	// Name is the name of the plugin used in Registry and configurations.
	Name = names.DynamicResources

	stateKey fwk.StateKey = Name

	// specialClaimInMemName is the name of the special resource claim that
	// exists only in memory. The claim will get a generated name when it is
	// written to the API server.
	//
	// It's intentionally not a valid ResourceClaim name to avoid conflicts with
	// some actual ResourceClaim in the apiserver.
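	// (Angle brackets are not allowed in Kubernetes object names, so no claim
	// stored in the API server can ever carry this name.)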
	specialClaimInMemName = "<extended-resources>"

	// BindingTimeoutDefaultSeconds is the default timeout for waiting for
	// BindingConditions to be ready.
	BindingTimeoutDefaultSeconds = 600

	// AssumeExtendedResourceTimeoutDefaultSeconds is the default timeout for waiting
	// for the extended resource claim to be updated in the assume cache.
	AssumeExtendedResourceTimeoutDefaultSeconds = 120
)

// The state is initialized in the PreFilter phase. Because we save the pointer in
// fwk.CycleState, later phases don't need to call the Write method to update the value.
type stateData struct {
	// A copy of all claims for the Pod (i.e. 1:1 match with
	// pod.Spec.ResourceClaims), initially with the status from the start
	// of the scheduling cycle. Each claim instance is read-only because it
	// might come from the informer cache. The instances get replaced when
	// the plugin itself successfully does an Update.
	//
	// When the DRAExtendedResource feature is enabled, the special ResourceClaim
	// which represents extended resource requests is also stored here.
	// The plugin code should treat this field as a black box and only
	// access it via its methods, in particular:
	// - The all method can be used to iterate over all claims, including
	//   the special one.
	// - The allUserClaims method excludes the special one.
	//
	// Empty if the Pod has no claims and no special claim for extended
	// resources backed by DRA, in which case the plugin has no work to do for
	// the Pod.
	claims claimStore

	// draExtendedResource stores data for extended resources backed by DRA.
	// It will remain empty when the DRAExtendedResource feature is disabled.
	draExtendedResource draExtendedResource

	// Allocator handles claims with structured parameters, which is all of them nowadays.
	allocator structured.Allocator

	// mutex must be locked while accessing any of the fields below.
	mutex sync.Mutex

	// The indices of all claims that:
	// - are allocated
	// - were not available on at least one node
	//
	// Set in parallel during Filter, so write access there must be
	// protected by the mutex. Used by PostFilter.
	unavailableClaims sets.Set[int]

	// informationsForClaim has one entry for each claim in claims.
	informationsForClaim []informationForClaim

	// nodeAllocations caches the result of Filter for the nodes, keyed by node name.
	nodeAllocations map[string]nodeAllocation
}

func (d *stateData) Clone() fwk.StateData {
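	// Intentionally return the same pointer: the state is shared across
	// scheduling phases (see the stateData comment above), so no deep copy
	// is needed.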
	return d
}

// draExtendedResource stores data for extended resources backed by DRA.
// It will remain empty when the DRAExtendedResource feature is disabled.
type draExtendedResource struct {
	// The pod's scalar resource requests, which may include extended resources backed by DRA.
	podScalarResources map[v1.ResourceName]int64
	// The mapping of extended resource to device class name.
	resourceToDeviceClass map[v1.ResourceName]string
}

type informationForClaim struct {
	// Node selector based on the claim status if allocated.
	availableOnNodes *nodeaffinity.NodeSelector

	// Set by Reserve, published by PreBind, empty if nothing had to be allocated.
	allocation *resourceapi.AllocationResult
}

// nodeAllocation holds the allocation results and extended resource claim per node.
type nodeAllocation struct {
	// allocationResults has the allocation results, matching the order of
	// claims which had to be allocated.
	allocationResults []resourceapi.AllocationResult
	// extendedResourceClaim has the special claim for extended resources backed by DRA
	// created during Filter for the node.
	extendedResourceClaim *resourceapi.ResourceClaim
}

// DynamicResources is a plugin that ensures that ResourceClaims are allocated.
type DynamicResources struct {
	enabled bool
	enableAdminAccess bool
	enablePrioritizedList bool
	enableSchedulingQueueHint bool
	enablePartitionableDevices bool
	enableDeviceTaints bool
	enableDeviceBindingConditions bool
	enableDeviceStatus bool
	enableExtendedResource bool
	enableFilterTimeout bool
	filterTimeout time.Duration
	enableConsumableCapacity bool

	fh framework.Handle
	clientset kubernetes.Interface
	celCache *cel.Cache
	draManager framework.SharedDRAManager
}

// New initializes a new plugin and returns it.
func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts feature.Features) (framework.Plugin, error) {
	if !fts.EnableDynamicResourceAllocation {
		// Disabled, won't do anything.
		return &DynamicResources{}, nil
	}

	args, ok := plArgs.(*config.DynamicResourcesArgs)
	if !ok {
		return nil, fmt.Errorf("got args of type %T, want *DynamicResourcesArgs", plArgs)
	}
	if err := validation.ValidateDynamicResourcesArgs(nil, args, fts); err != nil {
		return nil, err
	}

	pl := &DynamicResources{
		enabled: true,
		enableAdminAccess: fts.EnableDRAAdminAccess,
		enableDeviceTaints: fts.EnableDRADeviceTaints,
		enablePrioritizedList: fts.EnableDRAPrioritizedList,
		enableFilterTimeout: fts.EnableDRASchedulerFilterTimeout,
		enableSchedulingQueueHint: fts.EnableSchedulingQueueHint,
		enablePartitionableDevices: fts.EnablePartitionableDevices,
		enableExtendedResource: fts.EnableDRAExtendedResource,
		enableConsumableCapacity: fts.EnableConsumableCapacity,
		filterTimeout: ptr.Deref(args.FilterTimeout, metav1.Duration{}).Duration,
		enableDeviceBindingConditions: fts.EnableDRADeviceBindingConditions,
		enableDeviceStatus: fts.EnableDRAResourceClaimDeviceStatus,

		fh: fh,
		clientset: fh.ClientSet(),
		// This is an LRU cache for compiled CEL expressions. The most
		// recent 10 of them get reused across different scheduling
		// cycles.
		celCache: cel.NewCache(10, cel.Features{EnableConsumableCapacity: fts.EnableConsumableCapacity}),
		draManager: fh.SharedDRAManager(),
	}

	return pl, nil
}

var _ framework.PreEnqueuePlugin = &DynamicResources{}
var _ framework.PreFilterPlugin = &DynamicResources{}
var _ framework.FilterPlugin = &DynamicResources{}
var _ framework.PostFilterPlugin = &DynamicResources{}
var _ framework.ReservePlugin = &DynamicResources{}
var _ framework.EnqueueExtensions = &DynamicResources{}
var _ framework.PreBindPlugin = &DynamicResources{}

// Name returns the name of the plugin. It is used in logs, etc.
func (pl *DynamicResources) Name() string {
	return Name
}

// EventsToRegister returns the possible events that may make a Pod
// failed by this plugin schedulable.
func (pl *DynamicResources) EventsToRegister(_ context.Context) ([]fwk.ClusterEventWithHint, error) {
	if !pl.enabled {
		return nil, nil
	}
	// A resource might depend on node labels for topology filtering.
	// A new or updated node may make pods schedulable.
	//
	// A note about the UpdateNodeTaint event:
	// Ideally this plugin would register only Add | UpdateNodeLabel, because UpdateNodeTaint never changes its result.
	// But we may miss a Node/Add event due to preCheck, so UpdateNodeTaint | UpdateNodeLabel is registered for all plugins that register Node/Add.
	// See: https://github.com/kubernetes/kubernetes/issues/109437
	nodeActionType := fwk.Add | fwk.UpdateNodeLabel | fwk.UpdateNodeTaint | fwk.UpdateNodeAllocatable
	if pl.enableSchedulingQueueHint {
		// When QHint is enabled, the problematic preCheck is already removed, and we can remove UpdateNodeTaint.
		nodeActionType = fwk.Add | fwk.UpdateNodeLabel | fwk.UpdateNodeAllocatable
	}

	events := []fwk.ClusterEventWithHint{
		{Event: fwk.ClusterEvent{Resource: fwk.Node, ActionType: nodeActionType}},
		// Allocation is tracked in ResourceClaims, so any changes may make the pods schedulable.
		{Event: fwk.ClusterEvent{Resource: fwk.ResourceClaim, ActionType: fwk.Add | fwk.Update}, QueueingHintFn: pl.isSchedulableAfterClaimChange},
		// Adding the ResourceClaim name to the pod status makes pods waiting for their ResourceClaim schedulable.
		{Event: fwk.ClusterEvent{Resource: fwk.Pod, ActionType: fwk.UpdatePodGeneratedResourceClaim}, QueueingHintFn: pl.isSchedulableAfterPodChange},
		// A pod might be waiting for a class to get created or modified.
		{Event: fwk.ClusterEvent{Resource: fwk.DeviceClass, ActionType: fwk.Add | fwk.Update}},
		// Adding or updating a ResourceSlice might make a pod schedulable because new resources became available.
		{Event: fwk.ClusterEvent{Resource: fwk.ResourceSlice, ActionType: fwk.Add | fwk.Update}},
	}

	return events, nil
}

// PreEnqueue checks if there are known reasons why a pod currently cannot be
// scheduled. When this fails, one of the registered events can trigger another
// attempt.
func (pl *DynamicResources) PreEnqueue(ctx context.Context, pod *v1.Pod) (status *fwk.Status) {
	if !pl.enabled {
		return nil
	}

	if err := pl.foreachPodResourceClaim(pod, nil); err != nil {
		return statusUnschedulable(klog.FromContext(ctx), err.Error())
	}
	return nil
}

// isSchedulableAfterClaimChange is invoked for add and update claim events reported by
// an informer. It checks whether that change made a previously unschedulable
// pod schedulable. It errs on the side of letting a pod scheduling attempt
// happen. The delete claim event will not invoke it, so newObj will never be nil.
func (pl *DynamicResources) isSchedulableAfterClaimChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (fwk.QueueingHint, error) {
	originalClaim, modifiedClaim, err := schedutil.As[*resourceapi.ResourceClaim](oldObj, newObj)
	if err != nil {
		// Shouldn't happen.
		return fwk.Queue, fmt.Errorf("unexpected object in isSchedulableAfterClaimChange: %w", err)
	}

	usesClaim := false
	if err := pl.foreachPodResourceClaim(pod, func(_ string, claim *resourceapi.ResourceClaim) {
		if claim.UID == modifiedClaim.UID {
			usesClaim = true
		}
	}); err != nil {
		// This is not an unexpected error: we know that
		// foreachPodResourceClaim only returns errors for "not
		// schedulable".
		if loggerV := logger.V(6); loggerV.Enabled() {
			owner := metav1.GetControllerOf(modifiedClaim)
			loggerV.Info("pod is not schedulable after resource claim change", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim), "claimOwner", owner, "reason", err.Error())
		}
		return fwk.QueueSkip, nil
	}

	if originalClaim != nil &&
		originalClaim.Status.Allocation != nil &&
		modifiedClaim.Status.Allocation == nil {
		// A claim with structured parameters was deallocated. This might have made
		// resources available for other pods.
		logger.V(6).Info("claim with structured parameters got deallocated", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
		return fwk.Queue, nil
	}

	if !usesClaim {
		// This was not the claim the pod was waiting for.
		logger.V(6).Info("unrelated claim got modified", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
		return fwk.QueueSkip, nil
	}

	if originalClaim == nil {
		logger.V(5).Info("claim for pod got created", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
		return fwk.Queue, nil
	}

	// Modifications may or may not be relevant. If the entire
	// status is as before, then something else must have changed
	// and we don't care. What happens in practice is that the
	// resource driver adds the finalizer.
	if apiequality.Semantic.DeepEqual(&originalClaim.Status, &modifiedClaim.Status) {
		if loggerV := logger.V(7); loggerV.Enabled() {
			// Log more information.
			loggerV.Info("claim for pod got modified where the pod doesn't care", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim), "diff", diff.Diff(originalClaim, modifiedClaim))
		} else {
			logger.V(6).Info("claim for pod got modified where the pod doesn't care", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
		}
		return fwk.QueueSkip, nil
	}

	logger.V(5).Info("status of claim for pod got updated", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
	return fwk.Queue, nil
}

// isSchedulableAfterPodChange is invoked for update pod events reported by
// an informer. It checks whether that change adds the ResourceClaim(s) that the
// pod has been waiting for.
func (pl *DynamicResources) isSchedulableAfterPodChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (fwk.QueueingHint, error) {
	_, modifiedPod, err := schedutil.As[*v1.Pod](nil, newObj)
	if err != nil {
		// Shouldn't happen.
		return fwk.Queue, fmt.Errorf("unexpected object in isSchedulableAfterPodChange: %w", err)
	}

	if pod.UID != modifiedPod.UID {
		logger.V(7).Info("pod is not schedulable after change in other pod", "pod", klog.KObj(pod), "modifiedPod", klog.KObj(modifiedPod))
		return fwk.QueueSkip, nil
	}

	if err := pl.foreachPodResourceClaim(modifiedPod, nil); err != nil {
		// This is not an unexpected error: we know that
		// foreachPodResourceClaim only returns errors for "not
		// schedulable".
		logger.V(6).Info("pod is not schedulable after being updated", "pod", klog.KObj(pod))
		return fwk.QueueSkip, nil
	}

	logger.V(5).Info("pod got updated and is schedulable", "pod", klog.KObj(pod))
	return fwk.Queue, nil
}

// podResourceClaims returns the ResourceClaims for all entries in pod.Spec.ResourceClaims.
func (pl *DynamicResources) podResourceClaims(pod *v1.Pod) ([]*resourceapi.ResourceClaim, error) {
	claims := make([]*resourceapi.ResourceClaim, 0, len(pod.Spec.ResourceClaims))
	if err := pl.foreachPodResourceClaim(pod, func(_ string, claim *resourceapi.ResourceClaim) {
		// We store the pointer as returned by the lister. The
		// assumption is that if a claim gets modified while our code
		// runs, the cache will store a new pointer, not mutate the
		// existing object that we point to here.
		claims = append(claims, claim)
	}); err != nil {
		return nil, err
	}
	return claims, nil
}

// foreachPodResourceClaim checks that each ResourceClaim for the pod exists.
// It calls an optional handler for those claims that it finds.
func (pl *DynamicResources) foreachPodResourceClaim(pod *v1.Pod, cb func(podResourceName string, claim *resourceapi.ResourceClaim)) error {
	for _, resource := range pod.Spec.ResourceClaims {
		claimName, mustCheckOwner, err := resourceclaim.Name(pod, &resource)
		if err != nil {
			return err
		}
		// The claim name might be nil if no underlying resource claim
		// was generated for the referenced claim. There are valid use
		// cases when this might happen, so we simply skip it.
		if claimName == nil {
			continue
		}
		claim, err := pl.draManager.ResourceClaims().Get(pod.Namespace, *claimName)
		if err != nil {
			return err
		}

		if claim.DeletionTimestamp != nil {
			return fmt.Errorf("resourceclaim %q is being deleted", claim.Name)
		}

		if mustCheckOwner {
			if err := resourceclaim.IsForPod(pod, claim); err != nil {
				return err
			}
		}
		if cb != nil {
			cb(resource.Name, claim)
		}
	}
	return nil
}

// hasDeviceClassMappedExtendedResource returns true when the given resource list has an extended resource that has
// a mapping to a device class.
func hasDeviceClassMappedExtendedResource(reqs v1.ResourceList, deviceClassMapping map[v1.ResourceName]string) bool {
	for rName, rValue := range reqs {
		if rValue.IsZero() {
			// We only care about the resources requested by the pod we are trying to schedule.
			continue
		}
		if v1helper.IsExtendedResourceName(rName) {
			_, ok := deviceClassMapping[rName]
			if ok {
				return true
			}
		}
	}
	return false
}

// findExtendedResourceClaim looks for the extended resource claim, i.e., the claim with the special annotation
// set to "true" and with the pod as owner. It must be called with all ResourceClaims in the cluster.
// The returned ResourceClaim is read-only.
func findExtendedResourceClaim(pod *v1.Pod, resourceClaims []*resourceapi.ResourceClaim) *resourceapi.ResourceClaim {
	for _, c := range resourceClaims {
		if c.Annotations[resourceapi.ExtendedResourceClaimAnnotation] == "true" {
			for _, or := range c.OwnerReferences {
				if or.Name == pod.Name && *or.Controller && or.UID == pod.UID {
					return c
				}
			}
		}
	}
	return nil
}

// preFilterExtendedResources checks if there is any extended resource in the
// pod requests that has a device class mapping, i.e., there is a device class
// that has spec.ExtendedResourceName or its implicit extended resource name
// matching the given extended resource in the pod's requests.
//
// It looks for the special resource claim for the pod created in a prior scheduling
// cycle. If not found, it creates the special claim with no Requests in the Spec,
// with a temporary UID, and the specialClaimInMemName name.
// Either way, the special claim is stored in state.claims.
//
// In addition, draExtendedResource is also stored in the cycle state.
//
// It returns the special ResourceClaim and an error status. It returns nil for both
// if the feature is disabled or not required for the Pod.
func (pl *DynamicResources) preFilterExtendedResources(pod *v1.Pod, logger klog.Logger, s *stateData) (*resourceapi.ResourceClaim, *fwk.Status) {
	if !pl.enableExtendedResource {
		return nil, nil
	}

	deviceClassMapping, err := extended.DeviceClassMapping(pl.draManager)
	if err != nil {
		return nil, statusError(logger, err, "retrieving extended resource to DeviceClass mapping")
	}

	reqs := resourcehelper.PodRequests(pod, resourcehelper.PodResourcesOptions{})
	hasExtendedResource := hasDeviceClassMappedExtendedResource(reqs, deviceClassMapping)
	if !hasExtendedResource {
		return nil, nil
	}

	s.draExtendedResource.resourceToDeviceClass = deviceClassMapping
	r := framework.NewResource(reqs)
	s.draExtendedResource.podScalarResources = r.ScalarResources

	resourceClaims, err := pl.draManager.ResourceClaims().List()
	if err != nil {
		return nil, statusError(logger, err, "listing ResourceClaims")
	}

	// Check if the special resource claim has been created in a prior scheduling cycle.
	//
	// If it was already allocated earlier, that allocation might not be valid anymore.
	// We could try to check that, but it depends on various factors that are difficult to
	// cover (basically needs to replicate allocator logic) and if it turns out that the
	// allocation is stale, we would have to schedule with those allocated devices not
	// available for a new allocation. This situation should be rare (= binding failure),
	// so we solve it via brute force:
	// - Kick off deallocation in the background.
	// - Mark the pod as unschedulable. Successful deallocation will make it schedulable again.
	extendedResourceClaim := findExtendedResourceClaim(pod, resourceClaims)
	if extendedResourceClaim == nil {
		// Create one special claim for all extended resources backed by DRA in the Pod.
		// Create the ResourceClaim with the pod as owner, with a generated name that uses
		// <pod name>-extended-resources- as base. The final name will get truncated if it
		// would be too long.
		extendedResourceClaim = &resourceapi.ResourceClaim{
			ObjectMeta: metav1.ObjectMeta{
				Namespace: pod.Namespace,
				Name: specialClaimInMemName,
				// fake temporary UID for use in SignalClaimPendingAllocation
				UID: types.UID(uuid.NewUUID()),
				GenerateName: pod.Name + "-extended-resources-",
				OwnerReferences: []metav1.OwnerReference{
					{
						APIVersion: "v1",
						Kind: "Pod",
						Name: pod.Name,
						UID: pod.UID,
						Controller: ptr.To(true),
						BlockOwnerDeletion: ptr.To(true),
					},
				},
				Annotations: map[string]string{
					resourceapi.ExtendedResourceClaimAnnotation: "true",
				},
			},
			Spec: resourceapi.ResourceClaimSpec{},
		}
	}
	return extendedResourceClaim, nil
}

// PreFilter invoked at the prefilter extension point to check if pod has all
// immediate claims bound. UnschedulableAndUnresolvable is returned if
// the pod cannot be scheduled at the moment on any node.
func (pl *DynamicResources) PreFilter(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodes []fwk.NodeInfo) (*framework.PreFilterResult, *fwk.Status) {
	if !pl.enabled {
		return nil, fwk.NewStatus(fwk.Skip)
	}
	logger := klog.FromContext(ctx)

	// If the pod does not reference any claim, we don't need to do
	// anything for it. We just initialize an empty state to record that
	// observation for the other functions. This gets updated below
	// if we get that far.
	s := &stateData{}
	state.Write(stateKey, s)

	userClaims, err := pl.podResourceClaims(pod)
	if err != nil {
		return nil, statusUnschedulable(logger, err.Error())
	}
	logger.V(5).Info("pod resource claims", "pod", klog.KObj(pod), "resourceclaims", klog.KObjSlice(userClaims))
	extendedResourceClaim, status := pl.preFilterExtendedResources(pod, logger, s)
	if status != nil {
		return nil, status
	}
	claims := newClaimStore(userClaims, extendedResourceClaim)

	// This check covers user and extended ResourceClaim.
	if claims.empty() {
		return nil, fwk.NewStatus(fwk.Skip)
	}

	// Counts all claims which the scheduler needs to allocate itself.
	numClaimsToAllocate := 0
	s.informationsForClaim = make([]informationForClaim, claims.len())
	for index, claim := range claims.all() {
		if claim.Status.Allocation != nil &&
			!resourceclaim.CanBeReserved(claim) &&
			!resourceclaim.IsReservedForPod(pod, claim) {
			// Resource is in use. The pod has to wait.
			return nil, statusUnschedulable(logger, "resourceclaim in use", "pod", klog.KObj(pod), "resourceclaim", klog.KObj(claim))
		}

		if claim.Status.Allocation != nil {
			if claim.Status.Allocation.NodeSelector != nil {
				nodeSelector, err := nodeaffinity.NewNodeSelector(claim.Status.Allocation.NodeSelector)
				if err != nil {
					return nil, statusError(logger, err)
				}
				s.informationsForClaim[index].availableOnNodes = nodeSelector
			}
		} else {
			numClaimsToAllocate++

			// Allocation in flight? Better wait for that
			// to finish, see inFlightAllocations
			// documentation for details.
			if pl.draManager.ResourceClaims().ClaimHasPendingAllocation(claim.UID) {
				return nil, statusUnschedulable(logger, fmt.Sprintf("resource claim %s is in the process of being allocated", klog.KObj(claim)))
			}

			// Continue without validating the special claim for extended resources backed by DRA.
			// For the claim template, it is not allocated yet at this point, and it does not have a spec.
			// For the claim from a prior scheduling cycle, leave it to the Filter phase to validate.
			if claim == extendedResourceClaim {
				continue
			}

			// Check all requests and device classes. If a class
			// does not exist, scheduling cannot proceed, no matter
			// how the claim is being allocated.
			//
			// When using a control plane controller, a class might
			// have a node filter. This is useful for trimming the
			// initial set of potential nodes before we ask the
			// driver(s) for information about the specific pod.
			for _, request := range claim.Spec.Devices.Requests {
				// The requirements differ depending on whether the request has a list of
				// alternative subrequests defined in the firstAvailable field.
				switch {
				case request.Exactly != nil:
					if status := pl.validateDeviceClass(logger, request.Exactly.DeviceClassName, request.Name); status != nil {
						return nil, status
					}
				case len(request.FirstAvailable) > 0:
					if !pl.enablePrioritizedList {
						return nil, statusUnschedulable(logger, fmt.Sprintf("resource claim %s, request %s: has subrequests, but the DRAPrioritizedList feature is disabled", klog.KObj(claim), request.Name))
					}
					for _, subRequest := range request.FirstAvailable {
						qualRequestName := strings.Join([]string{request.Name, subRequest.Name}, "/")
						if status := pl.validateDeviceClass(logger, subRequest.DeviceClassName, qualRequestName); status != nil {
							return nil, status
						}
					}
				default:
					return nil, statusUnschedulable(logger, fmt.Sprintf("resource claim %s, request %s: unknown request type", klog.KObj(claim), request.Name))
				}
			}
		}
	}

	if numClaimsToAllocate > 0 {
		if loggerV := logger.V(5); loggerV.Enabled() {
			claimsToAllocate := make([]*resourceapi.ResourceClaim, 0, claims.len())
			for _, claim := range claims.toAllocate() {
				claimsToAllocate = append(claimsToAllocate, claim)
			}
			loggerV.Info("Preparing allocation with structured parameters", "pod", klog.KObj(pod), "resourceclaims", klog.KObjSlice(claimsToAllocate))
		}

		// Doing this over and over again for each pod could be avoided
		// by setting the allocator up once and then keeping it up-to-date
		// as changes are observed.
		//
		// But that would cause problems for using the plugin in the
		// Cluster Autoscaler. If this step here turns out to be
		// expensive, we may have to maintain and update state more
		// persistently.
		//
		// Claims (and thus their devices) are treated as "allocated" if they are in the assume cache
		// or currently their allocation is in-flight. This does not change
		// during filtering, so we can determine that once.
		var allocatedState *structured.AllocatedState
		if pl.enableConsumableCapacity {
			allocatedState, err = pl.draManager.ResourceClaims().GatherAllocatedState()
			if err != nil {
				return nil, statusError(logger, err)
			}
			if allocatedState == nil {
				return nil, statusError(logger, errors.New("nil allocated state"))
			}
		} else {
			allocatedDevices, err := pl.draManager.ResourceClaims().ListAllAllocatedDevices()
			if err != nil {
				return nil, statusError(logger, err)
			}
			allocatedState = &structured.AllocatedState{
				AllocatedDevices: allocatedDevices,
				AllocatedSharedDeviceIDs: sets.New[structured.SharedDeviceID](),
				AggregatedCapacity: structured.NewConsumedCapacityCollection(),
			}
		}
		slices, err := pl.draManager.ResourceSlices().ListWithDeviceTaintRules()
		if err != nil {
			return nil, statusError(logger, err)
		}
		features := structured.Features{
			AdminAccess: pl.enableAdminAccess,
			PrioritizedList: pl.enablePrioritizedList,
			PartitionableDevices: pl.enablePartitionableDevices,
			DeviceTaints: pl.enableDeviceTaints,
			DeviceBinding: pl.enableDeviceBindingConditions,
			DeviceStatus: pl.enableDeviceStatus,
			ConsumableCapacity: pl.enableConsumableCapacity,
		}
		allocator, err := structured.NewAllocator(ctx, features, *allocatedState, pl.draManager.DeviceClasses(), slices, pl.celCache)
		if err != nil {
			return nil, statusError(logger, err)
		}
		s.allocator = allocator
		s.nodeAllocations = make(map[string]nodeAllocation)
	}
	s.claims = claims
	return nil, nil
}

func (pl *DynamicResources) validateDeviceClass(logger klog.Logger, deviceClassName, requestName string) *fwk.Status {
	if deviceClassName == "" {
		return statusError(logger, fmt.Errorf("request %s: unsupported request type", requestName))
	}

	_, err := pl.draManager.DeviceClasses().Get(deviceClassName)
	if err != nil {
		// If the class cannot be retrieved, allocation cannot proceed.
		if apierrors.IsNotFound(err) {
			// Here we mark the pod as "unschedulable", so it'll sleep in
			// the unschedulable queue until a DeviceClass event occurs.
			return statusUnschedulable(logger, fmt.Sprintf("request %s: device class %s does not exist", requestName, deviceClassName))
		}
	}
	return nil
}

// PreFilterExtensions returns prefilter extensions (pod add and remove).
// This plugin does not need them, so it returns nil.
func (pl *DynamicResources) PreFilterExtensions() framework.PreFilterExtensions {
	return nil
}

func getStateData(cs fwk.CycleState) (*stateData, error) {
	state, err := cs.Read(stateKey)
	if err != nil {
		return nil, err
	}
	s, ok := state.(*stateData)
	if !ok {
		return nil, errors.New("unable to convert state into stateData")
	}
	return s, nil
}

// filterExtendedResources computes the special claim's Requests based on the
// node's Allocatable. It returns the special claim updated to match what needs
// to be allocated through DRA for the node, or nil if nothing needs to be allocated.
//
// It returns an error when the pod's extended resource requests can neither be allocated
// from the node's Allocatable nor matched against any device class's explicit or implicit
// ExtendedResourceName.
func (pl *DynamicResources) filterExtendedResources(state *stateData, pod *v1.Pod, nodeInfo fwk.NodeInfo, logger klog.Logger) (*resourceapi.ResourceClaim, *fwk.Status) {
	extendedResourceClaim := state.claims.extendedResourceClaim()
	if extendedResourceClaim == nil {
		// Nothing to do.
		return nil, nil
	}

	// The claim is from the prior scheduling cycle, return unschedulable such that it can be
	// deleted at the PostFilter phase, and retry anew.
	if extendedResourceClaim.Spec.Devices.Requests != nil {
		return nil, statusUnschedulable(logger, "cannot schedule extended resource claim", "pod", klog.KObj(pod), "node", klog.KObj(nodeInfo.Node()), "claim", klog.KObj(extendedResourceClaim))
	}

	extendedResources := make(map[v1.ResourceName]int64)
	hasExtendedResource := false
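	// Decide for each extended resource requested by the pod whether this node
	// provides it via a device plugin (node allocatable) or whether it has to be
	// satisfied via DRA.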
	for rName, rQuant := range state.draExtendedResource.podScalarResources {
		if !v1helper.IsExtendedResourceName(rName) {
			continue
		}
		// Skip in case the request quantity is zero
		if rQuant == 0 {
			continue
		}

		_, okScalar := nodeInfo.GetAllocatable().GetScalarResources()[rName]
		_, okDynamic := state.draExtendedResource.resourceToDeviceClass[rName]
		if okDynamic {
			if okScalar {
				// node provides the resource via device plugin
				extendedResources[rName] = 0
			} else {
				// node needs to provide the resource via DRA
				extendedResources[rName] = rQuant
				hasExtendedResource = true
			}
		} else if !okScalar {
			// The request is neither provided by a device plugin nor backed by DRA,
			// hence the pod does not fit the node.
			return nil, statusUnschedulable(logger, "cannot fit resource", "pod", klog.KObj(pod), "node", klog.KObj(nodeInfo.Node()), "resource", rName)
		}
	}

	// No extended resources backed by DRA on this node.
	// The pod may have extended resources, but they are all backed by device
	// plugin, hence the noderesources plugin should have checked if the node
	// can fit the pod.
	// This dynamic resources plugin Filter phase has nothing left to do.
	if state.claims.noUserClaim() && !hasExtendedResource {
		// It cannot be allocated when reaching here, as the claim from a prior scheduling cycle
		// would return unschedulable earlier in this function.
		return nil, nil
	}

	// Each node needs its own, potentially different variant of the claim.
	nodeExtendedResourceClaim := extendedResourceClaim.DeepCopy()
	nodeExtendedResourceClaim.Spec.Devices.Requests = createDeviceRequests(pod, extendedResources, state.draExtendedResource.resourceToDeviceClass)

	if extendedResourceClaim.Status.Allocation != nil {
		// If it is already allocated, then we cannot simply allocate it again.
		//
		// It cannot be allocated when reaching here, as the claim found from a prior scheduling cycle
		// would return unschedulable earlier in this function.
		return nil, nil
	}

	return nodeExtendedResourceClaim, nil
}

// createDeviceRequests computes the special claim's Requests based on the pod's extended resources
// that are not satisfied by the node's Allocatable.
//
// The device request name has the format container-%d-request-%d:
// the first %d is the container's index in the pod's initContainers plus containers,
// the second %d is the extended resource's index in that container's sorted resource requests.
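// For example (hypothetical resource name): a container at index 1 whose only resource request
// is example.com/gpu produces the request name "container-1-request-0"; additional requests such
// as cpu or memory shift the second index, because it is the position within the sorted names.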
func createDeviceRequests(pod *v1.Pod, extendedResources map[v1.ResourceName]int64, deviceClassMapping map[v1.ResourceName]string) []resourceapi.DeviceRequest {
	var deviceRequests []resourceapi.DeviceRequest
	// Create the extended resource claim's Requests by iterating over the
	// containers and the resources in the containers, creating one request
	// per <container, extended resource>.

	// pod level resources currently have only cpu and memory, they are not considered here for now.
	// if extended resources are added to pod level resources in the future, they need to be
	// supported separately.
	containers := slices.Clone(pod.Spec.InitContainers)
	containers = append(containers, pod.Spec.Containers...)
	for r := range extendedResources {
		for i, c := range containers {
			creqs := c.Resources.Requests
			if creqs == nil {
				continue
			}
			var rQuant resource.Quantity
			var ok bool
			if rQuant, ok = creqs[r]; !ok {
				continue
			}
			crq, ok := (&rQuant).AsInt64()
			if !ok || crq == 0 {
				continue
			}
			className, ok := deviceClassMapping[r]
			// skip if the request does not map to a device class
			if !ok || className == "" {
				continue
			}
			keys := make([]string, 0, len(creqs))
			for k := range creqs {
				keys = append(keys, k.String())
			}
			// resource requests in a container are a map, their names must
			// be sorted to determine the resource's index order.
			slice.SortStrings(keys)
			ridx := 0
			for j := range keys {
				if keys[j] == r.String() {
					ridx = j
					break
				}
			}
			// i is the index of the container in the list of initContainers + containers.
			// ridx is the index of the extended resource request among all sorted requests in the container.
			// crq is the quantity of the extended resource request.
			deviceRequests = append(deviceRequests,
				resourceapi.DeviceRequest{
					Name: fmt.Sprintf("container-%d-request-%d", i, ridx), // container index + extended resource name index
					Exactly: &resourceapi.ExactDeviceRequest{
						DeviceClassName: className, // map extended resource name -> device class name
						AllocationMode: resourceapi.DeviceAllocationModeExactCount,
						Count: crq,
					},
				})
		}
	}
	return deviceRequests
}

// Filter invoked at the filter extension point.
// It evaluates if a pod can fit due to the resources it requests,
// for both allocated and unallocated claims.
//
// For claims that are bound, it checks that the node affinity is
// satisfied by the given node.
//
// For claims that are unbound, it checks whether the claim might get allocated
// for the node.
func (pl *DynamicResources) Filter(ctx context.Context, cs fwk.CycleState, pod *v1.Pod, nodeInfo fwk.NodeInfo) *fwk.Status {
	if !pl.enabled {
		return nil
	}
	state, err := getStateData(cs)
	if err != nil {
		return statusError(klog.FromContext(ctx), err)
	}
	if state.claims.empty() {
		return nil
	}

	logger := klog.FromContext(ctx)
	node := nodeInfo.Node()
	nodeExtendedResourceClaim, status := pl.filterExtendedResources(state, pod, nodeInfo, logger)
	if status != nil {
		return status
	}
	// The pod has no user claim; it may have an extended resource claim that is satisfied by a device plugin.
	// Then there is nothing left to do for this plugin.
	if nodeExtendedResourceClaim == nil && state.claims.noUserClaim() {
		return nil
	}

	var unavailableClaims []int
	for index, claim := range state.claims.all() {
		logger.V(10).Info("filtering based on resource claims of the pod", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim))

		// This node selector only gets set if the claim is allocated.
		if nodeSelector := state.informationsForClaim[index].availableOnNodes; nodeSelector != nil && !nodeSelector.Match(node) {
			logger.V(5).Info("allocation's node selector does not match", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim))
			unavailableClaims = append(unavailableClaims, index)
			continue
		}

		if claim.Status.Allocation == nil {
			// The claim is not allocated yet, don't have to check
			// anything else.
			continue
		}

		// The claim is allocated, check whether it is ready for binding.
		if pl.enableDeviceBindingConditions && pl.enableDeviceStatus {
			ready, err := pl.isClaimReadyForBinding(claim)
			// If the claim is not ready yet (ready false, no error) and binding has timed out
			// or binding has failed (err non-nil), then the scheduler should consider deallocating this
			// claim in PostFilter to unblock trying other devices.
			if err != nil || !ready && pl.isClaimTimeout(claim) {
				unavailableClaims = append(unavailableClaims, index)
			}
		}
	}

	// Use the allocator to check the node and cache the result in case that the node is picked.
	var allocations []resourceapi.AllocationResult
	if state.allocator != nil {
		allocCtx := ctx
		if loggerV := logger.V(5); loggerV.Enabled() {
			allocCtx = klog.NewContext(allocCtx, klog.LoggerWithValues(logger, "node", klog.KObj(node)))
		}

		// Apply timeout to the operation?
		if pl.enableFilterTimeout && pl.filterTimeout > 0 {
			c, cancel := context.WithTimeout(allocCtx, pl.filterTimeout)
			defer cancel()
			allocCtx = c
		}

		// Check which claims need to be allocated.
		//
		// This replaces the special ResourceClaim for extended resources with one
		// matching the node.
		claimsToAllocate := make([]*resourceapi.ResourceClaim, 0, state.claims.len())
		extendedResourceClaim := state.claims.extendedResourceClaim()
		for _, claim := range state.claims.toAllocate() {
			if claim == extendedResourceClaim && nodeExtendedResourceClaim != nil {
				claim = nodeExtendedResourceClaim
			}
			claimsToAllocate = append(claimsToAllocate, claim)
		}
		a, err := state.allocator.Allocate(allocCtx, node, claimsToAllocate)
		switch {
		case errors.Is(err, context.DeadlineExceeded):
			return statusUnschedulable(logger, "timed out trying to allocate devices", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaims", klog.KObjSlice(claimsToAllocate))
		case ctx.Err() != nil:
			return statusUnschedulable(logger, fmt.Sprintf("asked by caller to stop allocating devices: %v", context.Cause(ctx)), "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaims", klog.KObjSlice(claimsToAllocate))
		case err != nil:
			// This should only fail if there is something wrong with the claim or class.
			// Return an error to abort scheduling of it.
			//
			// This will cause retries. It would be slightly nicer to mark it as unschedulable
			// *and* abort scheduling. Then only a cluster event for updating the claim or class
			// with the broken CEL expression would trigger rescheduling.
			//
			// But we cannot do both. As this shouldn't occur often, aborting like this is
			// better than the more complicated alternative (return Unschedulable here, remember
			// the error, then raise it again later if needed).
			return statusError(logger, err, "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaims", klog.KObjSlice(claimsToAllocate))
		}
		// Check for exact length just to be sure. In practice this is all-or-nothing.
		if len(a) != len(claimsToAllocate) {
			return statusUnschedulable(logger, "cannot allocate all claims", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaims", klog.KObjSlice(claimsToAllocate))
		}
		// Reserve uses this information.
		allocations = a
	}

	// Store information in state while holding the mutex.
	if state.allocator != nil || len(unavailableClaims) > 0 {
		state.mutex.Lock()
		defer state.mutex.Unlock()
	}

	if len(unavailableClaims) > 0 {
		// Remember all unavailable claims. This might be observed
		// concurrently, so we have to lock the state before writing.

		if state.unavailableClaims == nil {
			state.unavailableClaims = sets.New[int]()
		}

		for _, index := range unavailableClaims {
			state.unavailableClaims.Insert(index)
		}
		return statusUnschedulable(logger, "resourceclaim not available on the node", "pod", klog.KObj(pod))
	}

	if state.allocator != nil {
		state.nodeAllocations[node.Name] = nodeAllocation{
			allocationResults: allocations,
			extendedResourceClaim: nodeExtendedResourceClaim,
		}
	}

	return nil
}

// isSpecialClaimName returns true when the name is the specialClaimInMemName.
func isSpecialClaimName(name string) bool {
	return name == specialClaimInMemName
}

// PostFilter checks whether there are allocated claims that could get
// deallocated to help get the Pod schedulable. If yes, it picks one and
// requests its deallocation. This only gets called when filtering found no
// suitable node.
func (pl *DynamicResources) PostFilter(ctx context.Context, cs fwk.CycleState, pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusReader) (*framework.PostFilterResult, *fwk.Status) {
	if !pl.enabled {
		return nil, fwk.NewStatus(fwk.Unschedulable, "plugin disabled")
	}

	logger := klog.FromContext(ctx)
	state, err := getStateData(cs)
	if err != nil {
		return nil, statusError(logger, err)
	}
	// If a Pod doesn't have any resource claims attached to it, there is no need for further processing.
	// Thus we provide a fast path for this case to avoid unnecessary computations.
	if state.claims.empty() {
		return nil, fwk.NewStatus(fwk.Unschedulable, "no new claims to deallocate")
	}
	extendedResourceClaim := state.claims.extendedResourceClaim()

	// Iterating over a map is random. This is intentional here, we want to
	// pick one claim randomly because there is no better heuristic.
	for index := range state.unavailableClaims {
		claim := state.claims.get(index)
		if claim == extendedResourceClaim {
			if extendedResourceClaim != nil && !isSpecialClaimName(extendedResourceClaim.Name) {
				// Handled below.
				break
			}
			continue
		}

		if len(claim.Status.ReservedFor) == 0 ||
			len(claim.Status.ReservedFor) == 1 && claim.Status.ReservedFor[0].UID == pod.UID {
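			// Deallocate by clearing the reservation and the allocation in the
			// claim status, which makes the devices available for a new allocation.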
			claim := claim.DeepCopy()
			claim.Status.ReservedFor = nil
			claim.Status.Allocation = nil
			claim.Status.Devices = nil
			logger.V(5).Info("Deallocation of ResourceClaim", "pod", klog.KObj(pod), "resourceclaim", klog.KObj(claim))
			if _, err := pl.clientset.ResourceV1().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{}); err != nil {
				return nil, statusError(logger, err)
			}
			return nil, fwk.NewStatus(fwk.Unschedulable, "deallocation of ResourceClaim completed")
		}
	}

	if extendedResourceClaim != nil && !isSpecialClaimName(extendedResourceClaim.Name) {
		// If the special resource claim for extended resources backed by DRA
		// was reserved or allocated in a prior scheduling cycle, then it should be deleted.
		extendedResourceClaim := extendedResourceClaim.DeepCopy()
		if err := pl.deleteClaim(ctx, extendedResourceClaim, logger); err != nil {
			return nil, statusError(logger, err)
		}
		return nil, fwk.NewStatus(fwk.Unschedulable, "deletion of ResourceClaim completed")
	}
	return nil, fwk.NewStatus(fwk.Unschedulable, "still not schedulable")
}

// Reserve reserves claims for the pod.
func (pl *DynamicResources) Reserve(ctx context.Context, cs fwk.CycleState, pod *v1.Pod, nodeName string) (status *fwk.Status) {
	if !pl.enabled {
		return nil
	}
	state, err := getStateData(cs)
	if err != nil {
		return statusError(klog.FromContext(ctx), err)
	}
	if state.claims.empty() {
		return nil
	}

	logger := klog.FromContext(ctx)

	numClaimsWithAllocator := 0
	for _, claim := range state.claims.all() {
		if claim.Status.Allocation != nil {
			// Allocated, but perhaps not reserved yet. We checked in PreFilter that
			// the pod could reserve the claim. Instead of reserving here by
			// updating the ResourceClaim status, we assume that reserving
			// will work and only do it for real during binding. If it fails at
			// that time, some other pod was faster and we have to try again.
			continue
		}

		numClaimsWithAllocator++
	}

	if numClaimsWithAllocator == 0 {
		// Nothing left to do.
		return nil
	}
	extendedResourceClaim := state.claims.extendedResourceClaim()
	numClaimsToAllocate := 0
	needToAllocateUserClaims := false
	for _, claim := range state.claims.toAllocate() {
		numClaimsToAllocate++
		if claim != extendedResourceClaim {
			needToAllocateUserClaims = true
		}
	}

	// Prepare allocation of claims handled by the scheduler.
	if state.allocator != nil {
		// Entries in these two slices match each other.
		allocations, ok := state.nodeAllocations[nodeName]
		if !ok || len(allocations.allocationResults) == 0 {
			// This can happen only when claimsToAllocate has a single special claim template for extended resources backed by DRA,
			// but it is satisfied by the node with a device plugin, hence no DRA allocation.
			if !needToAllocateUserClaims {
				return nil
			}

			// We checked before that the node is suitable. This shouldn't have failed,
			// so treat this as an error.
			return statusError(logger, errors.New("claim allocation not found for node"))
		}

		// Sanity check: do we have results for all pending claims?
		if len(allocations.allocationResults) != numClaimsToAllocate ||
			len(allocations.allocationResults) != numClaimsWithAllocator {
			return statusError(logger, fmt.Errorf("internal error, have %d allocations, %d claims to allocate, want %d claims", len(allocations.allocationResults), numClaimsToAllocate, numClaimsWithAllocator))
		}

		allocIndex := 0
		for index, claim := range state.claims.toAllocate() {
			// The index returned is the original index in the underlying claim store, it
			// may not be sequentially numbered (e.g. 0, 1, 2 ...).
			allocation := &allocations.allocationResults[allocIndex]
			state.informationsForClaim[index].allocation = allocation

			if claim == extendedResourceClaim {
				// replace the special claim template for extended
				// resources backed by DRA with the real instantiated claim.
				claim = allocations.extendedResourceClaim
			}

			// Strictly speaking, we don't need to store the full modified object.
			// The allocation would be enough. The full object is useful for
			// debugging, testing and the allocator, so let's make it realistic.
			claim = claim.DeepCopy()
			if !slices.Contains(claim.Finalizers, resourceapi.Finalizer) {
				claim.Finalizers = append(claim.Finalizers, resourceapi.Finalizer)
			}
			claim.Status.Allocation = allocation
			err := pl.draManager.ResourceClaims().SignalClaimPendingAllocation(claim.UID, claim)
			if err != nil {
				return statusError(logger, fmt.Errorf("internal error, couldn't signal allocation for claim %s", claim.Name))
			}
			logger.V(5).Info("Reserved resource in allocation result", "claim", klog.KObj(claim), "allocation", klog.Format(allocation))
			allocIndex++
		}
	}

	return nil
}

// Unreserve clears the ReservedFor field for all claims.
// It's idempotent, and does nothing if no state is found for the given pod.
func (pl *DynamicResources) Unreserve(ctx context.Context, cs fwk.CycleState, pod *v1.Pod, nodeName string) {
	if !pl.enabled {
		return
	}
	state, err := getStateData(cs)
	if err != nil {
		return
	}
	if state.claims.empty() {
		return
	}

	logger := klog.FromContext(ctx)

	// We process user claims here first; the extendedResourceClaim, if any, is handled below.
	for _, claim := range state.claims.allUserClaims() {
		// If allocation was in-flight, then it's not anymore and we need to revert the
		// claim object in the assume cache to what it was before.
		if deleted := pl.draManager.ResourceClaims().RemoveClaimPendingAllocation(claim.UID); deleted {
			pl.draManager.ResourceClaims().AssumedClaimRestore(claim.Namespace, claim.Name)
		}

		if claim.Status.Allocation != nil &&
			resourceclaim.IsReservedForPod(pod, claim) {
			// Remove pod from ReservedFor. A strategic-merge-patch is used
			// because that allows removing an individual entry without having
			// the latest ResourceClaim.
			patch := fmt.Sprintf(`{"metadata": {"uid": %q}, "status": { "reservedFor": [ {"$patch": "delete", "uid": %q} ] }}`,
				claim.UID,
				pod.UID,
			)
			logger.V(5).Info("unreserve", "resourceclaim", klog.KObj(claim), "pod", klog.KObj(pod))
			claim, err := pl.clientset.ResourceV1().ResourceClaims(claim.Namespace).Patch(ctx, claim.Name, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}, "status")
			if err != nil {
				// We will get here again when pod scheduling is retried.
				logger.Error(err, "unreserve", "resourceclaim", klog.KObj(claim))
			}
		}
	}

	extendedResourceClaim := state.claims.extendedResourceClaim()
	if extendedResourceClaim == nil {
		// there is no extended resource claim
		return
	}

	if deleted := pl.draManager.ResourceClaims().RemoveClaimPendingAllocation(state.claims.getInitialExtendedResourceClaimUID()); deleted {
		pl.draManager.ResourceClaims().AssumedClaimRestore(extendedResourceClaim.Namespace, extendedResourceClaim.Name)
	}
	if isSpecialClaimName(extendedResourceClaim.Name) {
		// The in-memory temporary extended resource claim does not need to be deleted.
		return
	}
	logger.V(5).Info("delete extended resource backed by DRA", "resourceclaim", klog.KObj(extendedResourceClaim), "pod", klog.KObj(pod), "claim.UID", extendedResourceClaim.UID)
	extendedResourceClaim = extendedResourceClaim.DeepCopy()
	if err := pl.deleteClaim(ctx, extendedResourceClaim, logger); err != nil {
		logger.Error(err, "delete", "resourceclaim", klog.KObj(extendedResourceClaim))
	}
}

// deleteClaim deletes the claim after removing the finalizer from the claim, if there is any.
func (pl *DynamicResources) deleteClaim(ctx context.Context, claim *resourceapi.ResourceClaim, logger klog.Logger) error {
	refreshClaim := false
	retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
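		// On the first attempt use the claim object we already have; on
		// conflict retries, re-fetch the latest version from the API server.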
		if refreshClaim {
			updatedClaim, err := pl.clientset.ResourceV1().ResourceClaims(claim.Namespace).Get(ctx, claim.Name, metav1.GetOptions{})
			if err != nil {
				return fmt.Errorf("get resourceclaim %s/%s: %w", claim.Namespace, claim.Name, err)
			}
			claim = updatedClaim
		} else {
			refreshClaim = true
		}
		// Remove the finalizer to unblock removal first.
		builtinControllerFinalizer := slices.Index(claim.Finalizers, resourceapi.Finalizer)
		if builtinControllerFinalizer >= 0 {
			claim.Finalizers = slices.Delete(claim.Finalizers, builtinControllerFinalizer, builtinControllerFinalizer+1)
		}

		_, err := pl.clientset.ResourceV1().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{})
		if err != nil {
			return fmt.Errorf("update resourceclaim %s/%s: %w", claim.Namespace, claim.Name, err)
		}
		return nil
	})
	if retryErr != nil {
		return retryErr
	}

	logger.V(5).Info("Delete", "resourceclaim", klog.KObj(claim))
	err := pl.clientset.ResourceV1().ResourceClaims(claim.Namespace).Delete(ctx, claim.Name, metav1.DeleteOptions{})
	if err != nil {
		return err
	}
	return nil
}

// PreBind gets called in a separate goroutine after it has been determined
// that the pod should get bound to this node. Because Reserve did not actually
// reserve claims, we need to do it now. For claims with the builtin controller,
// we also handle the allocation.
//
// If anything fails, we return an error and
// the pod will have to go into the backoff queue. The scheduler will call
// Unreserve as part of the error handling.
func (pl *DynamicResources) PreBind(ctx context.Context, cs fwk.CycleState, pod *v1.Pod, nodeName string) *fwk.Status {
	if !pl.enabled {
		return nil
	}
	state, err := getStateData(cs)
	if err != nil {
		return statusError(klog.FromContext(ctx), err)
	}
	if state.claims.empty() {
		return nil
	}

	logger := klog.FromContext(ctx)

	for index, claim := range state.claims.all() {
		if !resourceclaim.IsReservedForPod(pod, claim) {
			claim, err := pl.bindClaim(ctx, state, index, pod, nodeName)
			if err != nil {
				return statusError(logger, err)
			}
			// Update the state here so that Unreserve can work with the patched claim.
			state.claims.set(index, claim)
		}
	}

	if !pl.enableDeviceBindingConditions || !pl.enableDeviceStatus {
		// If we don't have binding conditions, we can return early.
		// The claim is now reserved for the pod and the scheduler can proceed with binding.
		return nil
	}

	// Check whether any allocated device has binding conditions, i.e. must be
	// prepared (for example, attached to the node) before binding.
	needToWait := hasBindingConditions(state)

	// If no device needs to be prepared, we can return early.
	if !needToWait {
		return nil
	}

	// We need to wait for the device to be attached to the node.
	pl.fh.EventRecorder().Eventf(pod, nil, v1.EventTypeNormal, "BindingConditionsPending", "Scheduling", "waiting for binding conditions for device on node %s", nodeName)
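	// Poll the claim status every 5 seconds, for up to
	// BindingTimeoutDefaultSeconds in total, until all binding conditions are
	// true or a binding failure condition shows up.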
	err = wait.PollUntilContextTimeout(ctx, 5*time.Second, time.Duration(BindingTimeoutDefaultSeconds)*time.Second, true,
		func(ctx context.Context) (bool, error) {
			return pl.isPodReadyForBinding(state)
		})
	if err != nil {
		if errors.Is(err, context.DeadlineExceeded) {
			err = errors.New("device binding timeout")
		}
		// Returning an error here causes another scheduling attempt.
		// In that next attempt, PreFilter will detect the timeout or
		// error and try to recover.
		return statusError(logger, err)
	}

	// If we get here, we know that reserving the claim for
	// the pod worked and we can proceed with binding it.
	return nil
}

// PreBindPreFlight is called before PreBind and determines whether PreBind is going to do anything for this pod.
// It only checks state.claims to determine whether there are any claims and hence whether the plugin has to handle them at PreBind.
func (pl *DynamicResources) PreBindPreFlight(ctx context.Context, cs fwk.CycleState, p *v1.Pod, nodeName string) *fwk.Status {
	if !pl.enabled {
		return fwk.NewStatus(fwk.Skip)
	}
	state, err := getStateData(cs)
	if err != nil {
		return statusError(klog.FromContext(ctx), err)
	}
	if state.claims.empty() {
		return fwk.NewStatus(fwk.Skip)
	}
	return nil
}

// createRequestMappings creates the requestMappings for the special extended resource claim.
// For each device request in the claim, it finds the container name and
// the extended resource name in that container matching the device request.
// The device request name has the format container-%d-request-%d, where
// the first %d is the container's index in the pod's initContainers and containers, and
// the second %d is the extended resource's index in that container's sorted resource requests.
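// For example, for a hypothetical pod whose second container (index 1 across
// initContainers and containers) requests the extended resource example.com/gpu
// as its first resource request in sorted order (index 0), the matching device
// request would be named "container-1-request-0".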
func createRequestMappings(claim *resourceapi.ResourceClaim, pod *v1.Pod) []v1.ContainerExtendedResourceRequest {
	var cer []v1.ContainerExtendedResourceRequest
	deviceReqNames := make([]string, 0, len(claim.Spec.Devices.Requests))
	for _, r := range claim.Spec.Devices.Requests {
		deviceReqNames = append(deviceReqNames, r.Name)
	}
	// Pod-level resources currently have only CPU and memory, so they are not considered here for now.
	// If extended resources are added to pod-level resources in the future, they need to be
	// supported separately.
	containers := slices.Clone(pod.Spec.InitContainers)
	containers = append(containers, pod.Spec.Containers...)
	for i, c := range containers {
		creqs := c.Resources.Requests
		keys := make([]string, 0, len(creqs))
		for k := range creqs {
			keys = append(keys, k.String())
		}
		// Resource requests in a container are stored in a map, so their names must
		// be sorted to determine each resource's index.
		slice.SortStrings(keys)
		for rName := range creqs {
			ridx := 0
			for j := range keys {
				if keys[j] == rName.String() {
					ridx = j
					break
				}
			}
			for _, devReqName := range deviceReqNames {
				// During the Filter phase, the device request name was built from
				// the container index and the extended resource index.
				if fmt.Sprintf("container-%d-request-%d", i, ridx) == devReqName {
					cer = append(cer,
						v1.ContainerExtendedResourceRequest{
							ContainerName: c.Name,
							ResourceName:  rName.String(),
							RequestName:   devReqName,
						})
				}
			}
		}
	}
	return cer
}

// bindClaim gets called by PreBind for a claim which is not reserved for the pod yet.
// It might not even be allocated. bindClaim then ensures that the allocation
// and reservation are recorded. This finishes the work started in Reserve.
func (pl *DynamicResources) bindClaim(ctx context.Context, state *stateData, index int, pod *v1.Pod, nodeName string) (patchedClaim *resourceapi.ResourceClaim, finalErr error) {
	logger := klog.FromContext(ctx)
	claim := state.claims.get(index)
	allocation := state.informationsForClaim[index].allocation
	isExtendedResourceClaim := false
	if claim == state.claims.extendedResourceClaim() {
		// All extended resource requests were satisfied by the device plugin,
		// so there is nothing to allocate or reserve here.
		if allocation == nil && claim.Spec.Devices.Requests == nil {
			return claim, nil
		}
		isExtendedResourceClaim = true
	}
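	// The deferred function below runs once the API calls in this function are
	// done. When the scheduler handled the allocation itself, it stores the
	// updated claim in the assume cache (on success) and clears the claim's
	// pending-allocation marker.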
	claimUIDs := []types.UID{claim.UID}
	defer func() {
		if allocation != nil {
			// The scheduler was handling allocation. Now that has
			// completed, either successfully or with a failure.
			if finalErr == nil {
				// This can fail, but only for reasons that are okay (concurrent
				// delete or update), which shouldn't happen in this case.
				if isExtendedResourceClaim {
					// Unlike other claims, the extended resource claim is created in the API server below.
					// AssumeClaimAfterAPICall returns ErrNotFound when the informer update has not reached the assume cache yet.
					// Hence we must poll and wait for it.
					pollErr := wait.PollUntilContextTimeout(ctx, 1*time.Second, time.Duration(AssumeExtendedResourceTimeoutDefaultSeconds)*time.Second, true,
						func(ctx context.Context) (bool, error) {
							if err := pl.draManager.ResourceClaims().AssumeClaimAfterAPICall(claim); err != nil {
								if errors.Is(err, assumecache.ErrNotFound) {
									return false, nil
								}
								logger.V(5).Info("Claim not stored in assume cache", "claim", klog.KObj(claim), "err", err)
								return false, err
							}
							return true, nil
						})
					if pollErr != nil {
						logger.V(5).Info("Claim not stored in assume cache after retries", "claim", klog.KObj(claim), "err", pollErr)
					}
				} else {
					if err := pl.draManager.ResourceClaims().AssumeClaimAfterAPICall(claim); err != nil {
						logger.V(5).Info("Claim not stored in assume cache", "err", err)
					}
				}
			}

			for _, claimUID := range claimUIDs {
				pl.draManager.ResourceClaims().RemoveClaimPendingAllocation(claimUID)
			}
		}
	}()

	// Create the special claim for extended resources backed by DRA.
	if isExtendedResourceClaim && isSpecialClaimName(claim.Name) {
		logger.V(5).Info("preparing to create claim for extended resources", "pod", klog.KObj(pod), "node", nodeName, "resourceclaim", klog.Format(claim))
		// Replace the claim template with the instantiated claim for the node.
		if nodeAllocation, ok := state.nodeAllocations[nodeName]; ok && nodeAllocation.extendedResourceClaim != nil {
			claim = nodeAllocation.extendedResourceClaim.DeepCopy()
		} else {
			return nil, fmt.Errorf("extended resource claim not found for node %s", nodeName)
		}
		logger.V(5).Info("create claim for extended resources", "pod", klog.KObj(pod), "node", nodeName, "resourceclaim", klog.Format(claim))
		// Clear fields which must not or cannot be set during creation.
		claim.Status.Allocation = nil
		claim.Name = ""
		claim.UID = ""
		var err error
		claim, err = pl.clientset.ResourceV1().ResourceClaims(claim.Namespace).Create(ctx, claim, metav1.CreateOptions{})
		if err != nil {
			return nil, fmt.Errorf("create claim for extended resources %v: %w", klog.KObj(claim), err)
		}
		logger.V(5).Info("created claim for extended resources", "pod", klog.KObj(pod), "node", nodeName, "resourceclaim", klog.Format(claim))

		// Track the actual extended ResourceClaim from now on.
		// Relevant if we need to delete it again in Unreserve.
		if err := state.claims.updateExtendedResourceClaim(claim); err != nil {
			return nil, fmt.Errorf("internal error: update extended ResourceClaim: %w", err)
		}
	}

logger.V(5).Info("preparing claim status update", "claim", klog.KObj(state.claims.get(index)), "allocation", klog.Format(allocation))
|
|
|
|
// We may run into a ResourceVersion conflict because there may be some
|
|
// benign concurrent changes. In that case we get the latest claim and
|
|
// try again.
|
|
refreshClaim := false
|
|
claim = claim.DeepCopy()
|
|
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
|
|
if refreshClaim {
|
|
updatedClaim, err := pl.clientset.ResourceV1().ResourceClaims(claim.Namespace).Get(ctx, claim.Name, metav1.GetOptions{})
|
|
if err != nil {
|
|
return fmt.Errorf("get updated claim %s after conflict: %w", klog.KObj(claim), err)
|
|
}
|
|
logger.V(5).Info("retrying update after conflict", "claim", klog.KObj(claim))
|
|
claim = updatedClaim
|
|
} else {
|
|
// All future retries must get a new claim first.
|
|
refreshClaim = true
|
|
}
|
|
|
|
if claim.DeletionTimestamp != nil {
|
|
return fmt.Errorf("claim %s got deleted in the meantime", klog.KObj(claim))
|
|
}
|
|
|
|
// Do we need to store an allocation result from Reserve?
|
|
if allocation != nil {
|
|
if claim.Status.Allocation != nil {
|
|
return fmt.Errorf("claim %s got allocated elsewhere in the meantime", klog.KObj(claim))
|
|
}
|
|
|
|
// The finalizer needs to be added in a normal update.
|
|
// If we were interrupted in the past, it might already be set and we simply continue.
|
|
if !slices.Contains(claim.Finalizers, resourceapi.Finalizer) {
|
|
claim.Finalizers = append(claim.Finalizers, resourceapi.Finalizer)
|
|
updatedClaim, err := pl.clientset.ResourceV1().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{})
|
|
if err != nil {
|
|
return fmt.Errorf("add finalizer to claim %s: %w", klog.KObj(claim), err)
|
|
}
|
|
claim = updatedClaim
|
|
}
|
|
claim.Status.Allocation = allocation
|
|
}
|
|
|
|
// We can simply try to add the pod here without checking
|
|
// preconditions. The apiserver will tell us with a
|
|
// non-conflict error if this isn't possible.
|
|
claim.Status.ReservedFor = append(claim.Status.ReservedFor, resourceapi.ResourceClaimConsumerReference{Resource: "pods", Name: pod.Name, UID: pod.UID})
|
|
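		// Stamp the allocation so that isClaimTimeout can later measure the
		// binding timeout against the time the allocation was recorded.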
		if pl.enableDeviceBindingConditions && pl.enableDeviceStatus && claim.Status.Allocation.AllocationTimestamp == nil {
			claim.Status.Allocation.AllocationTimestamp = &metav1.Time{Time: time.Now()}
		}
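		// The allocation and the reservation both live in the claim status, so
		// a single UpdateStatus call commits them together.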
		updatedClaim, err := pl.clientset.ResourceV1().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{})
		if err != nil {
			if allocation != nil {
				return fmt.Errorf("add allocation and reservation to claim %s: %w", klog.KObj(claim), err)
			}
			return fmt.Errorf("add reservation to claim %s: %w", klog.KObj(claim), err)
		}
		claim = updatedClaim
		return nil
	})

	if retryErr != nil {
		return nil, retryErr
	}

	logger.V(5).Info("reserved", "pod", klog.KObj(pod), "node", nodeName, "resourceclaim", klog.Format(claim))

	// Patch the pod status with the new information about the generated
	// special resource claim.
	if isExtendedResourceClaim {
		cer := createRequestMappings(claim, pod)
		podStatusCopy := pod.Status.DeepCopy()
		podStatusCopy.ExtendedResourceClaimStatus = &v1.PodExtendedResourceClaimStatus{
			RequestMappings:   cer,
			ResourceClaimName: claim.Name,
		}
		err := schedutil.PatchPodStatus(ctx, pl.clientset, pod.Name, pod.Namespace, &pod.Status, podStatusCopy)
		if err != nil {
			return nil, fmt.Errorf("update pod %s/%s ExtendedResourceClaimStatus: %w", pod.Namespace, pod.Name, err)
		}
	}

	return claim, nil
}

// isClaimReadyForBinding checks whether a given resource claim is
// ready for binding.
// It returns an error if any binding failure condition is true.
// It returns true if (and only if) all binding conditions are true,
// and no binding failure conditions are true,
// which includes the case that there are no binding conditions.
func (pl *DynamicResources) isClaimReadyForBinding(claim *resourceapi.ResourceClaim) (bool, error) {
	if claim.Status.Allocation == nil {
		return false, nil
	}
	for _, deviceRequest := range claim.Status.Allocation.Devices.Results {
		if len(deviceRequest.BindingConditions) == 0 {
			continue
		}
		deviceStatus := getAllocatedDeviceStatus(claim, &deviceRequest)
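		// No device status published yet means the binding conditions cannot
		// be evaluated, so the device is treated as not ready.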
		if deviceStatus == nil {
			return false, nil
		}
		for _, cond := range deviceRequest.BindingFailureConditions {
			failedCond := apimeta.FindStatusCondition(deviceStatus.Conditions, cond)
			if failedCond != nil && failedCond.Status == metav1.ConditionTrue {
				return false, fmt.Errorf("claim %s binding failed: reason=%s, message=%q",
					claim.Name,
					failedCond.Reason,
					failedCond.Message)
			}
		}
		for _, cond := range deviceRequest.BindingConditions {
			if !apimeta.IsStatusConditionTrue(deviceStatus.Conditions, cond) {
				return false, nil
			}
		}
	}
	return true, nil
}

// isClaimTimeout checks whether a given resource claim has
// reached the binding timeout.
// It returns true only if at least one allocated device has binding conditions
// and the allocation is older than the binding timeout; otherwise it returns false.
func (pl *DynamicResources) isClaimTimeout(claim *resourceapi.ResourceClaim) bool {
	if !pl.enableDeviceBindingConditions || !pl.enableDeviceStatus {
		return false
	}
	if claim.Status.Allocation == nil || claim.Status.Allocation.AllocationTimestamp == nil {
		return false
	}
	// Check whether the binding timeout has been reached.
	for _, deviceRequest := range claim.Status.Allocation.Devices.Results {
		if deviceRequest.BindingConditions == nil {
			continue
		}
		if claim.Status.Allocation.AllocationTimestamp.Add(time.Duration(BindingTimeoutDefaultSeconds) * time.Second).Before(time.Now()) {
			return true
		}
	}
	return false
}

// isPodReadyForBinding checks the binding status of devices within the given state claims.
// It returns true if (and only if) all binding conditions are true,
// and no binding failure conditions are true,
// which includes the case when there are no binding conditions.
// It returns an error if any binding failure condition is set.
func (pl *DynamicResources) isPodReadyForBinding(state *stateData) (bool, error) {
	for claimIndex, claim := range state.claims.all() {
		claim, err := pl.draManager.ResourceClaims().Get(claim.Namespace, claim.Name)
		if err != nil {
			return false, err
		}
		state.claims.set(claimIndex, claim)
		ready, err := pl.isClaimReadyForBinding(claim)
		if err != nil {
			return false, err
		}
		if !ready {
			if pl.isClaimTimeout(claim) {
				return false, fmt.Errorf("claim %s binding timeout", claim.Name)
			}
			return false, nil
		}
	}
	return true, nil
}

// hasBindingConditions checks whether any of the claims in the state
// has binding conditions.
// It returns true if at least one claim has binding conditions.
// It returns false if no claim has binding conditions.
func hasBindingConditions(state *stateData) bool {
	for _, claim := range state.claims.all() {
		if claim.Status.Allocation == nil {
			continue
		}
		for _, device := range claim.Status.Allocation.Devices.Results {
			if len(device.BindingConditions) > 0 {
				return true
			}
		}
	}
	return false
}

// statusUnschedulable ensures that there is a log message associated with the
// line where the status originated.
func statusUnschedulable(logger klog.Logger, reason string, kv ...interface{}) *fwk.Status {
	if loggerV := logger.V(5); loggerV.Enabled() {
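		// The call stack helper makes klog attribute the log entry to the
		// caller of statusUnschedulable rather than to this helper itself.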
		helper, loggerV := loggerV.WithCallStackHelper()
		helper()
		kv = append(kv, "reason", reason)
		// nolint: logcheck // warns because it cannot check key/values
		loggerV.Info("pod unschedulable", kv...)
	}
	return fwk.NewStatus(fwk.UnschedulableAndUnresolvable, reason)
}

// statusError ensures that there is a log message associated with the
// line where the error originated.
func statusError(logger klog.Logger, err error, kv ...interface{}) *fwk.Status {
	if loggerV := logger.V(5); loggerV.Enabled() {
		helper, loggerV := loggerV.WithCallStackHelper()
		helper()
		// nolint: logcheck // warns because it cannot check key/values
		loggerV.Error(err, "dynamic resource plugin failed", kv...)
	}
	return fwk.AsStatus(err)
}

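// getAllocatedDeviceStatus returns the entry in claim.Status.Devices that
// matches the given allocation result by driver, pool, and device name, or nil
// if the driver has not published a status for that device.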
func getAllocatedDeviceStatus(claim *resourceapi.ResourceClaim, deviceRequest *resourceapi.DeviceRequestAllocationResult) *resourceapi.AllocatedDeviceStatus {
	for _, device := range claim.Status.Devices {
		if deviceRequest.Device == device.Device && deviceRequest.Driver == device.Driver && deviceRequest.Pool == device.Pool {
			return &device
		}
	}
	return nil
}