Merge pull request #133279 from ffromani/pod-level-resource-managers

[PodLevelResources] handle pod-level resource manager alignment
Kubernetes Prow Robot authored on 2025-07-29 17:28:33 -07:00, committed by GitHub
9 changed files with 492 additions and 32 deletions
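
The whole PR boils down to one guard, repeated in every static CPU and memory manager policy entry point touched below (Allocate, GetTopologyHints, GetPodTopologyHints): if the PodLevelResources feature gate is enabled and the pod sets pod-level resources, the policy logs the skip and returns nil instead of computing per-container alignment. A minimal sketch of that guard in isolation (the standalone function and main are illustrative; the feature-gate and helper calls are the ones the diff itself uses):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	resourcehelper "k8s.io/component-helpers/resource"
	"k8s.io/kubernetes/pkg/features"
)

// skipForPodLevelResources mirrors the check added in this PR: the static
// policies only back off when the feature gate is on AND the pod actually
// declares pod-level resources.
func skipForPodLevelResources(pod *v1.Pod) bool {
	return utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources) &&
		resourcehelper.IsPodLevelResourcesSet(pod)
}

func main() {
	pod := &v1.Pod{}
	pod.Spec.Resources = &v1.ResourceRequirements{
		Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("2")},
		Limits:   v1.ResourceList{v1.ResourceCPU: resource.MustParse("2")},
	}
	// Whether this prints true depends on the PodLevelResources state in
	// DefaultFeatureGate; the kubelet e2e suites below flip it via config.
	fmt.Println("skip alignment:", skipForPodLevelResources(pod))
}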

View File

@@ -172,6 +172,22 @@ func makePod(podUID, containerName, cpuRequest, cpuLimit string) *v1.Pod {
return pod
}
func makePodWithPodLevelResources(podUID, podCPURequest, podCPULimit, containerName, cpuRequest, cpuLimit string) *v1.Pod {
pod := makePod(podUID, containerName, cpuRequest, cpuLimit)
pod.Spec.Resources = &v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse(podCPURequest),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("1G"),
},
Limits: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse(podCPULimit),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("1G"),
},
}
return pod
}
func makeMultiContainerPod(initCPUs, appCPUs []struct{ request, limit string }) *v1.Pod {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{

View File

@@ -22,6 +22,7 @@ import (
v1 "k8s.io/api/core/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
resourcehelper "k8s.io/component-helpers/resource"
"k8s.io/klog/v2"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
@@ -320,6 +321,11 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai
return nil
}
if utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources) && resourcehelper.IsPodLevelResourcesSet(pod) {
klog.V(2).InfoS("CPU Manager allocation skipped, pod is using pod-level resources which are not supported by the static CPU manager policy", "pod", klog.KObj(pod), "podUID", pod.UID)
return nil
}
klog.InfoS("Static policy: Allocate", "pod", klog.KObj(pod), "containerName", container.Name)
// container belongs in an exclusively allocated pool
metrics.CPUManagerPinningRequestsTotal.Inc()
@@ -557,6 +563,11 @@ func (p *staticPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v
return nil
}
if utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources) && resourcehelper.IsPodLevelResourcesSet(pod) {
klog.V(3).InfoS("CPU Manager hint generation skipped, pod is using pod-level resources which are not supported by the static CPU manager policy", "pod", klog.KObj(pod), "podUID", pod.UID)
return nil
}
// Short circuit to regenerate the same hints if there are already
// guaranteed CPUs allocated to the Container. This might happen after a
// kubelet restart, for example.
@@ -604,6 +615,11 @@ func (p *staticPolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[strin
return nil
}
if utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources) && resourcehelper.IsPodLevelResourcesSet(pod) {
klog.V(3).InfoS("CPU Manager pod hint generation skipped, pod is using pod-level resources which are not supported by the static CPU manager policy", "pod", klog.KObj(pod), "podUID", pod.UID)
return nil
}
assignedCPUs := cpuset.New()
for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
requestedByContainer := p.guaranteedCPUs(pod, &container)

View File

@@ -26,7 +26,7 @@ import (
"k8s.io/apimachinery/pkg/types"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
pkgfeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
@@ -35,12 +35,13 @@ import (
)
type testCase struct {
name string
pod v1.Pod
container v1.Container
assignments state.ContainerCPUAssignments
defaultCPUSet cpuset.CPUSet
expectedHints []topologymanager.TopologyHint
name string
pod v1.Pod
container v1.Container
assignments state.ContainerCPUAssignments
defaultCPUSet cpuset.CPUSet
expectedHints []topologymanager.TopologyHint
podLevelResourcesEnabled bool
}
func returnMachineInfo() cadvisorapi.MachineInfo {
@@ -210,9 +211,10 @@ func TestPodGuaranteedCPUs(t *testing.T) {
func TestGetTopologyHints(t *testing.T) {
machineInfo := returnMachineInfo()
tcases := returnTestCases()
for _, tc := range tcases {
for _, tc := range returnTestCases() {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodLevelResources, tc.podLevelResourcesEnabled)
topology, _ := topology.Discover(&machineInfo)
var activePods []*v1.Pod
@@ -261,6 +263,8 @@ func TestGetPodTopologyHints(t *testing.T) {
machineInfo := returnMachineInfo()
for _, tc := range returnTestCases() {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodLevelResources, tc.podLevelResourcesEnabled)
topology, _ := topology.Discover(&machineInfo)
var activePods []*v1.Pod
@@ -442,7 +446,7 @@ func TestGetPodTopologyHintsWithPolicyOptions(t *testing.T) {
for _, testCase := range testCases {
t.Run(testCase.description, func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, pkgfeatures.CPUManagerPolicyAlphaOptions, true)
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CPUManagerPolicyAlphaOptions, true)
var activePods []*v1.Pod
for p := range testCase.assignments {
@@ -495,6 +499,9 @@ func returnTestCases() []testCase {
testPod4 := makePod("fakePod", "fakeContainer", "11", "11")
testContainer4 := &testPod4.Spec.Containers[0]
testPod5 := makePodWithPodLevelResources("fakePod", "5", "5", "fakeContainer", "4", "4")
testContainer5 := &testPod5.Spec.Containers[0]
firstSocketMask, _ := bitmask.NewBitMask(0)
secondSocketMask, _ := bitmask.NewBitMask(1)
crossSocketMask, _ := bitmask.NewBitMask(0, 1)
@@ -657,5 +664,13 @@ func returnTestCases() []testCase {
defaultCPUSet: cpuset.New(),
expectedHints: []topologymanager.TopologyHint{},
},
{
name: "Pod has pod level resources, no hint generation",
pod: *testPod5,
container: *testContainer5,
defaultCPUSet: cpuset.New(0, 1, 2, 3),
expectedHints: nil,
podLevelResourcesEnabled: true,
},
}
}
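
The unit-test plumbing is the standard per-case feature-gate toggle: each table entry carries podLevelResourcesEnabled and the loop sets the gate before exercising the policy. A compressed sketch of that pattern (the package name, test name, and case table are placeholders, not the real cpumanager test types):

package cpumanager_test // illustrative package name

import (
	"testing"

	utilfeature "k8s.io/apiserver/pkg/util/feature"
	featuregatetesting "k8s.io/component-base/featuregate/testing"
	"k8s.io/kubernetes/pkg/features"
)

func TestPodLevelResourcesGate(t *testing.T) {
	cases := []struct {
		name                     string
		podLevelResourcesEnabled bool
	}{
		{name: "gate off, hints generated as before", podLevelResourcesEnabled: false},
		{name: "gate on, pod-level resources skip hint generation", podLevelResourcesEnabled: true},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// SetFeatureGateDuringTest restores the previous gate value when the
			// subtest ends, which is what lets one table mix gated and ungated cases.
			featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodLevelResources, tc.podLevelResourcesEnabled)
			// ... build the pod (e.g. via makePodWithPodLevelResources above) and
			// assert that GetTopologyHints returns nil when the gate and the
			// pod-level spec are both set.
		})
	}
}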

View File

@@ -151,6 +151,13 @@ func getPod(podUID string, containerName string, requirements *v1.ResourceRequir
}
}
func getPodWithPodLevelResources(podUID string, podRequirements *v1.ResourceRequirements, containerName string, containerRequirements *v1.ResourceRequirements) *v1.Pod {
pod := getPod(podUID, containerName, containerRequirements)
pod.Spec.Resources = podRequirements
return pod
}
func getPodWithInitContainers(podUID string, containers []v1.Container, initContainers []v1.Container) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{

View File

@@ -27,6 +27,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
utilfeature "k8s.io/apiserver/pkg/util/feature"
resourcehelper "k8s.io/component-helpers/resource"
"k8s.io/klog/v2"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
corehelper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
@@ -108,6 +109,11 @@ func (p *staticPolicy) Allocate(ctx context.Context, s state.State, pod *v1.Pod,
}
podUID := string(pod.UID)
if utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources) && resourcehelper.IsPodLevelResourcesSet(pod) {
logger.V(2).Info("Allocation skipped, pod is using pod-level resources which are not supported by the static Memory manager policy", "podUID", podUID)
return nil
}
logger.Info("Allocate")
// container belongs in an exclusively allocated pool
metrics.MemoryManagerPinningRequestTotal.Inc()
@@ -412,6 +418,11 @@ func (p *staticPolicy) GetPodTopologyHints(ctx context.Context, s state.State, p
return nil
}
if utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources) && resourcehelper.IsPodLevelResourcesSet(pod) {
logger.V(3).Info("Topology hints generation skipped, pod is using pod-level resources which are not supported by the static Memory manager policy", "podUID", pod.UID)
return nil
}
for _, ctn := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
containerBlocks := s.GetMemoryBlocks(string(pod.UID), ctn.Name)
// Short circuit to regenerate the same hints if there are already
@@ -442,6 +453,11 @@ func (p *staticPolicy) GetTopologyHints(ctx context.Context, s state.State, pod
return nil
}
if utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources) && resourcehelper.IsPodLevelResourcesSet(pod) {
logger.V(3).Info("Topology hints generation skipped, pod is using pod-level resources which are not supported by the static Memory manager policy", "podUID", pod.UID)
return nil
}
containerBlocks := s.GetMemoryBlocks(string(pod.UID), container.Name)
// Short circuit to regenerate the same hints if there are already
// memory allocated for the container. This might happen after a

View File

@@ -26,6 +26,9 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
@@ -42,6 +45,17 @@ const (
var (
containerRestartPolicyAlways = v1.ContainerRestartPolicyAlways
podLevelRequirementsGuaranteed = &v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1000Mi"),
v1.ResourceMemory: resource.MustParse("1Gi"),
},
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1000Mi"),
v1.ResourceMemory: resource.MustParse("1Gi"),
},
}
requirementsGuaranteed = &v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1000Mi"),
@@ -131,6 +145,7 @@ type testStaticPolicy struct {
topologyHint *topologymanager.TopologyHint
expectedTopologyHints map[string][]topologymanager.TopologyHint
initContainersReusableMemory reusableMemory
podLevelResourcesEnabled bool
}
func initTests(t *testing.T, testCase *testStaticPolicy, hint *topologymanager.TopologyHint, initContainersReusableMemory reusableMemory) (Policy, state.State, error) {
@@ -2006,10 +2021,68 @@ func TestStaticPolicyAllocate(t *testing.T) {
topologyHint: &topologymanager.TopologyHint{NUMANodeAffinity: newNUMAAffinity(0, 1), Preferred: true},
expectedError: fmt.Errorf("[memorymanager] preferred hint violates NUMA node allocation"),
},
{
description: "should do nothing for guaranteed pod with pod level resources",
expectedAssignments: state.ContainerMemoryAssignments{},
machineState: state.NUMANodeMap{
0: &state.NUMANodeState{
MemoryMap: map[v1.ResourceName]*state.MemoryTable{
v1.ResourceMemory: {
Allocatable: 1536 * mb,
Free: 1536 * mb,
Reserved: 0,
SystemReserved: 512 * mb,
TotalMemSize: 2 * gb,
},
hugepages1Gi: {
Allocatable: gb,
Free: gb,
Reserved: 0,
SystemReserved: 0,
TotalMemSize: gb,
},
},
Cells: []int{},
},
},
expectedMachineState: state.NUMANodeMap{
0: &state.NUMANodeState{
MemoryMap: map[v1.ResourceName]*state.MemoryTable{
v1.ResourceMemory: {
Allocatable: 1536 * mb,
Free: 1536 * mb,
Reserved: 0,
SystemReserved: 512 * mb,
TotalMemSize: 2 * gb,
},
hugepages1Gi: {
Allocatable: gb,
Free: gb,
Reserved: 0,
SystemReserved: 0,
TotalMemSize: gb,
},
},
Cells: []int{},
},
},
systemReserved: systemReservedMemory{
0: map[v1.ResourceName]uint64{
v1.ResourceMemory: 512 * mb,
},
},
pod: getPodWithPodLevelResources("pod1", podLevelRequirementsGuaranteed, "container1", requirementsGuaranteed),
expectedTopologyHints: nil,
topologyHint: &topologymanager.TopologyHint{},
expectedError: nil,
podLevelResourcesEnabled: true,
},
}
for _, testCase := range testCases {
t.Run(testCase.description, func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodLevelResources, testCase.podLevelResourcesEnabled)
t.Logf("TestStaticPolicyAllocate %s", testCase.description)
p, s, err := initTests(t, &testCase, testCase.topologyHint, nil)
if err != nil {
@@ -2017,7 +2090,7 @@ func TestStaticPolicyAllocate(t *testing.T) {
}
err = p.Allocate(tCtx, s, testCase.pod, &testCase.pod.Spec.Containers[0])
if !reflect.DeepEqual(err, testCase.expectedError) {
if (err == nil) != (testCase.expectedError == nil) || (err != nil && testCase.expectedError != nil && err.Error() != testCase.expectedError.Error()) {
t.Fatalf("The actual error %v is different from the expected one %v", err, testCase.expectedError)
}
@@ -3732,10 +3805,23 @@ func TestStaticPolicyGetTopologyHints(t *testing.T) {
},
},
},
{
description: "should not provide topology hints for guaranteed pod with pod level resources",
pod: getPodWithPodLevelResources("pod1", podLevelRequirementsGuaranteed, "container1", requirementsGuaranteed),
systemReserved: systemReservedMemory{
0: map[v1.ResourceName]uint64{
v1.ResourceMemory: 1024 * mb,
},
},
expectedTopologyHints: nil,
podLevelResourcesEnabled: true,
},
}
for _, testCase := range testCases {
t.Run(testCase.description, func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodLevelResources, testCase.podLevelResourcesEnabled)
p, s, err := initTests(t, &testCase, nil, nil)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
@@ -3749,6 +3835,39 @@ func TestStaticPolicyGetTopologyHints(t *testing.T) {
}
}
func TestStaticPolicyGetPodTopologyHints(t *testing.T) {
_, tCtx := ktesting.NewTestContext(t)
testCases := []testStaticPolicy{
{
description: "should not provide pod topology hints for guaranteed pod with pod level resources",
pod: getPodWithPodLevelResources("pod1", podLevelRequirementsGuaranteed, "container1", requirementsGuaranteed),
systemReserved: systemReservedMemory{
0: map[v1.ResourceName]uint64{
v1.ResourceMemory: 1024 * mb,
},
},
expectedTopologyHints: nil,
podLevelResourcesEnabled: true,
},
}
for _, testCase := range testCases {
t.Run(testCase.description, func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodLevelResources, testCase.podLevelResourcesEnabled)
p, s, err := initTests(t, &testCase, nil, nil)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
topologyHints := p.GetPodTopologyHints(tCtx, s, testCase.pod)
if !reflect.DeepEqual(topologyHints, testCase.expectedTopologyHints) {
t.Fatalf("The actual topology hints: '%+v' are different from the expected one: '%+v'", topologyHints, testCase.expectedTopologyHints)
}
})
}
}
func Test_getPodRequestedResources(t *testing.T) {
testCases := []struct {
description string

View File

@@ -294,6 +294,7 @@ type cpuManagerKubeletArguments struct {
policyName string
enableCPUManagerOptions bool
disableCPUQuotaWithExclusiveCPUs bool
enablePodLevelResources bool
reservedSystemCPUs cpuset.CPUSet
options map[string]string
}
@@ -307,6 +308,7 @@ func configureCPUManagerInKubelet(oldCfg *kubeletconfig.KubeletConfiguration, ku
newCfg.FeatureGates["CPUManagerPolicyBetaOptions"] = kubeletArguments.enableCPUManagerOptions
newCfg.FeatureGates["CPUManagerPolicyAlphaOptions"] = kubeletArguments.enableCPUManagerOptions
newCfg.FeatureGates["DisableCPUQuotaWithExclusiveCPUs"] = kubeletArguments.disableCPUQuotaWithExclusiveCPUs
newCfg.FeatureGates["PodLevelResources"] = kubeletArguments.enablePodLevelResources
newCfg.CPUManagerPolicy = kubeletArguments.policyName
newCfg.CPUManagerReconcilePeriod = metav1.Duration{Duration: 1 * time.Second}
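
On the e2e side the new enablePodLevelResources knob just lands in the kubelet's FeatureGates map, next to the existing CPU manager gates. A hedged sketch of that wiring in isolation (withPodLevelResources is an illustrative helper name, not part of the PR):

package main

import (
	"fmt"

	kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
)

// withPodLevelResources copies the config and flips the gate, mirroring what
// configureCPUManagerInKubelet does with kubeletArguments.enablePodLevelResources.
func withPodLevelResources(cfg *kubeletconfig.KubeletConfiguration, enable bool) *kubeletconfig.KubeletConfiguration {
	newCfg := cfg.DeepCopy()
	if newCfg.FeatureGates == nil {
		newCfg.FeatureGates = map[string]bool{}
	}
	newCfg.FeatureGates["PodLevelResources"] = enable
	return newCfg
}

func main() {
	cfg := &kubeletconfig.KubeletConfiguration{}
	fmt.Println(withPodLevelResources(cfg, true).FeatureGates["PodLevelResources"]) // true
}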

View File

@@ -39,9 +39,11 @@ import (
"github.com/onsi/gomega/types"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/pkg/kubelet/cm"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
@@ -1921,6 +1923,140 @@ var _ = SIGDescribe("CPU Manager", ginkgo.Ordered, ginkgo.ContinueOnFailure, fra
})
})
var _ = SIGDescribe("CPU Manager Incompatibility Pod Level Resources", ginkgo.Ordered, ginkgo.ContinueOnFailure, framework.WithSerial(), feature.CPUManager, feature.PodLevelResources, framework.WithFeatureGate(features.PodLevelResources), func() {
f := framework.NewDefaultFramework("cpu-manager-incompatibility-pod-level-resources-test")
f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
// original kubeletconfig before the context start, to be restored
var oldCfg *kubeletconfig.KubeletConfiguration
var reservedCPUs cpuset.CPUSet
var onlineCPUs cpuset.CPUSet
var smtLevel int
var uncoreGroupSize int
// tracks all the pods created by a It() block. Best would be a namespace per It block
var podMap map[string]*v1.Pod
ginkgo.BeforeAll(func(ctx context.Context) {
var err error
oldCfg, err = getCurrentKubeletConfig(ctx)
framework.ExpectNoError(err)
onlineCPUs, err = getOnlineCPUs() // this should not change at all, at least during this suite lifetime
framework.ExpectNoError(err)
framework.Logf("Online CPUs: %s", onlineCPUs)
smtLevel = smtLevelFromSysFS() // this should not change at all, at least during this suite lifetime
framework.Logf("SMT level: %d", smtLevel)
uncoreGroupSize = getUncoreCPUGroupSize()
framework.Logf("Uncore Group Size: %d", uncoreGroupSize)
e2enodeCgroupV2Enabled = IsCgroup2UnifiedMode()
framework.Logf("cgroup V2 enabled: %v", e2enodeCgroupV2Enabled)
e2enodeCgroupDriver = oldCfg.CgroupDriver
framework.Logf("cgroup driver: %s", e2enodeCgroupDriver)
runtime, _, err := getCRIClient()
framework.ExpectNoError(err, "Failed to get CRI client")
version, err := runtime.Version(context.Background(), "")
framework.ExpectNoError(err, "Failed to get runtime version")
e2enodeRuntimeName = version.GetRuntimeName()
framework.Logf("runtime: %s", e2enodeRuntimeName)
})
ginkgo.AfterAll(func(ctx context.Context) {
updateKubeletConfig(ctx, f, oldCfg, true)
})
ginkgo.BeforeEach(func(ctx context.Context) {
// note: reservedCPUs is intentionally NOT set here - it must be initialized on a test-by-test basis
podMap = make(map[string]*v1.Pod)
})
ginkgo.JustBeforeEach(func(ctx context.Context) {
if !e2enodeCgroupV2Enabled {
e2eskipper.Skipf("Skipping since CgroupV2 not used")
}
})
ginkgo.AfterEach(func(ctx context.Context) {
deletePodsAsync(ctx, f, podMap)
})
ginkgo.When("running guaranteed pod level resources tests", ginkgo.Label("guaranteed pod level resources", "reserved-cpus"), func() {
ginkgo.It("should let the container access all the online CPUs without a reserved CPUs set", func(ctx context.Context) {
updateKubeletConfigIfNeeded(ctx, f, configureCPUManagerInKubelet(oldCfg, &cpuManagerKubeletArguments{
policyName: string(cpumanager.PolicyStatic),
reservedSystemCPUs: cpuset.CPUSet{},
enablePodLevelResources: true,
}))
pod := makeCPUManagerPod("gu-pod-level-resources", []ctnAttribute{
{
ctnName: "gu-container",
cpuRequest: "1",
cpuLimit: "1",
},
})
pod.Spec.Resources = &v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1"),
v1.ResourceMemory: resource.MustParse("100Mi"),
},
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1"),
v1.ResourceMemory: resource.MustParse("100Mi"),
},
}
ginkgo.By("creating the test pod")
pod = e2epod.NewPodClient(f).CreateSync(ctx, pod)
podMap[string(pod.UID)] = pod
ginkgo.By("checking if the expected cpuset was assigned")
gomega.Expect(pod).To(HaveContainerCPUsEqualTo("gu-container", onlineCPUs))
})
ginkgo.It("should let the container access all the online CPUs when using a reserved CPUs set", func(ctx context.Context) {
reservedCPUs = cpuset.New(0)
updateKubeletConfigIfNeeded(ctx, f, configureCPUManagerInKubelet(oldCfg, &cpuManagerKubeletArguments{
policyName: string(cpumanager.PolicyStatic),
reservedSystemCPUs: reservedCPUs, // Not really needed for the tests but helps to make a more precise check
enablePodLevelResources: true,
}))
pod := makeCPUManagerPod("gu-pod-level-resources", []ctnAttribute{
{
ctnName: "gu-container",
cpuRequest: "1",
cpuLimit: "1",
},
})
pod.Spec.Resources = &v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1"),
v1.ResourceMemory: resource.MustParse("100Mi"),
},
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1"),
v1.ResourceMemory: resource.MustParse("100Mi"),
},
}
ginkgo.By("creating the test pod")
pod = e2epod.NewPodClient(f).CreateSync(ctx, pod)
podMap[string(pod.UID)] = pod
ginkgo.By("checking if the expected cpuset was assigned")
gomega.Expect(pod).To(HaveContainerCPUsEqualTo("gu-container", onlineCPUs))
})
})
})
// Matching helpers
func HaveStatusReasonMatchingRegex(expr string) types.GomegaMatcher {
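
Both new e2e suites assert the same user-visible consequence of the guard: a container that would normally get exclusive CPUs (guaranteed pod with an integer CPU request) instead ends up on the full online cpuset once the pod carries pod-level resources. A minimal sketch of the pod shape the suite builds (names, image, and quantities are illustrative):

package main

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// guaranteedPodWithPodLevelResources builds a pod whose container asks for one
// exclusive CPU, but whose pod-level resources make the static policies skip pinning.
func guaranteedPodWithPodLevelResources() *v1.Pod {
	cpu, mem := resource.MustParse("1"), resource.MustParse("100Mi")
	res := v1.ResourceRequirements{
		Requests: v1.ResourceList{v1.ResourceCPU: cpu, v1.ResourceMemory: mem},
		Limits:   v1.ResourceList{v1.ResourceCPU: cpu, v1.ResourceMemory: mem},
	}
	return &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "gu-pod-level-resources"},
		Spec: v1.PodSpec{
			Containers: []v1.Container{{
				Name:      "gu-container",
				Image:     "registry.k8s.io/pause:3.10", // illustrative image
				Resources: res,
			}},
			// Pod-level resources: this is what trips the new guard in the
			// static CPU and memory manager policies.
			Resources: &res,
		},
	}
}

func main() { _ = guaranteedPodWithPodLevelResources() }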

View File

@@ -32,6 +32,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeletpodresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1"
"k8s.io/kubernetes/pkg/features"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state"
@@ -238,6 +239,18 @@ func getAllNUMANodes() []int {
return numaNodes
}
func verifyMemoryPinning(f *framework.Framework, ctx context.Context, pod *v1.Pod, numaNodeIDs []int) {
ginkgo.By("Verifying the NUMA pinning")
output, err := e2epod.GetPodLogs(ctx, f.ClientSet, f.Namespace.Name, pod.Name, pod.Spec.Containers[0].Name)
framework.ExpectNoError(err)
currentNUMANodeIDs, err := cpuset.Parse(strings.Trim(output, "\n"))
framework.ExpectNoError(err)
gomega.Expect(numaNodeIDs).To(gomega.Equal(currentNUMANodeIDs.List()))
}
// Serial because the test updates kubelet configuration.
var _ = SIGDescribe("Memory Manager", "[LinuxOnly]", framework.WithDisruptive(), framework.WithSerial(), feature.MemoryManager, func() {
// TODO: add more complex tests that will include interaction between CPUManager, MemoryManager and TopologyManager
@@ -267,18 +280,6 @@ var _ = SIGDescribe("Memory Manager", "[LinuxOnly]", framework.WithDisruptive(),
evictionHard: map[string]string{evictionHardMemory: "100Mi"},
}
verifyMemoryPinning := func(ctx context.Context, pod *v1.Pod, numaNodeIDs []int) {
ginkgo.By("Verifying the NUMA pinning")
output, err := e2epod.GetPodLogs(ctx, f.ClientSet, f.Namespace.Name, pod.Name, pod.Spec.Containers[0].Name)
framework.ExpectNoError(err)
currentNUMANodeIDs, err := cpuset.Parse(strings.Trim(output, "\n"))
framework.ExpectNoError(err)
gomega.Expect(numaNodeIDs).To(gomega.Equal(currentNUMANodeIDs.List()))
}
waitingForHugepages := func(ctx context.Context, hugepagesCount int) {
gomega.Eventually(ctx, func(ctx context.Context) error {
node, err := f.ClientSet.CoreV1().Nodes().Get(ctx, framework.TestContext.NodeName, metav1.GetOptions{})
@@ -380,7 +381,7 @@ var _ = SIGDescribe("Memory Manager", "[LinuxOnly]", framework.WithDisruptive(),
cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
framework.ExpectNoError(err)
defer conn.Close()
defer conn.Close() //nolint:errcheck
resp, err := cli.GetAllocatableResources(ctx, &kubeletpodresourcesv1.AllocatableResourcesRequest{})
framework.ExpectNoError(err)
@@ -444,7 +445,7 @@ var _ = SIGDescribe("Memory Manager", "[LinuxOnly]", framework.WithDisruptive(),
return
}
verifyMemoryPinning(ctx, testPod, []int{0})
verifyMemoryPinning(f, ctx, testPod, []int{0})
})
})
@@ -469,7 +470,7 @@ var _ = SIGDescribe("Memory Manager", "[LinuxOnly]", framework.WithDisruptive(),
return
}
verifyMemoryPinning(ctx, testPod, []int{0})
verifyMemoryPinning(f, ctx, testPod, []int{0})
})
})
@@ -503,8 +504,8 @@ var _ = SIGDescribe("Memory Manager", "[LinuxOnly]", framework.WithDisruptive(),
return
}
verifyMemoryPinning(ctx, testPod, []int{0})
verifyMemoryPinning(ctx, testPod2, []int{0})
verifyMemoryPinning(f, ctx, testPod, []int{0})
verifyMemoryPinning(f, ctx, testPod2, []int{0})
})
// TODO: move the test to pod resource API test suite, see - https://github.com/kubernetes/kubernetes/issues/101945
@@ -520,7 +521,7 @@ var _ = SIGDescribe("Memory Manager", "[LinuxOnly]", framework.WithDisruptive(),
cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
framework.ExpectNoError(err)
defer conn.Close()
defer conn.Close() //nolint:errcheck
resp, err := cli.List(ctx, &kubeletpodresourcesv1.ListPodResourcesRequest{})
framework.ExpectNoError(err)
@@ -665,7 +666,7 @@ var _ = SIGDescribe("Memory Manager", "[LinuxOnly]", framework.WithDisruptive(),
cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
framework.ExpectNoError(err)
defer conn.Close()
defer conn.Close() //nolint:errcheck
resp, err := cli.GetAllocatableResources(ctx, &kubeletpodresourcesv1.AllocatableResourcesRequest{})
framework.ExpectNoError(err)
@@ -682,7 +683,7 @@ var _ = SIGDescribe("Memory Manager", "[LinuxOnly]", framework.WithDisruptive(),
cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
framework.ExpectNoError(err)
defer conn.Close()
defer conn.Close() //nolint:errcheck
resp, err := cli.List(ctx, &kubeletpodresourcesv1.ListPodResourcesRequest{})
framework.ExpectNoError(err)
@@ -706,7 +707,139 @@ var _ = SIGDescribe("Memory Manager", "[LinuxOnly]", framework.WithDisruptive(),
return
}
verifyMemoryPinning(ctx, testPod, allNUMANodes)
verifyMemoryPinning(f, ctx, testPod, allNUMANodes)
})
})
})
})
var _ = SIGDescribe("Memory Manager Incompatibility Pod Level Resources", framework.WithDisruptive(), framework.WithSerial(), feature.MemoryManager, feature.PodLevelResources, framework.WithFeatureGate(features.PodLevelResources), func() {
var (
allNUMANodes []int
ctnParams, initCtnParams []memoryManagerCtnAttributes
isMultiNUMASupported *bool
testPod *v1.Pod
)
f := framework.NewDefaultFramework("memory-manager-incompatibility-pod-level-resources-test")
f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
memoryQuantity := resource.MustParse("1100Mi")
defaultKubeParams := &memoryManagerKubeletParams{
systemReservedMemory: []kubeletconfig.MemoryReservation{
{
NumaNode: 0,
Limits: v1.ResourceList{
resourceMemory: memoryQuantity,
},
},
},
systemReserved: map[string]string{resourceMemory: "500Mi"},
kubeReserved: map[string]string{resourceMemory: "500Mi"},
evictionHard: map[string]string{evictionHardMemory: "100Mi"},
}
ginkgo.BeforeEach(func(ctx context.Context) {
if isMultiNUMASupported == nil {
isMultiNUMASupported = ptr.To(isMultiNUMA())
}
if len(allNUMANodes) == 0 {
allNUMANodes = getAllNUMANodes()
}
})
// dynamically update the kubelet configuration
ginkgo.JustBeforeEach(func(ctx context.Context) {
if len(ctnParams) > 0 {
testPod = makeMemoryManagerPod(ctnParams[0].ctnName, initCtnParams, ctnParams)
testPod.Spec.Resources = &v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("100m"),
v1.ResourceMemory: resource.MustParse("128Mi"),
},
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("100m"),
v1.ResourceMemory: resource.MustParse("128Mi"),
},
}
}
})
ginkgo.JustAfterEach(func(ctx context.Context) {
// delete the test pod
if testPod != nil && testPod.Name != "" {
e2epod.NewPodClient(f).DeleteSync(ctx, testPod.Name, metav1.DeleteOptions{}, f.Timeouts.PodDelete)
}
})
ginkgo.Context("with static policy", func() {
tempSetCurrentKubeletConfig(f, func(ctx context.Context, initialConfig *kubeletconfig.KubeletConfiguration) {
kubeParams := *defaultKubeParams
kubeParams.policy = staticPolicy
updateKubeletConfigWithMemoryManagerParams(initialConfig, &kubeParams)
if initialConfig.FeatureGates == nil {
initialConfig.FeatureGates = make(map[string]bool)
}
initialConfig.FeatureGates["PodLevelResources"] = true
})
ginkgo.Context("", func() {
ginkgo.BeforeEach(func() {
// override pod parameters
ctnParams = []memoryManagerCtnAttributes{
{
ctnName: "memory-manager-none",
cpus: "100m",
memory: "128Mi",
},
}
})
ginkgo.JustAfterEach(func() {
// reset container attributes
ctnParams = []memoryManagerCtnAttributes{}
initCtnParams = []memoryManagerCtnAttributes{}
})
ginkgo.It("should not report any memory data during request to pod resources List when pod has pod level resources", func(ctx context.Context) {
testPod = e2epod.NewPodClient(f).CreateSync(ctx, testPod)
endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
framework.ExpectNoError(err)
var resp *kubeletpodresourcesv1.ListPodResourcesResponse
gomega.Eventually(ctx, func(ctx context.Context) error {
cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
if err != nil {
return err
}
defer conn.Close() //nolint:errcheck
resp, err = cli.List(ctx, &kubeletpodresourcesv1.ListPodResourcesRequest{})
return err
}, time.Minute, 5*time.Second).Should(gomega.Succeed())
for _, podResource := range resp.PodResources {
if podResource.Name != testPod.Name {
continue
}
for _, containerResource := range podResource.Containers {
gomega.Expect(containerResource.Memory).To(gomega.BeEmpty())
}
}
})
ginkgo.It("should succeed to start the pod when it has pod level resources", func(ctx context.Context) {
testPod = e2epod.NewPodClient(f).CreateSync(ctx, testPod)
// no point in verifying NUMA pinning when the node has only one NUMA node
if !*isMultiNUMASupported {
return
}
verifyMemoryPinning(f, ctx, testPod, allNUMANodes)
})
})
})