diff --git a/pkg/kubelet/allocation/allocation_manager.go b/pkg/kubelet/allocation/allocation_manager.go index c7eff6a01d0..5ec80f999ff 100644 --- a/pkg/kubelet/allocation/allocation_manager.go +++ b/pkg/kubelet/allocation/allocation_manager.go @@ -18,6 +18,7 @@ package allocation import ( "context" + "errors" "fmt" "path/filepath" "slices" @@ -41,6 +42,7 @@ import ( "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/allocation/state" "k8s.io/kubernetes/pkg/kubelet/cm" + "k8s.io/kubernetes/pkg/kubelet/cm/admission" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager" "k8s.io/kubernetes/pkg/kubelet/config" @@ -701,6 +703,13 @@ func (m *manager) canAdmitPod(allocatedPods []*v1.Pod, pod *v1.Pod) (bool, strin if result := podAdmitHandler.Admit(attrs); !result.Admit { klog.InfoS("Pod admission denied", "podUID", attrs.Pod.UID, "pod", klog.KObj(attrs.Pod), "reason", result.Reason, "message", result.Message) return false, result.Reason, result.Message + } else if result.Admit && len(result.Errors) > 0 && result.Reason == admission.PodLevelResourcesIncompatible && utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources) { + for _, err := range result.Errors { + var admissionWarning admission.Error + if errors.As(err, &admissionWarning) { + m.recorder.Event(attrs.Pod, v1.EventTypeWarning, admission.PodLevelResourcesIncompatible, admissionWarning.Error()) + } + } } } diff --git a/pkg/kubelet/cm/admission/errors.go b/pkg/kubelet/cm/admission/errors.go index 5e549220394..faac0098ec2 100644 --- a/pkg/kubelet/cm/admission/errors.go +++ b/pkg/kubelet/cm/admission/errors.go @@ -20,11 +20,19 @@ import ( "errors" "fmt" + utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/kubernetes/pkg/kubelet/lifecycle" ) const ( ErrorReasonUnexpected = "UnexpectedAdmissionError" + + // Explicit reason when CPU/Memory manager's policy is incompatible with pod level resources. 
+ PodLevelResourcesIncompatible = "PodLevelResourcesIncompatible" + + // Warning reasons used when a manager's policy is incompatible with pod-level resources. + CPUManagerPodLevelResourcesError = "CPUManagerPodLevelResourcesError" + MemoryManagerPodLevelResourcesError = "MemoryManagerPodLevelResourcesError" ) type Error interface { @@ -49,14 +57,53 @@ func GetPodAdmitResult(err error) lifecycle.PodAdmitResult { return lifecycle.PodAdmitResult{Admit: true} } + var errs []error + // To support multiple pod-level resource errors, we need to check if the error + // is an aggregate error. + var agg utilerrors.Aggregate + if errors.As(err, &agg) { + errs = agg.Errors() + } else { + errs = []error{err} + } + + var podLevelWarnings []error + var otherErrs []error + for _, e := range errs { + var admissionErr Error + if errors.As(e, &admissionErr) && (admissionErr.Type() == CPUManagerPodLevelResourcesError || admissionErr.Type() == MemoryManagerPodLevelResourcesError) { + podLevelWarnings = append(podLevelWarnings, e) + } else { + otherErrs = append(otherErrs, e) + } + } + + // If all errors are pod-level resource errors, we should treat them as warnings + // and not block pod admission. + if len(otherErrs) == 0 && len(podLevelWarnings) > 0 { + return lifecycle.PodAdmitResult{ + Admit: true, + Reason: PodLevelResourcesIncompatible, + Message: "", + Errors: podLevelWarnings, + } + } + + if len(otherErrs) == 0 { + // This should not happen if err != nil, but as a safeguard. + return lifecycle.PodAdmitResult{Admit: true} + } + + // At this point, we have at least one error that requires pod rejection. 
+ firstErr := otherErrs[0] var admissionErr Error - if !errors.As(err, &admissionErr) { - admissionErr = &unexpectedAdmissionError{err} + if !errors.As(firstErr, &admissionErr) { + admissionErr = &unexpectedAdmissionError{firstErr} } return lifecycle.PodAdmitResult{ + Admit: false, Message: admissionErr.Error(), Reason: admissionErr.Type(), - Admit: false, } } diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go b/pkg/kubelet/cm/cpumanager/cpu_manager.go index 8b59ba67121..1d98eef8320 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go @@ -18,6 +18,7 @@ package cpumanager import ( "context" + "errors" "fmt" "math" "sync" @@ -261,6 +262,13 @@ func (m *manager) Allocate(p *v1.Pod, c *v1.Container) error { // Call down into the policy to assign this container CPUs if required. err := m.policy.Allocate(m.state, p, c) + + // If it gets this error it means that the pod requires pod level resources but this is not aligned. + // We do not want the pod to fail to schedule on this error so we Admit it, this is just a warning + if errors.As(err, &CPUManagerPodLevelResourcesError{}) { + return err + } + if err != nil { klog.ErrorS(err, "Allocate error") return err diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go index 28591c5baf1..fb787c35cc5 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static.go +++ b/pkg/kubelet/cm/cpumanager/policy_static.go @@ -18,10 +18,12 @@ package cpumanager import ( "fmt" + "strconv" v1 "k8s.io/api/core/v1" utilfeature "k8s.io/apiserver/pkg/util/feature" + resourcehelper "k8s.io/component-helpers/resource" "k8s.io/klog/v2" podutil "k8s.io/kubernetes/pkg/api/v1/pod" v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos" @@ -35,13 +37,15 @@ import ( ) const ( - // PolicyStatic is the name of the static policy. 
// Should options be given, these will be ignored and backward (up to 1.21 included) // compatible behaviour will be enforced PolicyStatic policyName = "static" // ErrorSMTAlignment represents the type of an SMTAlignmentError ErrorSMTAlignment = "SMTAlignmentError" + + // ErrorCPUManagerPodLevelResources represents the type of a CPUManagerPodLevelResourcesError ErrorCPUManagerPodLevelResources = "CPUManagerPodLevelResourcesError" ) // SMTAlignmentError represents an error due to SMT alignment @@ -52,6 +56,15 @@ type SMTAlignmentError struct { CausedByPhysicalCPUs bool } +// CPUManagerPodLevelResourcesError represents an error due to pod-level resources not being supported by the static CPU manager policy. +type CPUManagerPodLevelResourcesError struct{} + +func (e CPUManagerPodLevelResourcesError) Error() string { + return "CPU Manager static policy does not support pod-level resources" +} + +func (e CPUManagerPodLevelResourcesError) Type() string { return ErrorCPUManagerPodLevelResources } + func (e SMTAlignmentError) Error() string { if e.CausedByPhysicalCPUs { return fmt.Sprintf("SMT Alignment Error: not enough free physical CPUs: available physical CPUs = %d, requested CPUs = %d, CPUs per core = %d", e.AvailablePhysicalCPUs, e.RequestedCPUs, e.CpusPerCore) @@ -316,6 +329,10 @@ func (p *staticPolicy) updateCPUsToReuse(pod *v1.Pod, container *v1.Container, c func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) (rerr error) { numCPUs := p.guaranteedCPUs(pod, container) if numCPUs == 0 { + if p.isPodWithPodLevelResources(pod) { + return CPUManagerPodLevelResourcesError{} + } + // container belongs in the shared pool (nothing to do; use default cpuset) return nil } @@ -467,6 +484,12 @@ func (p *staticPolicy) guaranteedCPUs(pod *v1.Pod, container *v1.Container) int klog.V(5).InfoS("Exclusive CPU allocation skipped, pod QoS is not guaranteed", "pod", klog.KObj(pod), "containerName", container.Name, "qos", qos) return 0 } + + // The CPU manager static policy does not support 
pod-level resources. + if utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources) && resourcehelper.IsPodLevelResourcesSet(pod) { + return 0 + } + cpuQuantity := container.Resources.Requests[v1.ResourceCPU] // In-place pod resize feature makes Container.Resources field mutable for CPU & memory. // AllocatedResources holds the value of Container.Resources.Requests when the pod was admitted. @@ -814,3 +837,14 @@ func updateAllocationPerNUMAMetric(topo *topology.CPUTopology, allocatedCPUs cpu metrics.CPUManagerAllocationPerNUMA.WithLabelValues(strconv.Itoa(numaNode)).Set(float64(count)) } } + +func (p *staticPolicy) isPodWithPodLevelResources(pod *v1.Pod) bool { + if utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources) && resourcehelper.IsPodLevelResourcesSet(pod) { + // The CPU manager static policy does not support pod-level resources. + klog.V(5).InfoS("CPU Manager allocation skipped, pod is using pod-level resources which are not supported by the static CPU manager policy", "pod", klog.KObj(pod)) + + return true + } + + return false +} diff --git a/pkg/kubelet/cm/memorymanager/memory_manager.go b/pkg/kubelet/cm/memorymanager/memory_manager.go index 572f58599a0..9d5d186f703 100644 --- a/pkg/kubelet/cm/memorymanager/memory_manager.go +++ b/pkg/kubelet/cm/memorymanager/memory_manager.go @@ -18,6 +18,7 @@ package memorymanager import ( "context" + "errors" "fmt" "runtime" "sync" @@ -273,6 +274,12 @@ func (m *manager) Allocate(pod *v1.Pod, container *v1.Container) error { // Call down into the policy to assign this container memory if required. if err := m.policy.Allocate(ctx, m.state, pod, container); err != nil { + // If it gets this error it means that the pod requires pod level resources but this is not aligned. 
+ // We do not want the pod to fail to schedule on this error so we Admit it, this is just a warning + if errors.As(err, &MemoryManagerPodLevelResourcesError{}) { + return err + } + logger.Error(err, "Allocate error", "pod", klog.KObj(pod), "containerName", container.Name) return err } diff --git a/pkg/kubelet/cm/memorymanager/policy_static.go b/pkg/kubelet/cm/memorymanager/policy_static.go index ce85610b65d..7939d7920aa 100644 --- a/pkg/kubelet/cm/memorymanager/policy_static.go +++ b/pkg/kubelet/cm/memorymanager/policy_static.go @@ -27,6 +27,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" utilfeature "k8s.io/apiserver/pkg/util/feature" + resourcehelper "k8s.io/component-helpers/resource" "k8s.io/klog/v2" podutil "k8s.io/kubernetes/pkg/api/v1/pod" corehelper "k8s.io/kubernetes/pkg/apis/core/v1/helper" @@ -38,7 +39,12 @@ import ( "k8s.io/kubernetes/pkg/kubelet/metrics" ) -const PolicyTypeStatic policyType = "Static" +const ( + PolicyTypeStatic policyType = "Static" + + // ErrorMemoryManagerPodLevelResources represents the type of a MemoryManagerPodLevelResourcesError + ErrorMemoryManagerPodLevelResources = "MemoryManagerPodLevelResourcesError" +) type systemReservedMemory map[int]map[v1.ResourceName]uint64 type reusableMemory map[string]map[string]map[v1.ResourceName]uint64 @@ -60,6 +66,16 @@ type staticPolicy struct { var _ Policy = &staticPolicy{} +type MemoryManagerPodLevelResourcesError struct{} + +func (e MemoryManagerPodLevelResourcesError) Type() string { + return ErrorMemoryManagerPodLevelResources +} + +func (e MemoryManagerPodLevelResourcesError) Error() string { + return "Memory Manager static policy does not support pod-level resources" +} + // NewPolicyStatic returns new static policy instance func NewPolicyStatic(ctx context.Context, machineInfo *cadvisorapi.MachineInfo, reserved systemReservedMemory, affinity topologymanager.Store) (Policy, error) { var totalSystemReserved uint64 @@ -107,6 +123,10 @@ func (p *staticPolicy) 
Allocate(ctx context.Context, s state.State, pod *v1.Pod, return nil } + if p.isPodWithPodLevelResources(ctx, pod) { + return MemoryManagerPodLevelResourcesError{} + } + podUID := string(pod.UID) logger.Info("Allocate") // container belongs in an exclusively allocated pool @@ -406,6 +426,10 @@ func (p *staticPolicy) GetPodTopologyHints(ctx context.Context, s state.State, p return nil } + if p.isPodWithPodLevelResources(ctx, pod) { + return nil + } + reqRsrcs, err := getPodRequestedResources(pod) if err != nil { logger.Error(err, "Failed to get pod requested resources", "podUID", pod.UID) @@ -436,6 +460,10 @@ func (p *staticPolicy) GetTopologyHints(ctx context.Context, s state.State, pod return nil } + if p.isPodWithPodLevelResources(ctx, pod) { + return nil + } + requestedResources, err := getRequestedResources(pod, container) if err != nil { logger.Error(err, "Failed to get container requested resources", "podUID", pod.UID, "containerName", container.Name) @@ -1076,3 +1104,16 @@ func isAffinityViolatingNUMAAllocations(machineState state.NUMANodeMap, mask bit } return false } + +func (p *staticPolicy) isPodWithPodLevelResources(ctx context.Context, pod *v1.Pod) bool { + logger := klog.FromContext(ctx) + + if utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources) && resourcehelper.IsPodLevelResourcesSet(pod) { + // The Memory manager static policy does not support pod-level resources. 
+ logger.V(5).Info("Memory manager allocation skipped, pod is using pod-level resources which are not supported by the static Memory manager policy", "pod", klog.KObj(pod)) + + return true + } + + return false +} diff --git a/pkg/kubelet/cm/memorymanager/policy_static_test.go b/pkg/kubelet/cm/memorymanager/policy_static_test.go index cf85d75ddb8..d10e43898b9 100644 --- a/pkg/kubelet/cm/memorymanager/policy_static_test.go +++ b/pkg/kubelet/cm/memorymanager/policy_static_test.go @@ -2017,9 +2017,9 @@ func TestStaticPolicyAllocate(t *testing.T) { } err = p.Allocate(tCtx, s, testCase.pod, &testCase.pod.Spec.Containers[0]) - if !reflect.DeepEqual(err, testCase.expectedError) { - t.Fatalf("The actual error %v is different from the expected one %v", err, testCase.expectedError) - } + if (err == nil) != (testCase.expectedError == nil) || (err != nil && testCase.expectedError != nil && err.Error() != testCase.expectedError.Error()) { + t.Fatalf("The actual error %v is different from the expected one %v", err, testCase.expectedError) + } if err != nil { return diff --git a/pkg/kubelet/cm/topologymanager/scope.go b/pkg/kubelet/cm/topologymanager/scope.go index db3edd63e64..df69aad06fe 100644 --- a/pkg/kubelet/cm/topologymanager/scope.go +++ b/pkg/kubelet/cm/topologymanager/scope.go @@ -19,8 +19,11 @@ package topologymanager import ( "sync" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/cm/admission" "k8s.io/kubernetes/pkg/kubelet/cm/containermap" "k8s.io/kubernetes/pkg/kubelet/lifecycle" @@ -148,11 +151,21 @@ func (s *scope) admitPolicyNone(pod *v1.Pod) lifecycle.PodAdmitResult { // It would be better to implement this function in topologymanager instead of scope // but topologymanager do not track providers anymore func (s *scope) allocateAlignedResources(pod *v1.Pod, container *v1.Container) 
error { + isPodLevelResourcesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources) + + var errs []error for _, provider := range s.hintProviders { err := provider.Allocate(pod, container) - if err != nil { + if err != nil && isPodLevelResourcesEnabled { + errs = append(errs, err) + } else if err != nil { return err } } + + if isPodLevelResourcesEnabled { + return utilerrors.NewAggregate(errs) + } + return nil } diff --git a/pkg/kubelet/lifecycle/interfaces.go b/pkg/kubelet/lifecycle/interfaces.go index 3be7adade54..b8b8dea8ca7 100644 --- a/pkg/kubelet/lifecycle/interfaces.go +++ b/pkg/kubelet/lifecycle/interfaces.go @@ -35,6 +35,8 @@ type PodAdmitResult struct { Reason string // a brief message explaining why the pod could not be admitted. Message string + // all errors for why the pod could not be admitted. + Errors []error } // PodAdmitHandler is notified during pod admission.