From 7804b51f42011e52a4e82a761efe098a8160219a Mon Sep 17 00:00:00 2001 From: Kevin Torres Date: Mon, 30 Jun 2025 21:05:58 +0000 Subject: [PATCH 1/6] CPU and Memory manager event when using pod level resources --- pkg/kubelet/allocation/allocation_manager.go | 9 ++++ pkg/kubelet/cm/admission/errors.go | 53 +++++++++++++++++-- pkg/kubelet/cm/cpumanager/cpu_manager.go | 8 +++ pkg/kubelet/cm/cpumanager/policy_static.go | 36 ++++++++++++- .../cm/memorymanager/memory_manager.go | 7 +++ pkg/kubelet/cm/memorymanager/policy_static.go | 43 ++++++++++++++- .../cm/memorymanager/policy_static_test.go | 6 +-- pkg/kubelet/cm/topologymanager/scope.go | 17 +++++- pkg/kubelet/lifecycle/interfaces.go | 2 + 9 files changed, 171 insertions(+), 10 deletions(-) diff --git a/pkg/kubelet/allocation/allocation_manager.go b/pkg/kubelet/allocation/allocation_manager.go index c7eff6a01d0..5ec80f999ff 100644 --- a/pkg/kubelet/allocation/allocation_manager.go +++ b/pkg/kubelet/allocation/allocation_manager.go @@ -18,6 +18,7 @@ package allocation import ( "context" + "errors" "fmt" "path/filepath" "slices" @@ -41,6 +42,7 @@ import ( "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/allocation/state" "k8s.io/kubernetes/pkg/kubelet/cm" + "k8s.io/kubernetes/pkg/kubelet/cm/admission" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager" "k8s.io/kubernetes/pkg/kubelet/config" @@ -701,6 +703,13 @@ func (m *manager) canAdmitPod(allocatedPods []*v1.Pod, pod *v1.Pod) (bool, strin if result := podAdmitHandler.Admit(attrs); !result.Admit { klog.InfoS("Pod admission denied", "podUID", attrs.Pod.UID, "pod", klog.KObj(attrs.Pod), "reason", result.Reason, "message", result.Message) return false, result.Reason, result.Message + } else if result.Admit && len(result.Errors) > 0 && result.Reason == admission.PodLevelResourcesIncompatible && utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources) { + for _, err := range result.Errors { + var admissionWarning admission.Error + if errors.As(err, &admissionWarning) { + m.recorder.Event(attrs.Pod, v1.EventTypeWarning, admission.PodLevelResourcesIncompatible, admissionWarning.Error()) + } + } } } diff --git a/pkg/kubelet/cm/admission/errors.go b/pkg/kubelet/cm/admission/errors.go index 5e549220394..faac0098ec2 100644 --- a/pkg/kubelet/cm/admission/errors.go +++ b/pkg/kubelet/cm/admission/errors.go @@ -20,11 +20,19 @@ import ( "errors" "fmt" + utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/kubernetes/pkg/kubelet/lifecycle" ) const ( ErrorReasonUnexpected = "UnexpectedAdmissionError" + + // Explicit reason when CPU/Memory manager's policy is incompatible with pod level resources. + PodLevelResourcesIncompatible = "PodLevelResourcesIncompatible" + + // Warnings for pod level resources when manager's policy incompatibility. + CPUManagerPodLevelResourcesError = "CPUManagerPodLevelResourcesError" + MemoryManagerPodLevelResourcesError = "MemoryManagerPodLevelResourcesError" ) type Error interface { @@ -49,14 +57,53 @@ func GetPodAdmitResult(err error) lifecycle.PodAdmitResult { return lifecycle.PodAdmitResult{Admit: true} } + var errs []error + // To support multiple pod-level resource errors, we need to check if the error + // is an aggregate error. 
+ var agg utilerrors.Aggregate + if errors.As(err, &agg) { + errs = agg.Errors() + } else { + errs = []error{err} + } + + var podLevelWarnings []error + var otherErrs []error + for _, e := range errs { + var admissionErr Error + if errors.As(e, &admissionErr) && (admissionErr.Type() == CPUManagerPodLevelResourcesError || admissionErr.Type() == MemoryManagerPodLevelResourcesError) { + podLevelWarnings = append(podLevelWarnings, e) + } else { + otherErrs = append(otherErrs, e) + } + } + + // If all errors are pod-level resource errors, we should treat them as warnings + // and not block pod admission. + if len(otherErrs) == 0 && len(podLevelWarnings) > 0 { + return lifecycle.PodAdmitResult{ + Admit: true, + Reason: PodLevelResourcesIncompatible, + Message: "", + Errors: podLevelWarnings, + } + } + + if len(otherErrs) == 0 { + // This should not happen if err != nil, but as a safeguard. + return lifecycle.PodAdmitResult{Admit: true} + } + + // At this point, we have at least one error that requires pod rejection. + firstErr := otherErrs[0] var admissionErr Error - if !errors.As(err, &admissionErr) { - admissionErr = &unexpectedAdmissionError{err} + if !errors.As(firstErr, &admissionErr) { + admissionErr = &unexpectedAdmissionError{firstErr} } return lifecycle.PodAdmitResult{ + Admit: false, Message: admissionErr.Error(), Reason: admissionErr.Type(), - Admit: false, } } diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go b/pkg/kubelet/cm/cpumanager/cpu_manager.go index 8b59ba67121..1d98eef8320 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go @@ -18,6 +18,7 @@ package cpumanager import ( "context" + "errors" "fmt" "math" "sync" @@ -261,6 +262,13 @@ func (m *manager) Allocate(p *v1.Pod, c *v1.Container) error { // Call down into the policy to assign this container CPUs if required. err := m.policy.Allocate(m.state, p, c) + + // If it gets this error it means that the pod requires pod level resources but this is not aligned. + // We do not want the pod to fail to schedule on this error so we Admit it, this is just a warning + if errors.As(err, &CPUManagerPodLevelResourcesError{}) { + return err + } + if err != nil { klog.ErrorS(err, "Allocate error") return err diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go index 28591c5baf1..fb787c35cc5 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static.go +++ b/pkg/kubelet/cm/cpumanager/policy_static.go @@ -18,10 +18,12 @@ package cpumanager import ( "fmt" + "strconv" v1 "k8s.io/api/core/v1" utilfeature "k8s.io/apiserver/pkg/util/feature" + resourcehelper "k8s.io/component-helpers/resource" "k8s.io/klog/v2" podutil "k8s.io/kubernetes/pkg/api/v1/pod" v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos" @@ -35,13 +37,15 @@ import ( ) const ( - // PolicyStatic is the name of the static policy. // Should options be given, these will be ignored and backward (up to 1.21 included) // compatible behaviour will be enforced PolicyStatic policyName = "static" // ErrorSMTAlignment represents the type of an SMTAlignmentError ErrorSMTAlignment = "SMTAlignmentError" + + // ErrorCPUManagerPodLevelResources represents the type of a CPUManagerPodLevelResourcesError + ErrorCPUManagerPodLevelResources = "CPUManagerPodLevelResourcesError" ) // SMTAlignmentError represents an error due to SMT alignment @@ -52,6 +56,15 @@ type SMTAlignmentError struct { CausedByPhysicalCPUs bool } +// PodLevelResourcesError represents an error due to pod-level resources not being supported. 
+type CPUManagerPodLevelResourcesError struct{} + +func (e CPUManagerPodLevelResourcesError) Error() string { + return "CPU Manager static policy does not support pod-level resources" +} + +func (e CPUManagerPodLevelResourcesError) Type() string { return ErrorCPUManagerPodLevelResources } + func (e SMTAlignmentError) Error() string { if e.CausedByPhysicalCPUs { return fmt.Sprintf("SMT Alignment Error: not enough free physical CPUs: available physical CPUs = %d, requested CPUs = %d, CPUs per core = %d", e.AvailablePhysicalCPUs, e.RequestedCPUs, e.CpusPerCore) @@ -316,6 +329,10 @@ func (p *staticPolicy) updateCPUsToReuse(pod *v1.Pod, container *v1.Container, c func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) (rerr error) { numCPUs := p.guaranteedCPUs(pod, container) if numCPUs == 0 { + if p.isPodWithPodLevelResources(pod) { + return CPUManagerPodLevelResourcesError{} + } + // container belongs in the shared pool (nothing to do; use default cpuset) return nil } @@ -467,6 +484,12 @@ func (p *staticPolicy) guaranteedCPUs(pod *v1.Pod, container *v1.Container) int klog.V(5).InfoS("Exclusive CPU allocation skipped, pod QoS is not guaranteed", "pod", klog.KObj(pod), "containerName", container.Name, "qos", qos) return 0 } + + // The CPU manager static policy does not support pod-level resources. + if utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources) && resourcehelper.IsPodLevelResourcesSet(pod) { + return 0 + } + cpuQuantity := container.Resources.Requests[v1.ResourceCPU] // In-place pod resize feature makes Container.Resources field mutable for CPU & memory. // AllocatedResources holds the value of Container.Resources.Requests when the pod was admitted. @@ -814,3 +837,14 @@ func updateAllocationPerNUMAMetric(topo *topology.CPUTopology, allocatedCPUs cpu metrics.CPUManagerAllocationPerNUMA.WithLabelValues(strconv.Itoa(numaNode)).Set(float64(count)) } } + +func (p *staticPolicy) isPodWithPodLevelResources(pod *v1.Pod) bool { + if utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources) && resourcehelper.IsPodLevelResourcesSet(pod) { + // The Memory manager static policy does not support pod-level resources. + klog.V(5).InfoS("CPU Manager allocation skipped, pod is using pod-level resources which are not supported by the static CPU manager policy", "pod", klog.KObj(pod)) + + return true + } + + return false +} diff --git a/pkg/kubelet/cm/memorymanager/memory_manager.go b/pkg/kubelet/cm/memorymanager/memory_manager.go index 572f58599a0..9d5d186f703 100644 --- a/pkg/kubelet/cm/memorymanager/memory_manager.go +++ b/pkg/kubelet/cm/memorymanager/memory_manager.go @@ -18,6 +18,7 @@ package memorymanager import ( "context" + "errors" "fmt" "runtime" "sync" @@ -273,6 +274,12 @@ func (m *manager) Allocate(pod *v1.Pod, container *v1.Container) error { // Call down into the policy to assign this container memory if required. if err := m.policy.Allocate(ctx, m.state, pod, container); err != nil { + // If it gets this error it means that the pod requires pod level resources but this is not aligned. 
+ // We do not want the pod to fail to schedule on this error so we Admit it, this is just a warning + if errors.As(err, &MemoryManagerPodLevelResourcesError{}) { + return err + } + logger.Error(err, "Allocate error", "pod", klog.KObj(pod), "containerName", container.Name) return err } diff --git a/pkg/kubelet/cm/memorymanager/policy_static.go b/pkg/kubelet/cm/memorymanager/policy_static.go index ce85610b65d..7939d7920aa 100644 --- a/pkg/kubelet/cm/memorymanager/policy_static.go +++ b/pkg/kubelet/cm/memorymanager/policy_static.go @@ -27,6 +27,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" utilfeature "k8s.io/apiserver/pkg/util/feature" + resourcehelper "k8s.io/component-helpers/resource" "k8s.io/klog/v2" podutil "k8s.io/kubernetes/pkg/api/v1/pod" corehelper "k8s.io/kubernetes/pkg/apis/core/v1/helper" @@ -38,7 +39,12 @@ import ( "k8s.io/kubernetes/pkg/kubelet/metrics" ) -const PolicyTypeStatic policyType = "Static" +const ( + PolicyTypeStatic policyType = "Static" + + // ErrorMemoryManagerPodLevelResources represents the type of a MemoryManagerPodLevelResourcesError + ErrorMemoryManagerPodLevelResources = "MemoryManagerPodLevelResourcesError" +) type systemReservedMemory map[int]map[v1.ResourceName]uint64 type reusableMemory map[string]map[string]map[v1.ResourceName]uint64 @@ -60,6 +66,16 @@ type staticPolicy struct { var _ Policy = &staticPolicy{} +type MemoryManagerPodLevelResourcesError struct{} + +func (e MemoryManagerPodLevelResourcesError) Type() string { + return ErrorMemoryManagerPodLevelResources +} + +func (e MemoryManagerPodLevelResourcesError) Error() string { + return "Memory Manager static policy does not support pod-level resources" +} + // NewPolicyStatic returns new static policy instance func NewPolicyStatic(ctx context.Context, machineInfo *cadvisorapi.MachineInfo, reserved systemReservedMemory, affinity topologymanager.Store) (Policy, error) { var totalSystemReserved uint64 @@ -107,6 +123,10 @@ func (p *staticPolicy) Allocate(ctx context.Context, s state.State, pod *v1.Pod, return nil } + if p.isPodWithPodLevelResources(ctx, pod) { + return MemoryManagerPodLevelResourcesError{} + } + podUID := string(pod.UID) logger.Info("Allocate") // container belongs in an exclusively allocated pool @@ -406,6 +426,10 @@ func (p *staticPolicy) GetPodTopologyHints(ctx context.Context, s state.State, p return nil } + if p.isPodWithPodLevelResources(ctx, pod) { + return nil + } + reqRsrcs, err := getPodRequestedResources(pod) if err != nil { logger.Error(err, "Failed to get pod requested resources", "podUID", pod.UID) @@ -436,6 +460,10 @@ func (p *staticPolicy) GetTopologyHints(ctx context.Context, s state.State, pod return nil } + if p.isPodWithPodLevelResources(ctx, pod) { + return nil + } + requestedResources, err := getRequestedResources(pod, container) if err != nil { logger.Error(err, "Failed to get container requested resources", "podUID", pod.UID, "containerName", container.Name) @@ -1076,3 +1104,16 @@ func isAffinityViolatingNUMAAllocations(machineState state.NUMANodeMap, mask bit } return false } + +func (p *staticPolicy) isPodWithPodLevelResources(ctx context.Context, pod *v1.Pod) bool { + logger := klog.FromContext(ctx) + + if utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources) && resourcehelper.IsPodLevelResourcesSet(pod) { + // The Memory manager static policy does not support pod-level resources. 
+ logger.V(5).Info("Memory manager allocation skipped, pod is using pod-level resources which are not supported by the static Memory manager policy", "pod", klog.KObj(pod)) + + return true + } + + return false +} diff --git a/pkg/kubelet/cm/memorymanager/policy_static_test.go b/pkg/kubelet/cm/memorymanager/policy_static_test.go index cf85d75ddb8..d10e43898b9 100644 --- a/pkg/kubelet/cm/memorymanager/policy_static_test.go +++ b/pkg/kubelet/cm/memorymanager/policy_static_test.go @@ -2017,9 +2017,9 @@ func TestStaticPolicyAllocate(t *testing.T) { } err = p.Allocate(tCtx, s, testCase.pod, &testCase.pod.Spec.Containers[0]) - if !reflect.DeepEqual(err, testCase.expectedError) { - t.Fatalf("The actual error %v is different from the expected one %v", err, testCase.expectedError) - } + if (err == nil) != (testCase.expectedError == nil) || (err != nil && testCase.expectedError != nil && err.Error() != testCase.expectedError.Error()) { + t.Fatalf("The actual error %v is different from the expected one %v", err, testCase.expectedError) + } if err != nil { return diff --git a/pkg/kubelet/cm/topologymanager/scope.go b/pkg/kubelet/cm/topologymanager/scope.go index db3edd63e64..df69aad06fe 100644 --- a/pkg/kubelet/cm/topologymanager/scope.go +++ b/pkg/kubelet/cm/topologymanager/scope.go @@ -19,8 +19,11 @@ package topologymanager import ( "sync" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/cm/admission" "k8s.io/kubernetes/pkg/kubelet/cm/containermap" "k8s.io/kubernetes/pkg/kubelet/lifecycle" @@ -148,11 +151,21 @@ func (s *scope) admitPolicyNone(pod *v1.Pod) lifecycle.PodAdmitResult { // It would be better to implement this function in topologymanager instead of scope // but topologymanager do not track providers anymore func (s *scope) allocateAlignedResources(pod *v1.Pod, container *v1.Container) error { + isPodLevelResourcesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources) + + var errs []error for _, provider := range s.hintProviders { err := provider.Allocate(pod, container) - if err != nil { + if err != nil && isPodLevelResourcesEnabled { + errs = append(errs, err) + } else if err != nil { return err } } + + if isPodLevelResourcesEnabled { + return utilerrors.NewAggregate(errs) + } + return nil } diff --git a/pkg/kubelet/lifecycle/interfaces.go b/pkg/kubelet/lifecycle/interfaces.go index 3be7adade54..b8b8dea8ca7 100644 --- a/pkg/kubelet/lifecycle/interfaces.go +++ b/pkg/kubelet/lifecycle/interfaces.go @@ -35,6 +35,8 @@ type PodAdmitResult struct { Reason string // a brief message explaining why the pod could not be admitted. Message string + // all errors for why the pod could not be admitted. + Errors []error } // PodAdmitHandler is notified during pod admission. 
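
For reference, the admission decision that PATCH 1/6 builds up in pkg/kubelet/cm/admission/errors.go can be sketched in isolation: the topology manager scope aggregates the hint providers' Allocate errors, and GetPodAdmitResult splits them into pod-level-resources incompatibilities (the pod is still admitted and the errors are surfaced as warnings) and everything else (the pod is rejected). The standalone sketch below uses only the Go standard library; typedError, podLevelResourcesError and classify are illustrative stand-ins for the admission.Error interface and the manager-specific error types in the patch, not the actual kubelet code, and the unwrapping of the utilerrors.Aggregate is elided.

package main

import (
	"errors"
	"fmt"
)

// typedError mirrors the shape of the admission.Error interface used in the
// patch: an error that also carries a machine-readable reason string.
type typedError interface {
	error
	Type() string
}

// podLevelResourcesError is an illustrative stand-in for the
// CPUManagerPodLevelResourcesError / MemoryManagerPodLevelResourcesError
// types introduced above.
type podLevelResourcesError struct{ manager string }

func (e podLevelResourcesError) Error() string {
	return e.manager + " static policy does not support pod-level resources"
}

func (e podLevelResourcesError) Type() string {
	return e.manager + "PodLevelResourcesError"
}

// classify follows the decision made in GetPodAdmitResult: errors whose type
// marks a pod-level-resources incompatibility become warnings, anything else
// stays a blocking error. (The patch first flattens a utilerrors.Aggregate
// into its component errors; here the caller passes the slice directly.)
func classify(errs []error) (warnings, blocking []error) {
	for _, err := range errs {
		var te typedError
		if errors.As(err, &te) &&
			(te.Type() == "CPUManagerPodLevelResourcesError" ||
				te.Type() == "MemoryManagerPodLevelResourcesError") {
			warnings = append(warnings, err)
			continue
		}
		blocking = append(blocking, err)
	}
	return warnings, blocking
}

func main() {
	errs := []error{
		podLevelResourcesError{manager: "CPUManager"},
		podLevelResourcesError{manager: "MemoryManager"},
	}
	warnings, blocking := classify(errs)
	// Only pod-level-resources errors: the pod would be admitted with
	// reason PodLevelResourcesIncompatible and the warnings emitted as
	// events; any blocking error would reject it instead.
	fmt.Printf("admit=%v warnings=%d blocking=%d\n", len(blocking) == 0, len(warnings), len(blocking))
}

Keeping the two kinds of failures separate is what lets canAdmitPod in the allocation manager admit the pod while still emitting a PodLevelResourcesIncompatible warning event for each incompatible manager.
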
From 5672750e6a3f3d335d269b7f4804e0ae0f679e93 Mon Sep 17 00:00:00 2001 From: Kevin Torres Date: Thu, 10 Jul 2025 23:42:43 +0000 Subject: [PATCH 2/6] Unit tests for no hints nor aligment of CPU and Memory --- pkg/kubelet/cm/cpumanager/cpu_manager_test.go | 16 +++ .../cm/cpumanager/topology_hints_test.go | 35 +++-- .../cm/memorymanager/memory_manager_test.go | 7 + .../cm/memorymanager/policy_static_test.go | 123 +++++++++++++++++- 4 files changed, 169 insertions(+), 12 deletions(-) diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go index 6c3af2dc3f3..742dfc11ae9 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go @@ -172,6 +172,22 @@ func makePod(podUID, containerName, cpuRequest, cpuLimit string) *v1.Pod { return pod } +func makePodWithPodLevelResources(podUID, podCPURequest, podCPULimit, containerName, cpuRequest, cpuLimit string) *v1.Pod { + pod := makePod(podUID, containerName, cpuRequest, cpuLimit) + pod.Spec.Resources = &v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse(podCPURequest), + v1.ResourceName(v1.ResourceMemory): resource.MustParse("1G"), + }, + Limits: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse(podCPULimit), + v1.ResourceName(v1.ResourceMemory): resource.MustParse("1G"), + }, + } + + return pod +} + func makeMultiContainerPod(initCPUs, appCPUs []struct{ request, limit string }) *v1.Pod { pod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ diff --git a/pkg/kubelet/cm/cpumanager/topology_hints_test.go b/pkg/kubelet/cm/cpumanager/topology_hints_test.go index f6c230bc32f..4773c779a67 100644 --- a/pkg/kubelet/cm/cpumanager/topology_hints_test.go +++ b/pkg/kubelet/cm/cpumanager/topology_hints_test.go @@ -26,7 +26,7 @@ import ( "k8s.io/apimachinery/pkg/types" utilfeature "k8s.io/apiserver/pkg/util/feature" featuregatetesting "k8s.io/component-base/featuregate/testing" - pkgfeatures "k8s.io/kubernetes/pkg/features" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" @@ -35,12 +35,13 @@ import ( ) type testCase struct { - name string - pod v1.Pod - container v1.Container - assignments state.ContainerCPUAssignments - defaultCPUSet cpuset.CPUSet - expectedHints []topologymanager.TopologyHint + name string + pod v1.Pod + container v1.Container + assignments state.ContainerCPUAssignments + defaultCPUSet cpuset.CPUSet + expectedHints []topologymanager.TopologyHint + podLevelResourcesEnabled bool } func returnMachineInfo() cadvisorapi.MachineInfo { @@ -210,9 +211,10 @@ func TestPodGuaranteedCPUs(t *testing.T) { func TestGetTopologyHints(t *testing.T) { machineInfo := returnMachineInfo() - tcases := returnTestCases() - for _, tc := range tcases { + for _, tc := range returnTestCases() { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodLevelResources, tc.podLevelResourcesEnabled) + topology, _ := topology.Discover(&machineInfo) var activePods []*v1.Pod @@ -261,6 +263,8 @@ func TestGetPodTopologyHints(t *testing.T) { machineInfo := returnMachineInfo() for _, tc := range returnTestCases() { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodLevelResources, tc.podLevelResourcesEnabled) + topology, _ := topology.Discover(&machineInfo) var activePods []*v1.Pod @@ -442,7 +446,7 @@ 
func TestGetPodTopologyHintsWithPolicyOptions(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.description, func(t *testing.T) { - featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, pkgfeatures.CPUManagerPolicyAlphaOptions, true) + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CPUManagerPolicyAlphaOptions, true) var activePods []*v1.Pod for p := range testCase.assignments { @@ -495,6 +499,9 @@ func returnTestCases() []testCase { testPod4 := makePod("fakePod", "fakeContainer", "11", "11") testContainer4 := &testPod4.Spec.Containers[0] + testPod5 := makePodWithPodLevelResources("fakePod", "5", "5", "fakeContainer", "4", "4") + testContainer5 := &testPod5.Spec.Containers[0] + firstSocketMask, _ := bitmask.NewBitMask(0) secondSocketMask, _ := bitmask.NewBitMask(1) crossSocketMask, _ := bitmask.NewBitMask(0, 1) @@ -657,5 +664,13 @@ func returnTestCases() []testCase { defaultCPUSet: cpuset.New(), expectedHints: []topologymanager.TopologyHint{}, }, + { + name: "Pod has pod level resources, no hint generation", + pod: *testPod5, + container: *testContainer5, + defaultCPUSet: cpuset.New(0, 1, 2, 3), + expectedHints: nil, + podLevelResourcesEnabled: true, + }, } } diff --git a/pkg/kubelet/cm/memorymanager/memory_manager_test.go b/pkg/kubelet/cm/memorymanager/memory_manager_test.go index 8bcfa3fd3d7..fa5a9aa4479 100644 --- a/pkg/kubelet/cm/memorymanager/memory_manager_test.go +++ b/pkg/kubelet/cm/memorymanager/memory_manager_test.go @@ -151,6 +151,13 @@ func getPod(podUID string, containerName string, requirements *v1.ResourceRequir } } +func getPodWithPodLevelResources(podUID string, podRequirements *v1.ResourceRequirements, containerName string, containerRequirements *v1.ResourceRequirements) *v1.Pod { + pod := getPod(podUID, containerName, containerRequirements) + pod.Spec.Resources = podRequirements + + return pod +} + func getPodWithInitContainers(podUID string, containers []v1.Container, initContainers []v1.Container) *v1.Pod { return &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ diff --git a/pkg/kubelet/cm/memorymanager/policy_static_test.go b/pkg/kubelet/cm/memorymanager/policy_static_test.go index d10e43898b9..be8ce520ae1 100644 --- a/pkg/kubelet/cm/memorymanager/policy_static_test.go +++ b/pkg/kubelet/cm/memorymanager/policy_static_test.go @@ -26,6 +26,9 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask" @@ -42,6 +45,17 @@ const ( var ( containerRestartPolicyAlways = v1.ContainerRestartPolicyAlways + podLevelRequirementsGuaranteed = &v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("1000Mi"), + v1.ResourceMemory: resource.MustParse("1Gi"), + }, + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("1000Mi"), + v1.ResourceMemory: resource.MustParse("1Gi"), + }, + } + requirementsGuaranteed = &v1.ResourceRequirements{ Limits: v1.ResourceList{ v1.ResourceCPU: resource.MustParse("1000Mi"), @@ -131,6 +145,7 @@ type testStaticPolicy struct { topologyHint *topologymanager.TopologyHint expectedTopologyHints map[string][]topologymanager.TopologyHint initContainersReusableMemory reusableMemory + podLevelResourcesEnabled bool } func 
initTests(t *testing.T, testCase *testStaticPolicy, hint *topologymanager.TopologyHint, initContainersReusableMemory reusableMemory) (Policy, state.State, error) { @@ -2006,10 +2021,68 @@ func TestStaticPolicyAllocate(t *testing.T) { topologyHint: &topologymanager.TopologyHint{NUMANodeAffinity: newNUMAAffinity(0, 1), Preferred: true}, expectedError: fmt.Errorf("[memorymanager] preferred hint violates NUMA node allocation"), }, + { + description: "should do nothing for guaranteed pod with pod level resources", + expectedAssignments: state.ContainerMemoryAssignments{}, + machineState: state.NUMANodeMap{ + 0: &state.NUMANodeState{ + MemoryMap: map[v1.ResourceName]*state.MemoryTable{ + v1.ResourceMemory: { + Allocatable: 1536 * mb, + Free: 1536 * mb, + Reserved: 0, + SystemReserved: 512 * mb, + TotalMemSize: 2 * gb, + }, + hugepages1Gi: { + Allocatable: gb, + Free: gb, + Reserved: 0, + SystemReserved: 0, + TotalMemSize: gb, + }, + }, + Cells: []int{}, + }, + }, + expectedMachineState: state.NUMANodeMap{ + 0: &state.NUMANodeState{ + MemoryMap: map[v1.ResourceName]*state.MemoryTable{ + v1.ResourceMemory: { + Allocatable: 1536 * mb, + Free: 1536 * mb, + Reserved: 0, + SystemReserved: 512 * mb, + TotalMemSize: 2 * gb, + }, + hugepages1Gi: { + Allocatable: gb, + Free: gb, + Reserved: 0, + SystemReserved: 0, + TotalMemSize: gb, + }, + }, + Cells: []int{}, + }, + }, + systemReserved: systemReservedMemory{ + 0: map[v1.ResourceName]uint64{ + v1.ResourceMemory: 512 * mb, + }, + }, + pod: getPodWithPodLevelResources("pod1", podLevelRequirementsGuaranteed, "container1", requirementsGuaranteed), + expectedTopologyHints: nil, + topologyHint: &topologymanager.TopologyHint{}, + expectedError: fmt.Errorf("Memory Manager static policy does not support pod-level resources"), + podLevelResourcesEnabled: true, + }, } for _, testCase := range testCases { t.Run(testCase.description, func(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodLevelResources, testCase.podLevelResourcesEnabled) + t.Logf("TestStaticPolicyAllocate %s", testCase.description) p, s, err := initTests(t, &testCase, testCase.topologyHint, nil) if err != nil { @@ -2018,8 +2091,8 @@ func TestStaticPolicyAllocate(t *testing.T) { err = p.Allocate(tCtx, s, testCase.pod, &testCase.pod.Spec.Containers[0]) if (err == nil) != (testCase.expectedError == nil) || (err != nil && testCase.expectedError != nil && err.Error() != testCase.expectedError.Error()) { - t.Fatalf("The actual error %v is different from the expected one %v", err, testCase.expectedError) - } + t.Fatalf("The actual error %v is different from the expected one %v", err, testCase.expectedError) + } if err != nil { return @@ -3732,10 +3805,23 @@ func TestStaticPolicyGetTopologyHints(t *testing.T) { }, }, }, + { + description: "should not provide topology hints for guaranteed pod with pod level resources", + pod: getPodWithPodLevelResources("pod1", podLevelRequirementsGuaranteed, "container1", requirementsGuaranteed), + systemReserved: systemReservedMemory{ + 0: map[v1.ResourceName]uint64{ + v1.ResourceMemory: 1024 * mb, + }, + }, + expectedTopologyHints: nil, + podLevelResourcesEnabled: true, + }, } for _, testCase := range testCases { t.Run(testCase.description, func(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodLevelResources, testCase.podLevelResourcesEnabled) + p, s, err := initTests(t, &testCase, nil, nil) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -3749,6 
+3835,39 @@ func TestStaticPolicyGetTopologyHints(t *testing.T) { } } +func TestStaticPolicyGetPodTopologyHints(t *testing.T) { + _, tCtx := ktesting.NewTestContext(t) + testCases := []testStaticPolicy{ + { + description: "should not provide pod topology hints for guaranteed pod with pod level resources", + pod: getPodWithPodLevelResources("pod1", podLevelRequirementsGuaranteed, "container1", requirementsGuaranteed), + systemReserved: systemReservedMemory{ + 0: map[v1.ResourceName]uint64{ + v1.ResourceMemory: 1024 * mb, + }, + }, + expectedTopologyHints: nil, + podLevelResourcesEnabled: true, + }, + } + + for _, testCase := range testCases { + t.Run(testCase.description, func(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodLevelResources, testCase.podLevelResourcesEnabled) + + p, s, err := initTests(t, &testCase, nil, nil) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + topologyHints := p.GetPodTopologyHints(tCtx, s, testCase.pod) + if !reflect.DeepEqual(topologyHints, testCase.expectedTopologyHints) { + t.Fatalf("The actual topology hints: '%+v' are different from the expected one: '%+v'", topologyHints, testCase.expectedTopologyHints) + } + }) + } +} + func Test_getPodRequestedResources(t *testing.T) { testCases := []struct { description string From 766d011bba25c0477cbd43a72228a49e2e5a40a7 Mon Sep 17 00:00:00 2001 From: Kevin Torres Date: Sat, 12 Jul 2025 00:47:58 +0000 Subject: [PATCH 3/6] E2E tests for no hints nor aligment of CPU and Memory managers --- test/e2e_node/cpumanager_test.go | 134 +++++++++++++++++++++ test/e2e_node/memory_manager_test.go | 171 +++++++++++++++++++++++---- 2 files changed, 284 insertions(+), 21 deletions(-) diff --git a/test/e2e_node/cpumanager_test.go b/test/e2e_node/cpumanager_test.go index 4d248f8ff69..eed312cc922 100644 --- a/test/e2e_node/cpumanager_test.go +++ b/test/e2e_node/cpumanager_test.go @@ -39,9 +39,11 @@ import ( "github.com/onsi/gomega/types" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/features" kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config" "k8s.io/kubernetes/pkg/kubelet/cm" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" @@ -1921,6 +1923,138 @@ var _ = SIGDescribe("CPU Manager", ginkgo.Ordered, ginkgo.ContinueOnFailure, fra }) }) +var _ = SIGDescribe("CPU Manager Incompatibility Pod Level Resources", ginkgo.Ordered, ginkgo.ContinueOnFailure, framework.WithSerial(), feature.CPUManager, feature.PodLevelResources, framework.WithFeatureGate(features.PodLevelResources), func() { + f := framework.NewDefaultFramework("cpu-manager-incompatibility-pod-level-resources-test") + f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged + + // original kubeletconfig before the context start, to be restored + var oldCfg *kubeletconfig.KubeletConfiguration + var reservedCPUs cpuset.CPUSet + var onlineCPUs cpuset.CPUSet + var smtLevel int + var uncoreGroupSize int + // tracks all the pods created by a It() block. 
Best would be a namespace per It block + var podMap map[string]*v1.Pod + + ginkgo.BeforeAll(func(ctx context.Context) { + var err error + oldCfg, err = getCurrentKubeletConfig(ctx) + framework.ExpectNoError(err) + + onlineCPUs, err = getOnlineCPUs() // this should not change at all, at least during this suite lifetime + framework.ExpectNoError(err) + framework.Logf("Online CPUs: %s", onlineCPUs) + + smtLevel = smtLevelFromSysFS() // this should not change at all, at least during this suite lifetime + framework.Logf("SMT level: %d", smtLevel) + + uncoreGroupSize = getUncoreCPUGroupSize() + framework.Logf("Uncore Group Size: %d", uncoreGroupSize) + + e2enodeCgroupV2Enabled = IsCgroup2UnifiedMode() + framework.Logf("cgroup V2 enabled: %v", e2enodeCgroupV2Enabled) + + e2enodeCgroupDriver = oldCfg.CgroupDriver + framework.Logf("cgroup driver: %s", e2enodeCgroupDriver) + + runtime, _, err := getCRIClient() + framework.ExpectNoError(err, "Failed to get CRI client") + + version, err := runtime.Version(context.Background(), "") + framework.ExpectNoError(err, "Failed to get runtime version") + + e2enodeRuntimeName = version.GetRuntimeName() + framework.Logf("runtime: %s", e2enodeRuntimeName) + }) + + ginkgo.AfterAll(func(ctx context.Context) { + updateKubeletConfig(ctx, f, oldCfg, true) + }) + + ginkgo.BeforeEach(func(ctx context.Context) { + // note intentionally NOT set reservedCPUs - this must be initialized on a test-by-test basis + podMap = make(map[string]*v1.Pod) + }) + + ginkgo.JustBeforeEach(func(ctx context.Context) { + if !e2enodeCgroupV2Enabled { + e2eskipper.Skipf("Skipping since CgroupV2 not used") + } + }) + + ginkgo.AfterEach(func(ctx context.Context) { + deletePodsAsync(ctx, f, podMap) + }) + + ginkgo.When("running guaranteed pod level resources tests", ginkgo.Label("guaranteed pod level resources", "reserved-cpus"), func() { + ginkgo.It("should let the container access all the online CPUs without a reserved CPUs set", func(ctx context.Context) { + updateKubeletConfigIfNeeded(ctx, f, configureCPUManagerInKubelet(oldCfg, &cpuManagerKubeletArguments{ + policyName: string(cpumanager.PolicyStatic), + reservedSystemCPUs: cpuset.CPUSet{}, + })) + + pod := makeCPUManagerPod("gu-pod-level-resources", []ctnAttribute{ + { + ctnName: "gu-container", + cpuRequest: "1", + cpuLimit: "1", + }, + }) + pod.Spec.Resources = &v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("1"), + v1.ResourceMemory: resource.MustParse("100Mi"), + }, + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("1"), + v1.ResourceMemory: resource.MustParse("100Mi"), + }, + } + + ginkgo.By("creating the test pod") + pod = e2epod.NewPodClient(f).CreateSync(ctx, pod) + podMap[string(pod.UID)] = pod + + ginkgo.By("checking if the expected cpuset was assigned") + gomega.Expect(pod).To(HaveContainerCPUsEqualTo("gu-container", onlineCPUs)) + }) + + ginkgo.It("should let the container access all the online CPUs when using a reserved CPUs set", func(ctx context.Context) { + reservedCPUs = cpuset.New(0) + + updateKubeletConfigIfNeeded(ctx, f, configureCPUManagerInKubelet(oldCfg, &cpuManagerKubeletArguments{ + policyName: string(cpumanager.PolicyStatic), + reservedSystemCPUs: reservedCPUs, // Not really needed for the tests but helps to make a more precise check + })) + + pod := makeCPUManagerPod("gu-pod-level-resources", []ctnAttribute{ + { + ctnName: "gu-container", + cpuRequest: "1", + cpuLimit: "1", + }, + }) + pod.Spec.Resources = &v1.ResourceRequirements{ + Limits: 
v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("1"), + v1.ResourceMemory: resource.MustParse("100Mi"), + }, + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("1"), + v1.ResourceMemory: resource.MustParse("100Mi"), + }, + } + + ginkgo.By("creating the test pod") + pod = e2epod.NewPodClient(f).CreateSync(ctx, pod) + podMap[string(pod.UID)] = pod + + ginkgo.By("checking if the expected cpuset was assigned") + gomega.Expect(pod).To(HaveContainerCPUsEqualTo("gu-container", onlineCPUs)) + }) + }) +}) + // Matching helpers func HaveStatusReasonMatchingRegex(expr string) types.GomegaMatcher { diff --git a/test/e2e_node/memory_manager_test.go b/test/e2e_node/memory_manager_test.go index de371c33231..23867970d71 100644 --- a/test/e2e_node/memory_manager_test.go +++ b/test/e2e_node/memory_manager_test.go @@ -32,6 +32,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kubeletpodresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1" + "k8s.io/kubernetes/pkg/features" kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config" "k8s.io/kubernetes/pkg/kubelet/apis/podresources" "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state" @@ -238,6 +239,18 @@ func getAllNUMANodes() []int { return numaNodes } +func verifyMemoryPinning(f *framework.Framework, ctx context.Context, pod *v1.Pod, numaNodeIDs []int) { + ginkgo.By("Verifying the NUMA pinning") + + output, err := e2epod.GetPodLogs(ctx, f.ClientSet, f.Namespace.Name, pod.Name, pod.Spec.Containers[0].Name) + framework.ExpectNoError(err) + + currentNUMANodeIDs, err := cpuset.Parse(strings.Trim(output, "\n")) + framework.ExpectNoError(err) + + gomega.Expect(numaNodeIDs).To(gomega.Equal(currentNUMANodeIDs.List())) +} + // Serial because the test updates kubelet configuration. 
var _ = SIGDescribe("Memory Manager", "[LinuxOnly]", framework.WithDisruptive(), framework.WithSerial(), feature.MemoryManager, func() { // TODO: add more complex tests that will include interaction between CPUManager, MemoryManager and TopologyManager @@ -267,18 +280,6 @@ var _ = SIGDescribe("Memory Manager", "[LinuxOnly]", framework.WithDisruptive(), evictionHard: map[string]string{evictionHardMemory: "100Mi"}, } - verifyMemoryPinning := func(ctx context.Context, pod *v1.Pod, numaNodeIDs []int) { - ginkgo.By("Verifying the NUMA pinning") - - output, err := e2epod.GetPodLogs(ctx, f.ClientSet, f.Namespace.Name, pod.Name, pod.Spec.Containers[0].Name) - framework.ExpectNoError(err) - - currentNUMANodeIDs, err := cpuset.Parse(strings.Trim(output, "\n")) - framework.ExpectNoError(err) - - gomega.Expect(numaNodeIDs).To(gomega.Equal(currentNUMANodeIDs.List())) - } - waitingForHugepages := func(ctx context.Context, hugepagesCount int) { gomega.Eventually(ctx, func(ctx context.Context) error { node, err := f.ClientSet.CoreV1().Nodes().Get(ctx, framework.TestContext.NodeName, metav1.GetOptions{}) @@ -380,7 +381,7 @@ var _ = SIGDescribe("Memory Manager", "[LinuxOnly]", framework.WithDisruptive(), cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) framework.ExpectNoError(err) - defer conn.Close() + defer conn.Close() //nolint:errcheck resp, err := cli.GetAllocatableResources(ctx, &kubeletpodresourcesv1.AllocatableResourcesRequest{}) framework.ExpectNoError(err) @@ -444,7 +445,7 @@ var _ = SIGDescribe("Memory Manager", "[LinuxOnly]", framework.WithDisruptive(), return } - verifyMemoryPinning(ctx, testPod, []int{0}) + verifyMemoryPinning(f, ctx, testPod, []int{0}) }) }) @@ -469,7 +470,7 @@ var _ = SIGDescribe("Memory Manager", "[LinuxOnly]", framework.WithDisruptive(), return } - verifyMemoryPinning(ctx, testPod, []int{0}) + verifyMemoryPinning(f, ctx, testPod, []int{0}) }) }) @@ -503,8 +504,8 @@ var _ = SIGDescribe("Memory Manager", "[LinuxOnly]", framework.WithDisruptive(), return } - verifyMemoryPinning(ctx, testPod, []int{0}) - verifyMemoryPinning(ctx, testPod2, []int{0}) + verifyMemoryPinning(f, ctx, testPod, []int{0}) + verifyMemoryPinning(f, ctx, testPod2, []int{0}) }) // TODO: move the test to pod resource API test suite, see - https://github.com/kubernetes/kubernetes/issues/101945 @@ -520,7 +521,7 @@ var _ = SIGDescribe("Memory Manager", "[LinuxOnly]", framework.WithDisruptive(), cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) framework.ExpectNoError(err) - defer conn.Close() + defer conn.Close() //nolint:errcheck resp, err := cli.List(ctx, &kubeletpodresourcesv1.ListPodResourcesRequest{}) framework.ExpectNoError(err) @@ -665,7 +666,7 @@ var _ = SIGDescribe("Memory Manager", "[LinuxOnly]", framework.WithDisruptive(), cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) framework.ExpectNoError(err) - defer conn.Close() + defer conn.Close() //nolint:errcheck resp, err := cli.GetAllocatableResources(ctx, &kubeletpodresourcesv1.AllocatableResourcesRequest{}) framework.ExpectNoError(err) @@ -682,7 +683,7 @@ var _ = SIGDescribe("Memory Manager", "[LinuxOnly]", framework.WithDisruptive(), cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) framework.ExpectNoError(err) - defer conn.Close() + defer conn.Close() //nolint:errcheck resp, err := cli.List(ctx, 
&kubeletpodresourcesv1.ListPodResourcesRequest{}) framework.ExpectNoError(err) @@ -706,7 +707,135 @@ var _ = SIGDescribe("Memory Manager", "[LinuxOnly]", framework.WithDisruptive(), return } - verifyMemoryPinning(ctx, testPod, allNUMANodes) + verifyMemoryPinning(f, ctx, testPod, allNUMANodes) + }) + }) + }) +}) + +var _ = SIGDescribe("Memory Manager Incompatibility Pod Level Resources", framework.WithDisruptive(), framework.WithSerial(), feature.MemoryManager, feature.PodLevelResources, framework.WithFeatureGate(features.PodLevelResources), func() { + var ( + allNUMANodes []int + ctnParams, initCtnParams []memoryManagerCtnAttributes + isMultiNUMASupported *bool + testPod *v1.Pod + ) + + f := framework.NewDefaultFramework("memory-manager-incompatibility-pod-level-resources-test") + f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged + + memoryQuantity := resource.MustParse("1100Mi") + defaultKubeParams := &memoryManagerKubeletParams{ + systemReservedMemory: []kubeletconfig.MemoryReservation{ + { + NumaNode: 0, + Limits: v1.ResourceList{ + resourceMemory: memoryQuantity, + }, + }, + }, + systemReserved: map[string]string{resourceMemory: "500Mi"}, + kubeReserved: map[string]string{resourceMemory: "500Mi"}, + evictionHard: map[string]string{evictionHardMemory: "100Mi"}, + } + + ginkgo.BeforeEach(func(ctx context.Context) { + if isMultiNUMASupported == nil { + isMultiNUMASupported = ptr.To(isMultiNUMA()) + } + + if len(allNUMANodes) == 0 { + allNUMANodes = getAllNUMANodes() + } + }) + + // dynamically update the kubelet configuration + ginkgo.JustBeforeEach(func(ctx context.Context) { + if len(ctnParams) > 0 { + testPod = makeMemoryManagerPod(ctnParams[0].ctnName, initCtnParams, ctnParams) + testPod.Spec.Resources = &v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("100m"), + v1.ResourceMemory: resource.MustParse("128Mi"), + }, + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("100m"), + v1.ResourceMemory: resource.MustParse("128Mi"), + }, + } + } + }) + + ginkgo.JustAfterEach(func(ctx context.Context) { + // delete the test pod + if testPod != nil && testPod.Name != "" { + e2epod.NewPodClient(f).DeleteSync(ctx, testPod.Name, metav1.DeleteOptions{}, f.Timeouts.PodDelete) + } + }) + + ginkgo.Context("with static policy", func() { + tempSetCurrentKubeletConfig(f, func(ctx context.Context, initialConfig *kubeletconfig.KubeletConfiguration) { + kubeParams := *defaultKubeParams + kubeParams.policy = staticPolicy + updateKubeletConfigWithMemoryManagerParams(initialConfig, &kubeParams) + }) + + ginkgo.Context("", func() { + ginkgo.BeforeEach(func() { + // override pod parameters + ctnParams = []memoryManagerCtnAttributes{ + { + ctnName: "memory-manager-none", + cpus: "100m", + memory: "128Mi", + }, + } + }) + + ginkgo.JustAfterEach(func() { + // reset containers attributes + ctnParams = []memoryManagerCtnAttributes{} + initCtnParams = []memoryManagerCtnAttributes{} + }) + + ginkgo.It("should not report any memory data during request to pod resources List when pod has pod level resources", func(ctx context.Context) { + testPod = e2epod.NewPodClient(f).CreateSync(ctx, testPod) + + endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket) + framework.ExpectNoError(err) + + var resp *kubeletpodresourcesv1.ListPodResourcesResponse + gomega.Eventually(ctx, func(ctx context.Context) error { + cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) + if err != nil { + 
return err + } + defer conn.Close() //nolint:errcheck + resp, err = cli.List(ctx, &kubeletpodresourcesv1.ListPodResourcesRequest{}) + + return err + }, time.Minute, 5*time.Second).Should(gomega.Succeed()) + + for _, podResource := range resp.PodResources { + if podResource.Name != testPod.Name { + continue + } + + for _, containerResource := range podResource.Containers { + gomega.Expect(containerResource.Memory).To(gomega.BeEmpty()) + } + } + }) + + ginkgo.It("should succeed to start the pod when it has pod level resources", func(ctx context.Context) { + testPod = e2epod.NewPodClient(f).CreateSync(ctx, testPod) + + // it no taste to verify NUMA pinning when the node has only one NUMA node + if !*isMultiNUMASupported { + return + } + + verifyMemoryPinning(f, ctx, testPod, allNUMANodes) }) }) }) From 15b1a7fd31775faf59c19c927e771fbc392fa9d1 Mon Sep 17 00:00:00 2001 From: Francesco Romani Date: Tue, 29 Jul 2025 16:41:37 +0200 Subject: [PATCH 4/6] Revert "CPU and Memory manager event when using pod level resources" This reverts commit 7804b51f42011e52a4e82a761efe098a8160219a. --- pkg/kubelet/allocation/allocation_manager.go | 9 ---- pkg/kubelet/cm/admission/errors.go | 53 ++----------------- pkg/kubelet/cm/cpumanager/cpu_manager.go | 8 --- pkg/kubelet/cm/cpumanager/policy_static.go | 36 +------------ .../cm/memorymanager/memory_manager.go | 7 --- pkg/kubelet/cm/memorymanager/policy_static.go | 43 +-------------- pkg/kubelet/cm/topologymanager/scope.go | 17 +----- pkg/kubelet/lifecycle/interfaces.go | 2 - 8 files changed, 7 insertions(+), 168 deletions(-) diff --git a/pkg/kubelet/allocation/allocation_manager.go b/pkg/kubelet/allocation/allocation_manager.go index 5ec80f999ff..c7eff6a01d0 100644 --- a/pkg/kubelet/allocation/allocation_manager.go +++ b/pkg/kubelet/allocation/allocation_manager.go @@ -18,7 +18,6 @@ package allocation import ( "context" - "errors" "fmt" "path/filepath" "slices" @@ -42,7 +41,6 @@ import ( "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/allocation/state" "k8s.io/kubernetes/pkg/kubelet/cm" - "k8s.io/kubernetes/pkg/kubelet/cm/admission" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager" "k8s.io/kubernetes/pkg/kubelet/config" @@ -703,13 +701,6 @@ func (m *manager) canAdmitPod(allocatedPods []*v1.Pod, pod *v1.Pod) (bool, strin if result := podAdmitHandler.Admit(attrs); !result.Admit { klog.InfoS("Pod admission denied", "podUID", attrs.Pod.UID, "pod", klog.KObj(attrs.Pod), "reason", result.Reason, "message", result.Message) return false, result.Reason, result.Message - } else if result.Admit && len(result.Errors) > 0 && result.Reason == admission.PodLevelResourcesIncompatible && utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources) { - for _, err := range result.Errors { - var admissionWarning admission.Error - if errors.As(err, &admissionWarning) { - m.recorder.Event(attrs.Pod, v1.EventTypeWarning, admission.PodLevelResourcesIncompatible, admissionWarning.Error()) - } - } } } diff --git a/pkg/kubelet/cm/admission/errors.go b/pkg/kubelet/cm/admission/errors.go index faac0098ec2..5e549220394 100644 --- a/pkg/kubelet/cm/admission/errors.go +++ b/pkg/kubelet/cm/admission/errors.go @@ -20,19 +20,11 @@ import ( "errors" "fmt" - utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/kubernetes/pkg/kubelet/lifecycle" ) const ( ErrorReasonUnexpected = "UnexpectedAdmissionError" - - // Explicit reason when CPU/Memory manager's policy is incompatible with pod level resources. 
- PodLevelResourcesIncompatible = "PodLevelResourcesIncompatible" - - // Warnings for pod level resources when manager's policy incompatibility. - CPUManagerPodLevelResourcesError = "CPUManagerPodLevelResourcesError" - MemoryManagerPodLevelResourcesError = "MemoryManagerPodLevelResourcesError" ) type Error interface { @@ -57,53 +49,14 @@ func GetPodAdmitResult(err error) lifecycle.PodAdmitResult { return lifecycle.PodAdmitResult{Admit: true} } - var errs []error - // To support multiple pod-level resource errors, we need to check if the error - // is an aggregate error. - var agg utilerrors.Aggregate - if errors.As(err, &agg) { - errs = agg.Errors() - } else { - errs = []error{err} - } - - var podLevelWarnings []error - var otherErrs []error - for _, e := range errs { - var admissionErr Error - if errors.As(e, &admissionErr) && (admissionErr.Type() == CPUManagerPodLevelResourcesError || admissionErr.Type() == MemoryManagerPodLevelResourcesError) { - podLevelWarnings = append(podLevelWarnings, e) - } else { - otherErrs = append(otherErrs, e) - } - } - - // If all errors are pod-level resource errors, we should treat them as warnings - // and not block pod admission. - if len(otherErrs) == 0 && len(podLevelWarnings) > 0 { - return lifecycle.PodAdmitResult{ - Admit: true, - Reason: PodLevelResourcesIncompatible, - Message: "", - Errors: podLevelWarnings, - } - } - - if len(otherErrs) == 0 { - // This should not happen if err != nil, but as a safeguard. - return lifecycle.PodAdmitResult{Admit: true} - } - - // At this point, we have at least one error that requires pod rejection. - firstErr := otherErrs[0] var admissionErr Error - if !errors.As(firstErr, &admissionErr) { - admissionErr = &unexpectedAdmissionError{firstErr} + if !errors.As(err, &admissionErr) { + admissionErr = &unexpectedAdmissionError{err} } return lifecycle.PodAdmitResult{ - Admit: false, Message: admissionErr.Error(), Reason: admissionErr.Type(), + Admit: false, } } diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go b/pkg/kubelet/cm/cpumanager/cpu_manager.go index 1d98eef8320..8b59ba67121 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go @@ -18,7 +18,6 @@ package cpumanager import ( "context" - "errors" "fmt" "math" "sync" @@ -262,13 +261,6 @@ func (m *manager) Allocate(p *v1.Pod, c *v1.Container) error { // Call down into the policy to assign this container CPUs if required. err := m.policy.Allocate(m.state, p, c) - - // If it gets this error it means that the pod requires pod level resources but this is not aligned. - // We do not want the pod to fail to schedule on this error so we Admit it, this is just a warning - if errors.As(err, &CPUManagerPodLevelResourcesError{}) { - return err - } - if err != nil { klog.ErrorS(err, "Allocate error") return err diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go index fb787c35cc5..28591c5baf1 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static.go +++ b/pkg/kubelet/cm/cpumanager/policy_static.go @@ -18,12 +18,10 @@ package cpumanager import ( "fmt" - "strconv" v1 "k8s.io/api/core/v1" utilfeature "k8s.io/apiserver/pkg/util/feature" - resourcehelper "k8s.io/component-helpers/resource" "k8s.io/klog/v2" podutil "k8s.io/kubernetes/pkg/api/v1/pod" v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos" @@ -37,15 +35,13 @@ import ( ) const ( + // PolicyStatic is the name of the static policy. 
// Should options be given, these will be ignored and backward (up to 1.21 included) // compatible behaviour will be enforced PolicyStatic policyName = "static" // ErrorSMTAlignment represents the type of an SMTAlignmentError ErrorSMTAlignment = "SMTAlignmentError" - - // ErrorCPUManagerPodLevelResources represents the type of a CPUManagerPodLevelResourcesError - ErrorCPUManagerPodLevelResources = "CPUManagerPodLevelResourcesError" ) // SMTAlignmentError represents an error due to SMT alignment @@ -56,15 +52,6 @@ type SMTAlignmentError struct { CausedByPhysicalCPUs bool } -// PodLevelResourcesError represents an error due to pod-level resources not being supported. -type CPUManagerPodLevelResourcesError struct{} - -func (e CPUManagerPodLevelResourcesError) Error() string { - return "CPU Manager static policy does not support pod-level resources" -} - -func (e CPUManagerPodLevelResourcesError) Type() string { return ErrorCPUManagerPodLevelResources } - func (e SMTAlignmentError) Error() string { if e.CausedByPhysicalCPUs { return fmt.Sprintf("SMT Alignment Error: not enough free physical CPUs: available physical CPUs = %d, requested CPUs = %d, CPUs per core = %d", e.AvailablePhysicalCPUs, e.RequestedCPUs, e.CpusPerCore) @@ -329,10 +316,6 @@ func (p *staticPolicy) updateCPUsToReuse(pod *v1.Pod, container *v1.Container, c func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) (rerr error) { numCPUs := p.guaranteedCPUs(pod, container) if numCPUs == 0 { - if p.isPodWithPodLevelResources(pod) { - return CPUManagerPodLevelResourcesError{} - } - // container belongs in the shared pool (nothing to do; use default cpuset) return nil } @@ -484,12 +467,6 @@ func (p *staticPolicy) guaranteedCPUs(pod *v1.Pod, container *v1.Container) int klog.V(5).InfoS("Exclusive CPU allocation skipped, pod QoS is not guaranteed", "pod", klog.KObj(pod), "containerName", container.Name, "qos", qos) return 0 } - - // The CPU manager static policy does not support pod-level resources. - if utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources) && resourcehelper.IsPodLevelResourcesSet(pod) { - return 0 - } - cpuQuantity := container.Resources.Requests[v1.ResourceCPU] // In-place pod resize feature makes Container.Resources field mutable for CPU & memory. // AllocatedResources holds the value of Container.Resources.Requests when the pod was admitted. @@ -837,14 +814,3 @@ func updateAllocationPerNUMAMetric(topo *topology.CPUTopology, allocatedCPUs cpu metrics.CPUManagerAllocationPerNUMA.WithLabelValues(strconv.Itoa(numaNode)).Set(float64(count)) } } - -func (p *staticPolicy) isPodWithPodLevelResources(pod *v1.Pod) bool { - if utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources) && resourcehelper.IsPodLevelResourcesSet(pod) { - // The Memory manager static policy does not support pod-level resources. 
- klog.V(5).InfoS("CPU Manager allocation skipped, pod is using pod-level resources which are not supported by the static CPU manager policy", "pod", klog.KObj(pod)) - - return true - } - - return false -} diff --git a/pkg/kubelet/cm/memorymanager/memory_manager.go b/pkg/kubelet/cm/memorymanager/memory_manager.go index 9d5d186f703..572f58599a0 100644 --- a/pkg/kubelet/cm/memorymanager/memory_manager.go +++ b/pkg/kubelet/cm/memorymanager/memory_manager.go @@ -18,7 +18,6 @@ package memorymanager import ( "context" - "errors" "fmt" "runtime" "sync" @@ -274,12 +273,6 @@ func (m *manager) Allocate(pod *v1.Pod, container *v1.Container) error { // Call down into the policy to assign this container memory if required. if err := m.policy.Allocate(ctx, m.state, pod, container); err != nil { - // If it gets this error it means that the pod requires pod level resources but this is not aligned. - // We do not want the pod to fail to schedule on this error so we Admit it, this is just a warning - if errors.As(err, &MemoryManagerPodLevelResourcesError{}) { - return err - } - logger.Error(err, "Allocate error", "pod", klog.KObj(pod), "containerName", container.Name) return err } diff --git a/pkg/kubelet/cm/memorymanager/policy_static.go b/pkg/kubelet/cm/memorymanager/policy_static.go index 7939d7920aa..ce85610b65d 100644 --- a/pkg/kubelet/cm/memorymanager/policy_static.go +++ b/pkg/kubelet/cm/memorymanager/policy_static.go @@ -27,7 +27,6 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" utilfeature "k8s.io/apiserver/pkg/util/feature" - resourcehelper "k8s.io/component-helpers/resource" "k8s.io/klog/v2" podutil "k8s.io/kubernetes/pkg/api/v1/pod" corehelper "k8s.io/kubernetes/pkg/apis/core/v1/helper" @@ -39,12 +38,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/metrics" ) -const ( - PolicyTypeStatic policyType = "Static" - - // ErrorMemoryManagerPodLevelResources represents the type of a MemoryManagerPodLevelResourcesError - ErrorMemoryManagerPodLevelResources = "MemoryManagerPodLevelResourcesError" -) +const PolicyTypeStatic policyType = "Static" type systemReservedMemory map[int]map[v1.ResourceName]uint64 type reusableMemory map[string]map[string]map[v1.ResourceName]uint64 @@ -66,16 +60,6 @@ type staticPolicy struct { var _ Policy = &staticPolicy{} -type MemoryManagerPodLevelResourcesError struct{} - -func (e MemoryManagerPodLevelResourcesError) Type() string { - return ErrorMemoryManagerPodLevelResources -} - -func (e MemoryManagerPodLevelResourcesError) Error() string { - return "Memory Manager static policy does not support pod-level resources" -} - // NewPolicyStatic returns new static policy instance func NewPolicyStatic(ctx context.Context, machineInfo *cadvisorapi.MachineInfo, reserved systemReservedMemory, affinity topologymanager.Store) (Policy, error) { var totalSystemReserved uint64 @@ -123,10 +107,6 @@ func (p *staticPolicy) Allocate(ctx context.Context, s state.State, pod *v1.Pod, return nil } - if p.isPodWithPodLevelResources(ctx, pod) { - return MemoryManagerPodLevelResourcesError{} - } - podUID := string(pod.UID) logger.Info("Allocate") // container belongs in an exclusively allocated pool @@ -426,10 +406,6 @@ func (p *staticPolicy) GetPodTopologyHints(ctx context.Context, s state.State, p return nil } - if p.isPodWithPodLevelResources(ctx, pod) { - return nil - } - reqRsrcs, err := getPodRequestedResources(pod) if err != nil { logger.Error(err, "Failed to get pod requested resources", "podUID", pod.UID) @@ -460,10 +436,6 @@ func (p *staticPolicy) GetTopologyHints(ctx 
context.Context, s state.State, pod return nil } - if p.isPodWithPodLevelResources(ctx, pod) { - return nil - } - requestedResources, err := getRequestedResources(pod, container) if err != nil { logger.Error(err, "Failed to get container requested resources", "podUID", pod.UID, "containerName", container.Name) @@ -1104,16 +1076,3 @@ func isAffinityViolatingNUMAAllocations(machineState state.NUMANodeMap, mask bit } return false } - -func (p *staticPolicy) isPodWithPodLevelResources(ctx context.Context, pod *v1.Pod) bool { - logger := klog.FromContext(ctx) - - if utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources) && resourcehelper.IsPodLevelResourcesSet(pod) { - // The Memory manager static policy does not support pod-level resources. - logger.V(5).Info("Memory manager allocation skipped, pod is using pod-level resources which are not supported by the static Memory manager policy", "pod", klog.KObj(pod)) - - return true - } - - return false -} diff --git a/pkg/kubelet/cm/topologymanager/scope.go b/pkg/kubelet/cm/topologymanager/scope.go index df69aad06fe..db3edd63e64 100644 --- a/pkg/kubelet/cm/topologymanager/scope.go +++ b/pkg/kubelet/cm/topologymanager/scope.go @@ -19,11 +19,8 @@ package topologymanager import ( "sync" - v1 "k8s.io/api/core/v1" - utilerrors "k8s.io/apimachinery/pkg/util/errors" - utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/api/core/v1" "k8s.io/klog/v2" - "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/cm/admission" "k8s.io/kubernetes/pkg/kubelet/cm/containermap" "k8s.io/kubernetes/pkg/kubelet/lifecycle" @@ -151,21 +148,11 @@ func (s *scope) admitPolicyNone(pod *v1.Pod) lifecycle.PodAdmitResult { // It would be better to implement this function in topologymanager instead of scope // but topologymanager do not track providers anymore func (s *scope) allocateAlignedResources(pod *v1.Pod, container *v1.Container) error { - isPodLevelResourcesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources) - - var errs []error for _, provider := range s.hintProviders { err := provider.Allocate(pod, container) - if err != nil && isPodLevelResourcesEnabled { - errs = append(errs, err) - } else if err != nil { + if err != nil { return err } } - - if isPodLevelResourcesEnabled { - return utilerrors.NewAggregate(errs) - } - return nil } diff --git a/pkg/kubelet/lifecycle/interfaces.go b/pkg/kubelet/lifecycle/interfaces.go index b8b8dea8ca7..3be7adade54 100644 --- a/pkg/kubelet/lifecycle/interfaces.go +++ b/pkg/kubelet/lifecycle/interfaces.go @@ -35,8 +35,6 @@ type PodAdmitResult struct { Reason string // a brief message explaining why the pod could not be admitted. Message string - // all errors for why the pod could not be admitted. - Errors []error } // PodAdmitHandler is notified during pod admission. From 4ca47255a8b6dd98a20beb852a99ef2ea5e4171a Mon Sep 17 00:00:00 2001 From: Francesco Romani Date: Tue, 29 Jul 2025 16:29:29 +0200 Subject: [PATCH 5/6] node: disable resource managers when pod-level resources are enabled When pod-level resources are detected, the cpu and memory manages cannot engage because the feature is not yet compatible, one of the main reasons being the managers only work at container level. So, the managers has to detect if pod level resources are in use, and turn themselves to no-operation skipping resource allocation should that be the case. 
We add an intentionally loud log to inform the user, because a pod with pod-level resources landing on a node which cannot actuate the desired spec is likely to be undesirable. Signed-off-by: Francesco Romani --- pkg/kubelet/cm/cpumanager/policy_static.go | 16 ++++++++++++++++ pkg/kubelet/cm/memorymanager/policy_static.go | 16 ++++++++++++++++ .../cm/memorymanager/policy_static_test.go | 2 +- 3 files changed, 33 insertions(+), 1 deletion(-) diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go index 28591c5baf1..6c767882a6a 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static.go +++ b/pkg/kubelet/cm/cpumanager/policy_static.go @@ -22,6 +22,7 @@ import ( v1 "k8s.io/api/core/v1" utilfeature "k8s.io/apiserver/pkg/util/feature" + resourcehelper "k8s.io/component-helpers/resource" "k8s.io/klog/v2" podutil "k8s.io/kubernetes/pkg/api/v1/pod" v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos" @@ -320,6 +321,11 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai return nil } + if utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources) && resourcehelper.IsPodLevelResourcesSet(pod) { + klog.V(2).InfoS("CPU Manager allocation skipped, pod is using pod-level resources which are not supported by the static CPU manager policy", "pod", klog.KObj(pod), "podUID", pod.UID) + return nil + } + klog.InfoS("Static policy: Allocate", "pod", klog.KObj(pod), "containerName", container.Name) // container belongs in an exclusively allocated pool metrics.CPUManagerPinningRequestsTotal.Inc() @@ -557,6 +563,11 @@ func (p *staticPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v return nil } + if utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources) && resourcehelper.IsPodLevelResourcesSet(pod) { + klog.V(3).InfoS("CPU Manager hint generation skipped, pod is using pod-level resources which are not supported by the static CPU manager policy", "pod", klog.KObj(pod), "podUID", pod.UID) + return nil + } + // Short circuit to regenerate the same hints if there are already // guaranteed CPUs allocated to the Container. This might happen after a // kubelet restart, for example. @@ -604,6 +615,11 @@ func (p *staticPolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[strin return nil } + if utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources) && resourcehelper.IsPodLevelResourcesSet(pod) { + klog.V(3).InfoS("CPU Manager pod hint generation skipped, pod is using pod-level resources which are not supported by the static CPU manager policy", "pod", klog.KObj(pod), "podUID", pod.UID) + return nil + } + assignedCPUs := cpuset.New() for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...)
{ requestedByContainer := p.guaranteedCPUs(pod, &container) diff --git a/pkg/kubelet/cm/memorymanager/policy_static.go b/pkg/kubelet/cm/memorymanager/policy_static.go index ce85610b65d..c744ebf3999 100644 --- a/pkg/kubelet/cm/memorymanager/policy_static.go +++ b/pkg/kubelet/cm/memorymanager/policy_static.go @@ -27,6 +27,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" utilfeature "k8s.io/apiserver/pkg/util/feature" + resourcehelper "k8s.io/component-helpers/resource" "k8s.io/klog/v2" podutil "k8s.io/kubernetes/pkg/api/v1/pod" corehelper "k8s.io/kubernetes/pkg/apis/core/v1/helper" @@ -108,6 +109,11 @@ func (p *staticPolicy) Allocate(ctx context.Context, s state.State, pod *v1.Pod, } podUID := string(pod.UID) + if utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources) && resourcehelper.IsPodLevelResourcesSet(pod) { + logger.V(2).Info("Allocation skipped, pod is using pod-level resources which are not supported by the static Memory manager policy", "podUID", podUID) + return nil + } + logger.Info("Allocate") // container belongs in an exclusively allocated pool metrics.MemoryManagerPinningRequestTotal.Inc() @@ -412,6 +418,11 @@ func (p *staticPolicy) GetPodTopologyHints(ctx context.Context, s state.State, p return nil } + if utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources) && resourcehelper.IsPodLevelResourcesSet(pod) { + logger.V(3).Info("Topology hints generation skipped, pod is using pod-level resources which are not supported by the static Memory manager policy", "podUID", pod.UID) + return nil + } + for _, ctn := range append(pod.Spec.InitContainers, pod.Spec.Containers...) { containerBlocks := s.GetMemoryBlocks(string(pod.UID), ctn.Name) // Short circuit to regenerate the same hints if there are already @@ -442,6 +453,11 @@ func (p *staticPolicy) GetTopologyHints(ctx context.Context, s state.State, pod return nil } + if utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources) && resourcehelper.IsPodLevelResourcesSet(pod) { + logger.V(3).Info("Topology hints generation skipped, pod is using pod-level resources which are not supported by the static Memory manager policy", "podUID", pod.UID) + return nil + } + containerBlocks := s.GetMemoryBlocks(string(pod.UID), container.Name) // Short circuit to regenerate the same hints if there are already // memory allocated for the container. 
This might happen after a diff --git a/pkg/kubelet/cm/memorymanager/policy_static_test.go b/pkg/kubelet/cm/memorymanager/policy_static_test.go index be8ce520ae1..bb5ab463bbb 100644 --- a/pkg/kubelet/cm/memorymanager/policy_static_test.go +++ b/pkg/kubelet/cm/memorymanager/policy_static_test.go @@ -2074,7 +2074,7 @@ func TestStaticPolicyAllocate(t *testing.T) { pod: getPodWithPodLevelResources("pod1", podLevelRequirementsGuaranteed, "container1", requirementsGuaranteed), expectedTopologyHints: nil, topologyHint: &topologymanager.TopologyHint{}, - expectedError: fmt.Errorf("Memory Manager static policy does not support pod-level resources"), + expectedError: nil, podLevelResourcesEnabled: true, }, } From a3a767b37ec6e8e14d53f5f477df5519586d098f Mon Sep 17 00:00:00 2001 From: Francesco Romani Date: Tue, 29 Jul 2025 17:54:39 +0200 Subject: [PATCH 6/6] WIP: fix e2e tests Signed-off-by: Francesco Romani --- test/e2e_node/cpu_manager_test.go | 2 ++ test/e2e_node/cpumanager_test.go | 10 ++++++---- test/e2e_node/memory_manager_test.go | 4 ++++ 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/test/e2e_node/cpu_manager_test.go b/test/e2e_node/cpu_manager_test.go index 892aa0ee7d8..d27c76c841c 100644 --- a/test/e2e_node/cpu_manager_test.go +++ b/test/e2e_node/cpu_manager_test.go @@ -294,6 +294,7 @@ type cpuManagerKubeletArguments struct { policyName string enableCPUManagerOptions bool disableCPUQuotaWithExclusiveCPUs bool + enablePodLevelResources bool reservedSystemCPUs cpuset.CPUSet options map[string]string } @@ -307,6 +308,7 @@ func configureCPUManagerInKubelet(oldCfg *kubeletconfig.KubeletConfiguration, ku newCfg.FeatureGates["CPUManagerPolicyBetaOptions"] = kubeletArguments.enableCPUManagerOptions newCfg.FeatureGates["CPUManagerPolicyAlphaOptions"] = kubeletArguments.enableCPUManagerOptions newCfg.FeatureGates["DisableCPUQuotaWithExclusiveCPUs"] = kubeletArguments.disableCPUQuotaWithExclusiveCPUs + newCfg.FeatureGates["PodLevelResources"] = kubeletArguments.enablePodLevelResources newCfg.CPUManagerPolicy = kubeletArguments.policyName newCfg.CPUManagerReconcilePeriod = metav1.Duration{Duration: 1 * time.Second} diff --git a/test/e2e_node/cpumanager_test.go b/test/e2e_node/cpumanager_test.go index eed312cc922..e5069b9ce24 100644 --- a/test/e2e_node/cpumanager_test.go +++ b/test/e2e_node/cpumanager_test.go @@ -1989,8 +1989,9 @@ var _ = SIGDescribe("CPU Manager Incompatibility Pod Level Resources", ginkgo.Or ginkgo.When("running guaranteed pod level resources tests", ginkgo.Label("guaranteed pod level resources", "reserved-cpus"), func() { ginkgo.It("should let the container access all the online CPUs without a reserved CPUs set", func(ctx context.Context) { updateKubeletConfigIfNeeded(ctx, f, configureCPUManagerInKubelet(oldCfg, &cpuManagerKubeletArguments{ - policyName: string(cpumanager.PolicyStatic), - reservedSystemCPUs: cpuset.CPUSet{}, + policyName: string(cpumanager.PolicyStatic), + reservedSystemCPUs: cpuset.CPUSet{}, + enablePodLevelResources: true, })) pod := makeCPUManagerPod("gu-pod-level-resources", []ctnAttribute{ @@ -2023,8 +2024,9 @@ var _ = SIGDescribe("CPU Manager Incompatibility Pod Level Resources", ginkgo.Or reservedCPUs = cpuset.New(0) updateKubeletConfigIfNeeded(ctx, f, configureCPUManagerInKubelet(oldCfg, &cpuManagerKubeletArguments{ - policyName: string(cpumanager.PolicyStatic), - reservedSystemCPUs: reservedCPUs, // Not really needed for the tests but helps to make a more precise check + policyName: string(cpumanager.PolicyStatic), + reservedSystemCPUs: 
reservedCPUs, // Not really needed for the tests but helps to make a more precise check + enablePodLevelResources: true, })) pod := makeCPUManagerPod("gu-pod-level-resources", []ctnAttribute{ diff --git a/test/e2e_node/memory_manager_test.go b/test/e2e_node/memory_manager_test.go index 23867970d71..c28f5d4eb4a 100644 --- a/test/e2e_node/memory_manager_test.go +++ b/test/e2e_node/memory_manager_test.go @@ -778,6 +778,10 @@ var _ = SIGDescribe("Memory Manager Incompatibility Pod Level Resources", framew kubeParams := *defaultKubeParams kubeParams.policy = staticPolicy updateKubeletConfigWithMemoryManagerParams(initialConfig, &kubeParams) + if initialConfig.FeatureGates == nil { + initialConfig.FeatureGates = make(map[string]bool) + } + initialConfig.FeatureGates["PodLevelResources"] = true }) ginkgo.Context("", func() {