diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
index 6c3af2dc3f3..742dfc11ae9 100644
--- a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
+++ b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
@@ -172,6 +172,22 @@ func makePod(podUID, containerName, cpuRequest, cpuLimit string) *v1.Pod {
 	return pod
 }
 
+func makePodWithPodLevelResources(podUID, podCPURequest, podCPULimit, containerName, cpuRequest, cpuLimit string) *v1.Pod {
+	pod := makePod(podUID, containerName, cpuRequest, cpuLimit)
+	pod.Spec.Resources = &v1.ResourceRequirements{
+		Requests: v1.ResourceList{
+			v1.ResourceName(v1.ResourceCPU):    resource.MustParse(podCPURequest),
+			v1.ResourceName(v1.ResourceMemory): resource.MustParse("1G"),
+		},
+		Limits: v1.ResourceList{
+			v1.ResourceName(v1.ResourceCPU):    resource.MustParse(podCPULimit),
+			v1.ResourceName(v1.ResourceMemory): resource.MustParse("1G"),
+		},
+	}
+
+	return pod
+}
+
 func makeMultiContainerPod(initCPUs, appCPUs []struct{ request, limit string }) *v1.Pod {
 	pod := &v1.Pod{
 		ObjectMeta: metav1.ObjectMeta{
diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go
index 28591c5baf1..6c767882a6a 100644
--- a/pkg/kubelet/cm/cpumanager/policy_static.go
+++ b/pkg/kubelet/cm/cpumanager/policy_static.go
@@ -22,6 +22,7 @@ import (
 
 	v1 "k8s.io/api/core/v1"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
+	resourcehelper "k8s.io/component-helpers/resource"
 	"k8s.io/klog/v2"
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
 	v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
@@ -320,6 +321,11 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai
 		return nil
 	}
 
+	if utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources) && resourcehelper.IsPodLevelResourcesSet(pod) {
+		klog.V(2).InfoS("CPU Manager allocation skipped, pod is using pod-level resources which are not supported by the static CPU manager policy", "pod", klog.KObj(pod), "podUID", pod.UID)
+		return nil
+	}
+
 	klog.InfoS("Static policy: Allocate", "pod", klog.KObj(pod), "containerName", container.Name)
 	// container belongs in an exclusively allocated pool
 	metrics.CPUManagerPinningRequestsTotal.Inc()
@@ -557,6 +563,11 @@ func (p *staticPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v
 		return nil
 	}
 
+	if utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources) && resourcehelper.IsPodLevelResourcesSet(pod) {
+		klog.V(3).InfoS("CPU Manager hint generation skipped, pod is using pod-level resources which are not supported by the static CPU manager policy", "pod", klog.KObj(pod), "podUID", pod.UID)
+		return nil
+	}
+
 	// Short circuit to regenerate the same hints if there are already
 	// guaranteed CPUs allocated to the Container. This might happen after a
 	// kubelet restart, for example.
@@ -604,6 +615,11 @@ func (p *staticPolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[strin
 		return nil
 	}
 
+	if utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources) && resourcehelper.IsPodLevelResourcesSet(pod) {
+		klog.V(3).InfoS("CPU Manager pod hint generation skipped, pod is using pod-level resources which are not supported by the static CPU manager policy", "pod", klog.KObj(pod), "podUID", pod.UID)
+		return nil
+	}
+
 	assignedCPUs := cpuset.New()
 	for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
 		requestedByContainer := p.guaranteedCPUs(pod, &container)
diff --git a/pkg/kubelet/cm/cpumanager/topology_hints_test.go b/pkg/kubelet/cm/cpumanager/topology_hints_test.go
index f6c230bc32f..4773c779a67 100644
--- a/pkg/kubelet/cm/cpumanager/topology_hints_test.go
+++ b/pkg/kubelet/cm/cpumanager/topology_hints_test.go
@@ -26,7 +26,7 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	featuregatetesting "k8s.io/component-base/featuregate/testing"
-	pkgfeatures "k8s.io/kubernetes/pkg/features"
+	"k8s.io/kubernetes/pkg/features"
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
 	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
@@ -35,12 +35,13 @@ import (
 )
 
 type testCase struct {
-	name          string
-	pod           v1.Pod
-	container     v1.Container
-	assignments   state.ContainerCPUAssignments
-	defaultCPUSet cpuset.CPUSet
-	expectedHints []topologymanager.TopologyHint
+	name                     string
+	pod                      v1.Pod
+	container                v1.Container
+	assignments              state.ContainerCPUAssignments
+	defaultCPUSet            cpuset.CPUSet
+	expectedHints            []topologymanager.TopologyHint
+	podLevelResourcesEnabled bool
 }
 
 func returnMachineInfo() cadvisorapi.MachineInfo {
@@ -210,9 +211,10 @@ func TestPodGuaranteedCPUs(t *testing.T) {
 
 func TestGetTopologyHints(t *testing.T) {
 	machineInfo := returnMachineInfo()
-	tcases := returnTestCases()
 
-	for _, tc := range tcases {
+	for _, tc := range returnTestCases() {
+		featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodLevelResources, tc.podLevelResourcesEnabled)
+
 		topology, _ := topology.Discover(&machineInfo)
 
 		var activePods []*v1.Pod
@@ -261,6 +263,8 @@ func TestGetPodTopologyHints(t *testing.T) {
 	machineInfo := returnMachineInfo()
 
 	for _, tc := range returnTestCases() {
+		featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodLevelResources, tc.podLevelResourcesEnabled)
+
 		topology, _ := topology.Discover(&machineInfo)
 
 		var activePods []*v1.Pod
@@ -442,7 +446,7 @@ func TestGetPodTopologyHintsWithPolicyOptions(t *testing.T) {
 
 	for _, testCase := range testCases {
 		t.Run(testCase.description, func(t *testing.T) {
-			featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, pkgfeatures.CPUManagerPolicyAlphaOptions, true)
+			featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CPUManagerPolicyAlphaOptions, true)
 
 			var activePods []*v1.Pod
 			for p := range testCase.assignments {
@@ -495,6 +499,9 @@ func returnTestCases() []testCase {
 	testPod4 := makePod("fakePod", "fakeContainer", "11", "11")
 	testContainer4 := &testPod4.Spec.Containers[0]
 
+	testPod5 := makePodWithPodLevelResources("fakePod", "5", "5", "fakeContainer", "4", "4")
+	testContainer5 := &testPod5.Spec.Containers[0]
+
 	firstSocketMask, _ := bitmask.NewBitMask(0)
 	secondSocketMask, _ := bitmask.NewBitMask(1)
 	crossSocketMask, _ := bitmask.NewBitMask(0, 1)
@@ -657,5 +664,13 @@ func returnTestCases() []testCase {
 			defaultCPUSet: cpuset.New(),
 			expectedHints: []topologymanager.TopologyHint{},
 		},
+		{
+			name:                     "Pod has pod level resources, no hint generation",
+			pod:                      *testPod5,
+			container:                *testContainer5,
+			defaultCPUSet:            cpuset.New(0, 1, 2, 3),
+			expectedHints:            nil,
+			podLevelResourcesEnabled: true,
+		},
 	}
 }
diff --git a/pkg/kubelet/cm/memorymanager/memory_manager_test.go b/pkg/kubelet/cm/memorymanager/memory_manager_test.go
index 8bcfa3fd3d7..fa5a9aa4479 100644
--- a/pkg/kubelet/cm/memorymanager/memory_manager_test.go
+++ b/pkg/kubelet/cm/memorymanager/memory_manager_test.go
@@ -151,6 +151,13 @@ func getPod(podUID string, containerName string, requirements *v1.ResourceRequir
 	}
 }
 
+func getPodWithPodLevelResources(podUID string, podRequirements *v1.ResourceRequirements, containerName string, containerRequirements *v1.ResourceRequirements) *v1.Pod {
+	pod := getPod(podUID, containerName, containerRequirements)
+	pod.Spec.Resources = podRequirements
+
+	return pod
+}
+
 func getPodWithInitContainers(podUID string, containers []v1.Container, initContainers []v1.Container) *v1.Pod {
 	return &v1.Pod{
 		ObjectMeta: metav1.ObjectMeta{
diff --git a/pkg/kubelet/cm/memorymanager/policy_static.go b/pkg/kubelet/cm/memorymanager/policy_static.go
index ce85610b65d..c744ebf3999 100644
--- a/pkg/kubelet/cm/memorymanager/policy_static.go
+++ b/pkg/kubelet/cm/memorymanager/policy_static.go
@@ -27,6 +27,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
+	resourcehelper "k8s.io/component-helpers/resource"
 	"k8s.io/klog/v2"
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
 	corehelper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
@@ -108,6 +109,11 @@ func (p *staticPolicy) Allocate(ctx context.Context, s state.State, pod *v1.Pod,
 	}
 
 	podUID := string(pod.UID)
+	if utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources) && resourcehelper.IsPodLevelResourcesSet(pod) {
+		logger.V(2).Info("Allocation skipped, pod is using pod-level resources which are not supported by the static Memory manager policy", "podUID", podUID)
+		return nil
+	}
+
 	logger.Info("Allocate")
 	// container belongs in an exclusively allocated pool
 	metrics.MemoryManagerPinningRequestTotal.Inc()
@@ -412,6 +418,11 @@ func (p *staticPolicy) GetPodTopologyHints(ctx context.Context, s state.State, p
 		return nil
 	}
 
+	if utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources) && resourcehelper.IsPodLevelResourcesSet(pod) {
+		logger.V(3).Info("Topology hints generation skipped, pod is using pod-level resources which are not supported by the static Memory manager policy", "podUID", pod.UID)
+		return nil
+	}
+
 	for _, ctn := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
 		containerBlocks := s.GetMemoryBlocks(string(pod.UID), ctn.Name)
 		// Short circuit to regenerate the same hints if there are already
@@ -442,6 +453,11 @@ func (p *staticPolicy) GetTopologyHints(ctx context.Context, s state.State, pod
 		return nil
 	}
 
+	if utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources) && resourcehelper.IsPodLevelResourcesSet(pod) {
+		logger.V(3).Info("Topology hints generation skipped, pod is using pod-level resources which are not supported by the static Memory manager policy", "podUID", pod.UID)
+		return nil
+	}
+
 	containerBlocks := s.GetMemoryBlocks(string(pod.UID), container.Name)
 	// Short circuit to regenerate the same hints if there are already
 	// memory allocated for the container. This might happen after a
diff --git a/pkg/kubelet/cm/memorymanager/policy_static_test.go b/pkg/kubelet/cm/memorymanager/policy_static_test.go
index cf85d75ddb8..bb5ab463bbb 100644
--- a/pkg/kubelet/cm/memorymanager/policy_static_test.go
+++ b/pkg/kubelet/cm/memorymanager/policy_static_test.go
@@ -26,6 +26,9 @@ import (
 
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
+	utilfeature "k8s.io/apiserver/pkg/util/feature"
+	featuregatetesting "k8s.io/component-base/featuregate/testing"
+	"k8s.io/kubernetes/pkg/features"
 	"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state"
 	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
 	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
@@ -42,6 +45,17 @@ const (
 var (
 	containerRestartPolicyAlways = v1.ContainerRestartPolicyAlways
 
+	podLevelRequirementsGuaranteed = &v1.ResourceRequirements{
+		Limits: v1.ResourceList{
+			v1.ResourceCPU:    resource.MustParse("1000Mi"),
+			v1.ResourceMemory: resource.MustParse("1Gi"),
+		},
+		Requests: v1.ResourceList{
+			v1.ResourceCPU:    resource.MustParse("1000Mi"),
+			v1.ResourceMemory: resource.MustParse("1Gi"),
+		},
+	}
+
 	requirementsGuaranteed = &v1.ResourceRequirements{
 		Limits: v1.ResourceList{
 			v1.ResourceCPU:    resource.MustParse("1000Mi"),
@@ -131,6 +145,7 @@ type testStaticPolicy struct {
 	topologyHint                 *topologymanager.TopologyHint
 	expectedTopologyHints        map[string][]topologymanager.TopologyHint
 	initContainersReusableMemory reusableMemory
+	podLevelResourcesEnabled     bool
 }
 
 func initTests(t *testing.T, testCase *testStaticPolicy, hint *topologymanager.TopologyHint, initContainersReusableMemory reusableMemory) (Policy, state.State, error) {
@@ -2006,10 +2021,68 @@ func TestStaticPolicyAllocate(t *testing.T) {
 			topologyHint:  &topologymanager.TopologyHint{NUMANodeAffinity: newNUMAAffinity(0, 1), Preferred: true},
 			expectedError: fmt.Errorf("[memorymanager] preferred hint violates NUMA node allocation"),
 		},
+		{
+			description:         "should do nothing for guaranteed pod with pod level resources",
+			expectedAssignments: state.ContainerMemoryAssignments{},
+			machineState: state.NUMANodeMap{
+				0: &state.NUMANodeState{
+					MemoryMap: map[v1.ResourceName]*state.MemoryTable{
+						v1.ResourceMemory: {
+							Allocatable:    1536 * mb,
+							Free:           1536 * mb,
+							Reserved:       0,
+							SystemReserved: 512 * mb,
+							TotalMemSize:   2 * gb,
+						},
+						hugepages1Gi: {
+							Allocatable:    gb,
+							Free:           gb,
+							Reserved:       0,
+							SystemReserved: 0,
+							TotalMemSize:   gb,
+						},
+					},
+					Cells: []int{},
+				},
+			},
+			expectedMachineState: state.NUMANodeMap{
+				0: &state.NUMANodeState{
+					MemoryMap: map[v1.ResourceName]*state.MemoryTable{
+						v1.ResourceMemory: {
+							Allocatable:    1536 * mb,
+							Free:           1536 * mb,
+							Reserved:       0,
+							SystemReserved: 512 * mb,
+							TotalMemSize:   2 * gb,
+						},
+						hugepages1Gi: {
+							Allocatable:    gb,
+							Free:           gb,
+							Reserved:       0,
+							SystemReserved: 0,
+							TotalMemSize:   gb,
+						},
+					},
+					Cells: []int{},
+				},
+			},
+			systemReserved: systemReservedMemory{
+				0: map[v1.ResourceName]uint64{
+					v1.ResourceMemory: 512 * mb,
+				},
+			},
+			pod:                      getPodWithPodLevelResources("pod1", podLevelRequirementsGuaranteed, "container1", requirementsGuaranteed),
+			expectedTopologyHints:    nil,
+			topologyHint:             &topologymanager.TopologyHint{},
+			expectedError:            nil,
+			podLevelResourcesEnabled: true,
+		},
 	}
 
 	for _, testCase := range testCases {
 		t.Run(testCase.description, func(t *testing.T) {
+			featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodLevelResources, testCase.podLevelResourcesEnabled)
+
 			t.Logf("TestStaticPolicyAllocate %s", testCase.description)
 			p, s, err := initTests(t, &testCase, testCase.topologyHint, nil)
 			if err != nil {
@@ -2017,7 +2090,7 @@ func TestStaticPolicyAllocate(t *testing.T) {
 			}
 
 			err = p.Allocate(tCtx, s, testCase.pod, &testCase.pod.Spec.Containers[0])
-			if !reflect.DeepEqual(err, testCase.expectedError) {
+			if (err == nil) != (testCase.expectedError == nil) || (err != nil && testCase.expectedError != nil && err.Error() != testCase.expectedError.Error()) {
 				t.Fatalf("The actual error %v is different from the expected one %v", err, testCase.expectedError)
 			}
 
@@ -3732,10 +3805,23 @@ func TestStaticPolicyGetTopologyHints(t *testing.T) {
 				},
 			},
 		},
+		{
+			description: "should not provide topology hints for guaranteed pod with pod level resources",
+			pod:         getPodWithPodLevelResources("pod1", podLevelRequirementsGuaranteed, "container1", requirementsGuaranteed),
+			systemReserved: systemReservedMemory{
+				0: map[v1.ResourceName]uint64{
+					v1.ResourceMemory: 1024 * mb,
+				},
+			},
+			expectedTopologyHints:    nil,
+			podLevelResourcesEnabled: true,
+		},
 	}
 
 	for _, testCase := range testCases {
 		t.Run(testCase.description, func(t *testing.T) {
+			featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodLevelResources, testCase.podLevelResourcesEnabled)
+
 			p, s, err := initTests(t, &testCase, nil, nil)
 			if err != nil {
 				t.Fatalf("Unexpected error: %v", err)
@@ -3749,6 +3835,39 @@
 	}
 }
 
+func TestStaticPolicyGetPodTopologyHints(t *testing.T) {
+	_, tCtx := ktesting.NewTestContext(t)
+	testCases := []testStaticPolicy{
+		{
+			description: "should not provide pod topology hints for guaranteed pod with pod level resources",
+			pod:         getPodWithPodLevelResources("pod1", podLevelRequirementsGuaranteed, "container1", requirementsGuaranteed),
+			systemReserved: systemReservedMemory{
+				0: map[v1.ResourceName]uint64{
+					v1.ResourceMemory: 1024 * mb,
+				},
+			},
+			expectedTopologyHints:    nil,
+			podLevelResourcesEnabled: true,
+		},
+	}
+
+	for _, testCase := range testCases {
+		t.Run(testCase.description, func(t *testing.T) {
+			featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodLevelResources, testCase.podLevelResourcesEnabled)
+
+			p, s, err := initTests(t, &testCase, nil, nil)
+			if err != nil {
+				t.Fatalf("Unexpected error: %v", err)
+			}
+
+			topologyHints := p.GetPodTopologyHints(tCtx, s, testCase.pod)
+			if !reflect.DeepEqual(topologyHints, testCase.expectedTopologyHints) {
+				t.Fatalf("The actual topology hints: '%+v' are different from the expected one: '%+v'", topologyHints, testCase.expectedTopologyHints)
+			}
+		})
+	}
+}
+
 func Test_getPodRequestedResources(t *testing.T) {
 	testCases := []struct {
 		description string
diff --git a/test/e2e_node/cpu_manager_test.go b/test/e2e_node/cpu_manager_test.go
index 892aa0ee7d8..d27c76c841c 100644
--- a/test/e2e_node/cpu_manager_test.go
+++ b/test/e2e_node/cpu_manager_test.go
@@ -294,6 +294,7 @@ type cpuManagerKubeletArguments struct {
 	policyName                       string
 	enableCPUManagerOptions          bool
 	disableCPUQuotaWithExclusiveCPUs bool
+	enablePodLevelResources          bool
 	reservedSystemCPUs               cpuset.CPUSet
 	options                          map[string]string
 }
@@ -307,6 +308,7 @@ func configureCPUManagerInKubelet(oldCfg *kubeletconfig.KubeletConfiguration, ku
 	newCfg.FeatureGates["CPUManagerPolicyBetaOptions"] = kubeletArguments.enableCPUManagerOptions
 	newCfg.FeatureGates["CPUManagerPolicyAlphaOptions"] = kubeletArguments.enableCPUManagerOptions
 	newCfg.FeatureGates["DisableCPUQuotaWithExclusiveCPUs"] = kubeletArguments.disableCPUQuotaWithExclusiveCPUs
+	newCfg.FeatureGates["PodLevelResources"] = kubeletArguments.enablePodLevelResources
 
 	newCfg.CPUManagerPolicy = kubeletArguments.policyName
 	newCfg.CPUManagerReconcilePeriod = metav1.Duration{Duration: 1 * time.Second}
diff --git a/test/e2e_node/cpumanager_test.go b/test/e2e_node/cpumanager_test.go
index 4d248f8ff69..e5069b9ce24 100644
--- a/test/e2e_node/cpumanager_test.go
+++ b/test/e2e_node/cpumanager_test.go
@@ -39,9 +39,11 @@ import (
 	"github.com/onsi/gomega/types"
 
 	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/klog/v2"
+	"k8s.io/kubernetes/pkg/features"
 	kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
 	"k8s.io/kubernetes/pkg/kubelet/cm"
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
@@ -1921,6 +1923,140 @@ var _ = SIGDescribe("CPU Manager", ginkgo.Ordered, ginkgo.ContinueOnFailure, fra
 	})
 })
 
+var _ = SIGDescribe("CPU Manager Incompatibility Pod Level Resources", ginkgo.Ordered, ginkgo.ContinueOnFailure, framework.WithSerial(), feature.CPUManager, feature.PodLevelResources, framework.WithFeatureGate(features.PodLevelResources), func() {
+	f := framework.NewDefaultFramework("cpu-manager-incompatibility-pod-level-resources-test")
+	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
+
+	// original kubeletconfig before the context start, to be restored
+	var oldCfg *kubeletconfig.KubeletConfiguration
+	var reservedCPUs cpuset.CPUSet
+	var onlineCPUs cpuset.CPUSet
+	var smtLevel int
+	var uncoreGroupSize int
+	// tracks all the pods created by a It() block. Best would be a namespace per It block
+	var podMap map[string]*v1.Pod
+
+	ginkgo.BeforeAll(func(ctx context.Context) {
+		var err error
+		oldCfg, err = getCurrentKubeletConfig(ctx)
+		framework.ExpectNoError(err)
+
+		onlineCPUs, err = getOnlineCPUs() // this should not change at all, at least during this suite lifetime
+		framework.ExpectNoError(err)
+		framework.Logf("Online CPUs: %s", onlineCPUs)
+
+		smtLevel = smtLevelFromSysFS() // this should not change at all, at least during this suite lifetime
+		framework.Logf("SMT level: %d", smtLevel)
+
+		uncoreGroupSize = getUncoreCPUGroupSize()
+		framework.Logf("Uncore Group Size: %d", uncoreGroupSize)
+
+		e2enodeCgroupV2Enabled = IsCgroup2UnifiedMode()
+		framework.Logf("cgroup V2 enabled: %v", e2enodeCgroupV2Enabled)
+
+		e2enodeCgroupDriver = oldCfg.CgroupDriver
+		framework.Logf("cgroup driver: %s", e2enodeCgroupDriver)
+
+		runtime, _, err := getCRIClient()
+		framework.ExpectNoError(err, "Failed to get CRI client")
+
+		version, err := runtime.Version(context.Background(), "")
+		framework.ExpectNoError(err, "Failed to get runtime version")
+
+		e2enodeRuntimeName = version.GetRuntimeName()
+		framework.Logf("runtime: %s", e2enodeRuntimeName)
+	})
+
+	ginkgo.AfterAll(func(ctx context.Context) {
+		updateKubeletConfig(ctx, f, oldCfg, true)
+	})
+
+	ginkgo.BeforeEach(func(ctx context.Context) {
+		// note intentionally NOT set reservedCPUs - this must be initialized on a test-by-test basis
+		podMap = make(map[string]*v1.Pod)
+	})
+
+	ginkgo.JustBeforeEach(func(ctx context.Context) {
+		if !e2enodeCgroupV2Enabled {
+			e2eskipper.Skipf("Skipping since CgroupV2 not used")
+		}
+	})
+
+	ginkgo.AfterEach(func(ctx context.Context) {
+		deletePodsAsync(ctx, f, podMap)
+	})
+
+	ginkgo.When("running guaranteed pod level resources tests", ginkgo.Label("guaranteed pod level resources", "reserved-cpus"), func() {
+		ginkgo.It("should let the container access all the online CPUs without a reserved CPUs set", func(ctx context.Context) {
CPUs set", func(ctx context.Context) { + updateKubeletConfigIfNeeded(ctx, f, configureCPUManagerInKubelet(oldCfg, &cpuManagerKubeletArguments{ + policyName: string(cpumanager.PolicyStatic), + reservedSystemCPUs: cpuset.CPUSet{}, + enablePodLevelResources: true, + })) + + pod := makeCPUManagerPod("gu-pod-level-resources", []ctnAttribute{ + { + ctnName: "gu-container", + cpuRequest: "1", + cpuLimit: "1", + }, + }) + pod.Spec.Resources = &v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("1"), + v1.ResourceMemory: resource.MustParse("100Mi"), + }, + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("1"), + v1.ResourceMemory: resource.MustParse("100Mi"), + }, + } + + ginkgo.By("creating the test pod") + pod = e2epod.NewPodClient(f).CreateSync(ctx, pod) + podMap[string(pod.UID)] = pod + + ginkgo.By("checking if the expected cpuset was assigned") + gomega.Expect(pod).To(HaveContainerCPUsEqualTo("gu-container", onlineCPUs)) + }) + + ginkgo.It("should let the container access all the online CPUs when using a reserved CPUs set", func(ctx context.Context) { + reservedCPUs = cpuset.New(0) + + updateKubeletConfigIfNeeded(ctx, f, configureCPUManagerInKubelet(oldCfg, &cpuManagerKubeletArguments{ + policyName: string(cpumanager.PolicyStatic), + reservedSystemCPUs: reservedCPUs, // Not really needed for the tests but helps to make a more precise check + enablePodLevelResources: true, + })) + + pod := makeCPUManagerPod("gu-pod-level-resources", []ctnAttribute{ + { + ctnName: "gu-container", + cpuRequest: "1", + cpuLimit: "1", + }, + }) + pod.Spec.Resources = &v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("1"), + v1.ResourceMemory: resource.MustParse("100Mi"), + }, + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("1"), + v1.ResourceMemory: resource.MustParse("100Mi"), + }, + } + + ginkgo.By("creating the test pod") + pod = e2epod.NewPodClient(f).CreateSync(ctx, pod) + podMap[string(pod.UID)] = pod + + ginkgo.By("checking if the expected cpuset was assigned") + gomega.Expect(pod).To(HaveContainerCPUsEqualTo("gu-container", onlineCPUs)) + }) + }) +}) + // Matching helpers func HaveStatusReasonMatchingRegex(expr string) types.GomegaMatcher { diff --git a/test/e2e_node/memory_manager_test.go b/test/e2e_node/memory_manager_test.go index de371c33231..c28f5d4eb4a 100644 --- a/test/e2e_node/memory_manager_test.go +++ b/test/e2e_node/memory_manager_test.go @@ -32,6 +32,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kubeletpodresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1" + "k8s.io/kubernetes/pkg/features" kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config" "k8s.io/kubernetes/pkg/kubelet/apis/podresources" "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state" @@ -238,6 +239,18 @@ func getAllNUMANodes() []int { return numaNodes } +func verifyMemoryPinning(f *framework.Framework, ctx context.Context, pod *v1.Pod, numaNodeIDs []int) { + ginkgo.By("Verifying the NUMA pinning") + + output, err := e2epod.GetPodLogs(ctx, f.ClientSet, f.Namespace.Name, pod.Name, pod.Spec.Containers[0].Name) + framework.ExpectNoError(err) + + currentNUMANodeIDs, err := cpuset.Parse(strings.Trim(output, "\n")) + framework.ExpectNoError(err) + + gomega.Expect(numaNodeIDs).To(gomega.Equal(currentNUMANodeIDs.List())) +} + // Serial because the test updates kubelet configuration. 
 var _ = SIGDescribe("Memory Manager", "[LinuxOnly]", framework.WithDisruptive(), framework.WithSerial(), feature.MemoryManager, func() {
 	// TODO: add more complex tests that will include interaction between CPUManager, MemoryManager and TopologyManager
@@ -267,18 +280,6 @@ var _ = SIGDescribe("Memory Manager", "[LinuxOnly]", framework.WithDisruptive(),
 		evictionHard: map[string]string{evictionHardMemory: "100Mi"},
 	}
 
-	verifyMemoryPinning := func(ctx context.Context, pod *v1.Pod, numaNodeIDs []int) {
-		ginkgo.By("Verifying the NUMA pinning")
-
-		output, err := e2epod.GetPodLogs(ctx, f.ClientSet, f.Namespace.Name, pod.Name, pod.Spec.Containers[0].Name)
-		framework.ExpectNoError(err)
-
-		currentNUMANodeIDs, err := cpuset.Parse(strings.Trim(output, "\n"))
-		framework.ExpectNoError(err)
-
-		gomega.Expect(numaNodeIDs).To(gomega.Equal(currentNUMANodeIDs.List()))
-	}
-
 	waitingForHugepages := func(ctx context.Context, hugepagesCount int) {
 		gomega.Eventually(ctx, func(ctx context.Context) error {
 			node, err := f.ClientSet.CoreV1().Nodes().Get(ctx, framework.TestContext.NodeName, metav1.GetOptions{})
@@ -380,7 +381,7 @@ var _ = SIGDescribe("Memory Manager", "[LinuxOnly]", framework.WithDisruptive(),
 
 			cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
 			framework.ExpectNoError(err)
-			defer conn.Close()
+			defer conn.Close() //nolint:errcheck
 
 			resp, err := cli.GetAllocatableResources(ctx, &kubeletpodresourcesv1.AllocatableResourcesRequest{})
 			framework.ExpectNoError(err)
@@ -444,7 +445,7 @@ var _ = SIGDescribe("Memory Manager", "[LinuxOnly]", framework.WithDisruptive(),
 				return
 			}
 
-			verifyMemoryPinning(ctx, testPod, []int{0})
+			verifyMemoryPinning(f, ctx, testPod, []int{0})
 		})
 	})
 
@@ -469,7 +470,7 @@ var _ = SIGDescribe("Memory Manager", "[LinuxOnly]", framework.WithDisruptive(),
 				return
 			}
 
-			verifyMemoryPinning(ctx, testPod, []int{0})
+			verifyMemoryPinning(f, ctx, testPod, []int{0})
 		})
 	})
 
@@ -503,8 +504,8 @@ var _ = SIGDescribe("Memory Manager", "[LinuxOnly]", framework.WithDisruptive(),
 				return
 			}
 
-			verifyMemoryPinning(ctx, testPod, []int{0})
-			verifyMemoryPinning(ctx, testPod2, []int{0})
+			verifyMemoryPinning(f, ctx, testPod, []int{0})
+			verifyMemoryPinning(f, ctx, testPod2, []int{0})
 		})
 
 		// TODO: move the test to pod resource API test suite, see - https://github.com/kubernetes/kubernetes/issues/101945
@@ -520,7 +521,7 @@ var _ = SIGDescribe("Memory Manager", "[LinuxOnly]", framework.WithDisruptive(),
 
 			cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
 			framework.ExpectNoError(err)
-			defer conn.Close()
+			defer conn.Close() //nolint:errcheck
 
 			resp, err := cli.List(ctx, &kubeletpodresourcesv1.ListPodResourcesRequest{})
 			framework.ExpectNoError(err)
@@ -665,7 +666,7 @@ var _ = SIGDescribe("Memory Manager", "[LinuxOnly]", framework.WithDisruptive(),
 
 			cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
 			framework.ExpectNoError(err)
-			defer conn.Close()
+			defer conn.Close() //nolint:errcheck
 
 			resp, err := cli.GetAllocatableResources(ctx, &kubeletpodresourcesv1.AllocatableResourcesRequest{})
 			framework.ExpectNoError(err)
@@ -682,7 +683,7 @@ var _ = SIGDescribe("Memory Manager", "[LinuxOnly]", framework.WithDisruptive(),
 
 			cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
 			framework.ExpectNoError(err)
-			defer conn.Close()
+			defer conn.Close() //nolint:errcheck
 
 			resp, err := cli.List(ctx, &kubeletpodresourcesv1.ListPodResourcesRequest{})
 			framework.ExpectNoError(err)
@@ -706,7 +707,139 @@ var _ = SIGDescribe("Memory Manager", "[LinuxOnly]", framework.WithDisruptive(),
 					return
 				}
 
-				verifyMemoryPinning(ctx, testPod, allNUMANodes)
+				verifyMemoryPinning(f, ctx, testPod, allNUMANodes)
+			})
+		})
+	})
+})
+
+var _ = SIGDescribe("Memory Manager Incompatibility Pod Level Resources", framework.WithDisruptive(), framework.WithSerial(), feature.MemoryManager, feature.PodLevelResources, framework.WithFeatureGate(features.PodLevelResources), func() {
+	var (
+		allNUMANodes             []int
+		ctnParams, initCtnParams []memoryManagerCtnAttributes
+		isMultiNUMASupported     *bool
+		testPod                  *v1.Pod
+	)
+
+	f := framework.NewDefaultFramework("memory-manager-incompatibility-pod-level-resources-test")
+	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
+
+	memoryQuantity := resource.MustParse("1100Mi")
+	defaultKubeParams := &memoryManagerKubeletParams{
+		systemReservedMemory: []kubeletconfig.MemoryReservation{
+			{
+				NumaNode: 0,
+				Limits: v1.ResourceList{
+					resourceMemory: memoryQuantity,
+				},
+			},
+		},
+		systemReserved: map[string]string{resourceMemory: "500Mi"},
+		kubeReserved:   map[string]string{resourceMemory: "500Mi"},
+		evictionHard:   map[string]string{evictionHardMemory: "100Mi"},
+	}
+
+	ginkgo.BeforeEach(func(ctx context.Context) {
+		if isMultiNUMASupported == nil {
+			isMultiNUMASupported = ptr.To(isMultiNUMA())
+		}
+
+		if len(allNUMANodes) == 0 {
+			allNUMANodes = getAllNUMANodes()
+		}
+	})
+
+	// dynamically update the kubelet configuration
+	ginkgo.JustBeforeEach(func(ctx context.Context) {
+		if len(ctnParams) > 0 {
+			testPod = makeMemoryManagerPod(ctnParams[0].ctnName, initCtnParams, ctnParams)
+			testPod.Spec.Resources = &v1.ResourceRequirements{
+				Limits: v1.ResourceList{
+					v1.ResourceCPU:    resource.MustParse("100m"),
+					v1.ResourceMemory: resource.MustParse("128Mi"),
+				},
+				Requests: v1.ResourceList{
+					v1.ResourceCPU:    resource.MustParse("100m"),
+					v1.ResourceMemory: resource.MustParse("128Mi"),
+				},
+			}
+		}
+	})
+
+	ginkgo.JustAfterEach(func(ctx context.Context) {
+		// delete the test pod
+		if testPod != nil && testPod.Name != "" {
+			e2epod.NewPodClient(f).DeleteSync(ctx, testPod.Name, metav1.DeleteOptions{}, f.Timeouts.PodDelete)
+		}
+	})
+
+	ginkgo.Context("with static policy", func() {
+		tempSetCurrentKubeletConfig(f, func(ctx context.Context, initialConfig *kubeletconfig.KubeletConfiguration) {
+			kubeParams := *defaultKubeParams
+			kubeParams.policy = staticPolicy
+			updateKubeletConfigWithMemoryManagerParams(initialConfig, &kubeParams)
+			if initialConfig.FeatureGates == nil {
+				initialConfig.FeatureGates = make(map[string]bool)
+			}
+			initialConfig.FeatureGates["PodLevelResources"] = true
+		})
+
+		ginkgo.Context("", func() {
+			ginkgo.BeforeEach(func() {
+				// override pod parameters
+				ctnParams = []memoryManagerCtnAttributes{
+					{
+						ctnName: "memory-manager-none",
+						cpus:    "100m",
+						memory:  "128Mi",
+					},
+				}
+			})
+
+			ginkgo.JustAfterEach(func() {
+				// reset containers attributes
+				ctnParams = []memoryManagerCtnAttributes{}
+				initCtnParams = []memoryManagerCtnAttributes{}
+			})
+
+			ginkgo.It("should not report any memory data during request to pod resources List when pod has pod level resources", func(ctx context.Context) {
+				testPod = e2epod.NewPodClient(f).CreateSync(ctx, testPod)
+
+				endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
+				framework.ExpectNoError(err)
+
+				var resp *kubeletpodresourcesv1.ListPodResourcesResponse
+				gomega.Eventually(ctx, func(ctx context.Context) error {
+					cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
+					if err != nil {
+						return err
+					}
+					defer conn.Close() //nolint:errcheck
+					resp, err = cli.List(ctx, &kubeletpodresourcesv1.ListPodResourcesRequest{})
+
+					return err
+				}, time.Minute, 5*time.Second).Should(gomega.Succeed())
+
+				for _, podResource := range resp.PodResources {
+					if podResource.Name != testPod.Name {
+						continue
+					}
+
+					for _, containerResource := range podResource.Containers {
+						gomega.Expect(containerResource.Memory).To(gomega.BeEmpty())
+					}
+				}
+			})
+
+			ginkgo.It("should succeed to start the pod when it has pod level resources", func(ctx context.Context) {
+				testPod = e2epod.NewPodClient(f).CreateSync(ctx, testPod)
+
+				// there is no point in verifying NUMA pinning when the node has only one NUMA node
+				if !*isMultiNUMASupported {
+					return
+				}
+
+				verifyMemoryPinning(f, ctx, testPod, allNUMANodes)
 			})
 		})
 	})