CPU and Memory manager warning events when using pod-level resources

Kevin Torres
2025-06-30 21:05:58 +00:00
committed by ndixita
parent 924b324088
commit 7804b51f42
9 changed files with 171 additions and 10 deletions

View File

@@ -18,6 +18,7 @@ package allocation
import (
"context"
"errors"
"fmt"
"path/filepath"
"slices"
@@ -41,6 +42,7 @@ import (
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/allocation/state"
"k8s.io/kubernetes/pkg/kubelet/cm"
"k8s.io/kubernetes/pkg/kubelet/cm/admission"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
@@ -701,6 +703,13 @@ func (m *manager) canAdmitPod(allocatedPods []*v1.Pod, pod *v1.Pod) (bool, strin
if result := podAdmitHandler.Admit(attrs); !result.Admit {
klog.InfoS("Pod admission denied", "podUID", attrs.Pod.UID, "pod", klog.KObj(attrs.Pod), "reason", result.Reason, "message", result.Message)
return false, result.Reason, result.Message
} else if result.Admit && len(result.Errors) > 0 && result.Reason == admission.PodLevelResourcesIncompatible && utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources) {
for _, err := range result.Errors {
var admissionWarning admission.Error
if errors.As(err, &admissionWarning) {
m.recorder.Event(attrs.Pod, v1.EventTypeWarning, admission.PodLevelResourcesIncompatible, admissionWarning.Error())
}
}
}
}
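For reference, the warning path above is taken for pods that set pod-level resources (pod.Spec.Resources, gated by PodLevelResources). A minimal sketch of such a pod, assuming that field layout; the pod name and image are made up:

package example

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// PodWithPodLevelResources builds a pod that sets pod-level resources.
// With the static CPU/Memory manager policies such a pod is still admitted,
// but the warning event above is recorded for it.
func PodWithPodLevelResources() *v1.Pod {
	return &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "pod-level-demo"}, // hypothetical name
		Spec: v1.PodSpec{
			Resources: &v1.ResourceRequirements{
				Requests: v1.ResourceList{
					v1.ResourceCPU:    resource.MustParse("2"),
					v1.ResourceMemory: resource.MustParse("1Gi"),
				},
				Limits: v1.ResourceList{
					v1.ResourceCPU:    resource.MustParse("2"),
					v1.ResourceMemory: resource.MustParse("1Gi"),
				},
			},
			Containers: []v1.Container{{Name: "app", Image: "registry.k8s.io/pause:3.10"}},
		},
	}
}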

View File

@@ -20,11 +20,19 @@ import (
"errors"
"fmt"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
)
const (
ErrorReasonUnexpected = "UnexpectedAdmissionError"
// PodLevelResourcesIncompatible is the explicit reason used when the CPU/Memory manager's policy is incompatible with pod-level resources.
PodLevelResourcesIncompatible = "PodLevelResourcesIncompatible"
// Warning types emitted when a manager's policy is incompatible with pod-level resources.
CPUManagerPodLevelResourcesError = "CPUManagerPodLevelResourcesError"
MemoryManagerPodLevelResourcesError = "MemoryManagerPodLevelResourcesError"
)
type Error interface {
@@ -49,14 +57,53 @@ func GetPodAdmitResult(err error) lifecycle.PodAdmitResult {
return lifecycle.PodAdmitResult{Admit: true}
}
var errs []error
// Multiple pod-level resource errors may be reported at once (the topology
// manager aggregates them), so unwrap an aggregate error into its parts first.
var agg utilerrors.Aggregate
if errors.As(err, &agg) {
errs = agg.Errors()
} else {
errs = []error{err}
}
var podLevelWarnings []error
var otherErrs []error
for _, e := range errs {
var admissionErr Error
if errors.As(e, &admissionErr) && (admissionErr.Type() == CPUManagerPodLevelResourcesError || admissionErr.Type() == MemoryManagerPodLevelResourcesError) {
podLevelWarnings = append(podLevelWarnings, e)
} else {
otherErrs = append(otherErrs, e)
}
}
// If all errors are pod-level resource errors, we should treat them as warnings
// and not block pod admission.
if len(otherErrs) == 0 && len(podLevelWarnings) > 0 {
return lifecycle.PodAdmitResult{
Admit: true,
Reason: PodLevelResourcesIncompatible,
Message: "",
Errors: podLevelWarnings,
}
}
if len(otherErrs) == 0 {
// This should not happen when err != nil, but handle it as a safeguard.
return lifecycle.PodAdmitResult{Admit: true}
}
// At this point, we have at least one error that requires pod rejection.
firstErr := otherErrs[0]
var admissionErr Error
if !errors.As(firstErr, &admissionErr) {
admissionErr = &unexpectedAdmissionError{firstErr}
}
return lifecycle.PodAdmitResult{
Admit: false,
Message: admissionErr.Error(),
Reason: admissionErr.Type(),
}
}
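A usage sketch of the new behavior (the fake error type and helper below are illustrative, not part of the commit): when every error matches one of the pod-level resource types, the pod is admitted and the errors come back as warnings.

package example

import (
	utilerrors "k8s.io/apimachinery/pkg/util/errors"
	"k8s.io/kubernetes/pkg/kubelet/cm/admission"
)

// fakePodLevelErr stands in for the CPU manager warning error; its Type()
// matches the CPUManagerPodLevelResourcesError constant above.
type fakePodLevelErr struct{}

func (fakePodLevelErr) Error() string { return "static policy does not support pod-level resources" }
func (fakePodLevelErr) Type() string  { return admission.CPUManagerPodLevelResourcesError }

// PodLevelWarningsAreAdmitted shows the expected outcome: Admit is true,
// the reason is PodLevelResourcesIncompatible, and both warnings stay in
// the result so the caller can record events for them.
func PodLevelWarningsAreAdmitted() (bool, string, int) {
	err := utilerrors.NewAggregate([]error{fakePodLevelErr{}, fakePodLevelErr{}})
	result := admission.GetPodAdmitResult(err)
	return result.Admit, result.Reason, len(result.Errors)
}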

View File

@@ -18,6 +18,7 @@ package cpumanager
import (
"context"
"errors"
"fmt"
"math"
"sync"
@@ -261,6 +262,13 @@ func (m *manager) Allocate(p *v1.Pod, c *v1.Container) error {
// Call down into the policy to assign this container CPUs if required.
err := m.policy.Allocate(m.state, p, c)
// This error means the pod sets pod-level resources, which the static policy does not support.
// The pod should still be admitted, so return the error without logging it as an allocation failure;
// it is surfaced to the user as a warning event instead.
if errors.As(err, &CPUManagerPodLevelResourcesError{}) {
return err
}
if err != nil {
klog.ErrorS(err, "Allocate error")
return err
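For illustration, callers can tell this warning apart from a hard allocation failure with a plain errors.As check (sketch only; it assumes importing the cpumanager package shown in this diff):

package example

import (
	"errors"

	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
)

// IsPodLevelResourcesWarning reports whether an Allocate error is the
// pod-level resources warning rather than a real allocation failure.
func IsPodLevelResourcesWarning(err error) bool {
	var podLevelErr cpumanager.CPUManagerPodLevelResourcesError
	return errors.As(err, &podLevelErr)
}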

View File

@@ -18,10 +18,12 @@ package cpumanager
import (
"fmt"
"strconv"
v1 "k8s.io/api/core/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
resourcehelper "k8s.io/component-helpers/resource"
"k8s.io/klog/v2"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
@@ -35,13 +37,15 @@ import (
)
const (
// PolicyStatic is the name of the static policy.
// Should options be given, these will be ignored and backward (up to 1.21 included)
// compatible behaviour will be enforced
PolicyStatic policyName = "static"
// ErrorSMTAlignment represents the type of an SMTAlignmentError
ErrorSMTAlignment = "SMTAlignmentError"
// ErrorCPUManagerPodLevelResources represents the type of a CPUManagerPodLevelResourcesError
ErrorCPUManagerPodLevelResources = "CPUManagerPodLevelResourcesError"
)
// SMTAlignmentError represents an error due to SMT alignment
@@ -52,6 +56,15 @@ type SMTAlignmentError struct {
CausedByPhysicalCPUs bool
}
// CPUManagerPodLevelResourcesError represents an error due to the CPU Manager static policy not supporting pod-level resources.
type CPUManagerPodLevelResourcesError struct{}
func (e CPUManagerPodLevelResourcesError) Error() string {
return "CPU Manager static policy does not support pod-level resources"
}
func (e CPUManagerPodLevelResourcesError) Type() string { return ErrorCPUManagerPodLevelResources }
func (e SMTAlignmentError) Error() string {
if e.CausedByPhysicalCPUs {
return fmt.Sprintf("SMT Alignment Error: not enough free physical CPUs: available physical CPUs = %d, requested CPUs = %d, CPUs per core = %d", e.AvailablePhysicalCPUs, e.RequestedCPUs, e.CpusPerCore)
@@ -316,6 +329,10 @@ func (p *staticPolicy) updateCPUsToReuse(pod *v1.Pod, container *v1.Container, c
func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) (rerr error) {
numCPUs := p.guaranteedCPUs(pod, container)
if numCPUs == 0 {
if p.isPodWithPodLevelResources(pod) {
return CPUManagerPodLevelResourcesError{}
}
// container belongs in the shared pool (nothing to do; use default cpuset)
return nil
}
@@ -467,6 +484,12 @@ func (p *staticPolicy) guaranteedCPUs(pod *v1.Pod, container *v1.Container) int
klog.V(5).InfoS("Exclusive CPU allocation skipped, pod QoS is not guaranteed", "pod", klog.KObj(pod), "containerName", container.Name, "qos", qos)
return 0
}
// The CPU manager static policy does not support pod-level resources; such pods get no exclusive CPUs and use the shared pool.
if utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources) && resourcehelper.IsPodLevelResourcesSet(pod) {
return 0
}
cpuQuantity := container.Resources.Requests[v1.ResourceCPU]
// In-place pod resize feature makes Container.Resources field mutable for CPU & memory.
// AllocatedResources holds the value of Container.Resources.Requests when the pod was admitted.
@@ -814,3 +837,14 @@ func updateAllocationPerNUMAMetric(topo *topology.CPUTopology, allocatedCPUs cpu
metrics.CPUManagerAllocationPerNUMA.WithLabelValues(strconv.Itoa(numaNode)).Set(float64(count))
}
}
func (p *staticPolicy) isPodWithPodLevelResources(pod *v1.Pod) bool {
if utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources) && resourcehelper.IsPodLevelResourcesSet(pod) {
// The CPU Manager static policy does not support pod-level resources.
klog.V(5).InfoS("CPU Manager allocation skipped, pod is using pod-level resources which are not supported by the static CPU manager policy", "pod", klog.KObj(pod))
return true
}
return false
}
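To make the new guard concrete, here is a minimal sketch of the check it relies on, using the same feature-gate and helper calls as the code above:

package example

import (
	v1 "k8s.io/api/core/v1"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	resourcehelper "k8s.io/component-helpers/resource"
	"k8s.io/kubernetes/pkg/features"
)

// SkipsExclusiveCPUs mirrors the guard in guaranteedCPUs/Allocate: when the
// PodLevelResources gate is enabled and the pod sets pod-level resources,
// the static policy assigns no exclusive CPUs and returns the warning error.
func SkipsExclusiveCPUs(pod *v1.Pod) bool {
	return utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources) &&
		resourcehelper.IsPodLevelResourcesSet(pod)
}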

View File

@@ -18,6 +18,7 @@ package memorymanager
import (
"context"
"errors"
"fmt"
"runtime"
"sync"
@@ -273,6 +274,12 @@ func (m *manager) Allocate(pod *v1.Pod, container *v1.Container) error {
// Call down into the policy to assign this container memory if required.
if err := m.policy.Allocate(ctx, m.state, pod, container); err != nil {
// This error means the pod sets pod-level resources, which the static policy does not support.
// The pod should still be admitted, so return the error without logging it as an allocation failure;
// it is surfaced to the user as a warning event instead.
if errors.As(err, &MemoryManagerPodLevelResourcesError{}) {
return err
}
logger.Error(err, "Allocate error", "pod", klog.KObj(pod), "containerName", container.Name)
return err
}

View File

@@ -27,6 +27,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
utilfeature "k8s.io/apiserver/pkg/util/feature"
resourcehelper "k8s.io/component-helpers/resource"
"k8s.io/klog/v2"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
corehelper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
@@ -38,7 +39,12 @@ import (
"k8s.io/kubernetes/pkg/kubelet/metrics"
)
const (
PolicyTypeStatic policyType = "Static"
// ErrorMemoryManagerPodLevelResources represents the type of a MemoryManagerPodLevelResourcesError
ErrorMemoryManagerPodLevelResources = "MemoryManagerPodLevelResourcesError"
)
type systemReservedMemory map[int]map[v1.ResourceName]uint64
type reusableMemory map[string]map[string]map[v1.ResourceName]uint64
@@ -60,6 +66,16 @@ type staticPolicy struct {
var _ Policy = &staticPolicy{}
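// MemoryManagerPodLevelResourcesError represents an error due to the Memory Manager static policy not supporting pod-level resources.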
type MemoryManagerPodLevelResourcesError struct{}
func (e MemoryManagerPodLevelResourcesError) Type() string {
return ErrorMemoryManagerPodLevelResources
}
func (e MemoryManagerPodLevelResourcesError) Error() string {
return "Memory Manager static policy does not support pod-level resources"
}
// NewPolicyStatic returns new static policy instance
func NewPolicyStatic(ctx context.Context, machineInfo *cadvisorapi.MachineInfo, reserved systemReservedMemory, affinity topologymanager.Store) (Policy, error) {
var totalSystemReserved uint64
@@ -107,6 +123,10 @@ func (p *staticPolicy) Allocate(ctx context.Context, s state.State, pod *v1.Pod,
return nil
}
if p.isPodWithPodLevelResources(ctx, pod) {
return MemoryManagerPodLevelResourcesError{}
}
podUID := string(pod.UID)
logger.Info("Allocate")
// container belongs in an exclusively allocated pool
@@ -406,6 +426,10 @@ func (p *staticPolicy) GetPodTopologyHints(ctx context.Context, s state.State, p
return nil
}
if p.isPodWithPodLevelResources(ctx, pod) {
return nil
}
reqRsrcs, err := getPodRequestedResources(pod)
if err != nil {
logger.Error(err, "Failed to get pod requested resources", "podUID", pod.UID)
@@ -436,6 +460,10 @@ func (p *staticPolicy) GetTopologyHints(ctx context.Context, s state.State, pod
return nil
}
if p.isPodWithPodLevelResources(ctx, pod) {
return nil
}
requestedResources, err := getRequestedResources(pod, container)
if err != nil {
logger.Error(err, "Failed to get container requested resources", "podUID", pod.UID, "containerName", container.Name)
@@ -1076,3 +1104,16 @@ func isAffinityViolatingNUMAAllocations(machineState state.NUMANodeMap, mask bit
}
return false
}
func (p *staticPolicy) isPodWithPodLevelResources(ctx context.Context, pod *v1.Pod) bool {
logger := klog.FromContext(ctx)
if utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources) && resourcehelper.IsPodLevelResourcesSet(pod) {
// The Memory manager static policy does not support pod-level resources.
logger.V(5).Info("Memory manager allocation skipped, pod is using pod-level resources which are not supported by the static Memory manager policy", "pod", klog.KObj(pod))
return true
}
return false
}
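As a quick sanity sketch (not part of the commit), dropped into the same package, the new error type exposes the Error()/Type() pair that the kubelet admission handling inspects:

// Compile-time check that MemoryManagerPodLevelResourcesError implements the
// Error()/Type() pair used by GetPodAdmitResult in the admission package.
var _ interface {
	Error() string
	Type() string
} = MemoryManagerPodLevelResourcesError{}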

View File

@@ -2017,9 +2017,9 @@ func TestStaticPolicyAllocate(t *testing.T) {
}
err = p.Allocate(tCtx, s, testCase.pod, &testCase.pod.Spec.Containers[0])
if (err == nil) != (testCase.expectedError == nil) || (err != nil && testCase.expectedError != nil && err.Error() != testCase.expectedError.Error()) {
t.Fatalf("The actual error %v is different from the expected one %v", err, testCase.expectedError)
}
if err != nil {
return

View File

@@ -19,8 +19,11 @@ package topologymanager
import (
"sync"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/cm/admission"
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
@@ -148,11 +151,21 @@ func (s *scope) admitPolicyNone(pod *v1.Pod) lifecycle.PodAdmitResult {
// It would be better to implement this function in topologymanager instead of scope
// but topologymanager do not track providers anymore
func (s *scope) allocateAlignedResources(pod *v1.Pod, container *v1.Container) error {
isPodLevelResourcesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources)
var errs []error
for _, provider := range s.hintProviders {
err := provider.Allocate(pod, container)
if err != nil && isPodLevelResourcesEnabled {
errs = append(errs, err)
} else if err != nil {
return err
}
}
if isPodLevelResourcesEnabled {
return utilerrors.NewAggregate(errs)
}
return nil
}
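One detail worth noting: utilerrors.NewAggregate returns nil for an empty slice, so the success path still returns nil when the gate is enabled. A small runnable sketch:

package main

import (
	"fmt"

	utilerrors "k8s.io/apimachinery/pkg/util/errors"
)

func main() {
	var errs []error
	// No errors collected: NewAggregate yields nil, so admission proceeds normally.
	fmt.Println(utilerrors.NewAggregate(errs) == nil) // true

	errs = append(errs, fmt.Errorf("cpu manager"), fmt.Errorf("memory manager"))
	agg := utilerrors.NewAggregate(errs)
	// Both provider errors are preserved and later unwrapped by GetPodAdmitResult.
	fmt.Println(len(agg.Errors())) // 2
}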

View File

@@ -35,6 +35,8 @@ type PodAdmitResult struct {
Reason string
// a brief message explaining why the pod could not be admitted.
Message string
// all errors encountered during admission; a pod may still be admitted
// with these surfaced as warnings (e.g. pod-level resource incompatibilities).
Errors []error
}
// PodAdmitHandler is notified during pod admission.