Merge pull request #130727 from swatisehgal/mm-mgr-contexual-logging

node: mm-mgr: Migrate Memory Manager to contextual logging
Kubernetes Prow Robot
2025-07-21 05:50:29 -07:00
committed by GitHub
21 changed files with 303 additions and 215 deletions
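For orientation: the change applies the standard klog/v2 contextual-logging pattern throughout the memory manager. A logger travels inside a context.Context, is retrieved with klog.FromContext, optionally enriched with klog.LoggerWithName or klog.LoggerWithValues, and re-attached with klog.NewContext before being handed to callees. A minimal, self-contained sketch of that pattern (illustrative only, not part of the diff; function names are hypothetical):

package main

import (
	"context"

	"k8s.io/klog/v2"
)

// start mirrors the pattern used across this PR: derive a logger from the
// incoming context, log through it, and pass the enriched context down.
func start(ctx context.Context, policy string) {
	logger := klog.LoggerWithName(klog.FromContext(ctx), "memory-mgr")
	logger.Info("Starting memorymanager", "policy", policy)

	// Re-attach the derived logger so callees inherit the component name.
	doWork(klog.NewContext(ctx, logger))
}

func doWork(ctx context.Context) {
	// klog.FromContext falls back to the global klog logger when the context
	// carries none, which is why interim context.TODO() call sites still log.
	klog.FromContext(ctx).V(4).Info("doing work")
}

func main() {
	start(context.Background(), "Static")
}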

View File

@@ -204,6 +204,7 @@ linters:
contextual k8s.io/kubernetes/pkg/scheduler/.*
contextual k8s.io/kubernetes/test/e2e/dra/.*
contextual k8s.io/kubernetes/pkg/kubelet/cm/dra/.*
contextual k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/.*
contextual k8s.io/kubernetes/pkg/kubelet/pleg/.*
contextual k8s.io/kubernetes/pkg/kubelet/clustertrustbundle/.*
contextual k8s.io/kubernetes/pkg/kubelet/token/.*

View File

@@ -218,6 +218,7 @@ linters:
contextual k8s.io/kubernetes/pkg/scheduler/.*
contextual k8s.io/kubernetes/test/e2e/dra/.*
contextual k8s.io/kubernetes/pkg/kubelet/cm/dra/.*
contextual k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/.*
contextual k8s.io/kubernetes/pkg/kubelet/pleg/.*
contextual k8s.io/kubernetes/pkg/kubelet/clustertrustbundle/.*
contextual k8s.io/kubernetes/pkg/kubelet/token/.*

View File

@@ -50,6 +50,7 @@ contextual k8s.io/kubernetes/pkg/controller/.*
contextual k8s.io/kubernetes/pkg/scheduler/.*
contextual k8s.io/kubernetes/test/e2e/dra/.*
contextual k8s.io/kubernetes/pkg/kubelet/cm/dra/.*
contextual k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/.*
contextual k8s.io/kubernetes/pkg/kubelet/pleg/.*
contextual k8s.io/kubernetes/pkg/kubelet/clustertrustbundle/.*
contextual k8s.io/kubernetes/pkg/kubelet/token/.*

View File

@@ -337,6 +337,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
cm.topologyManager.AddHintProvider(cm.cpuManager)
cm.memoryManager, err = memorymanager.NewManager(
context.TODO(),
nodeConfig.MemoryManagerPolicy,
machineInfo,
cm.GetNodeAllocatableReservation(),
@@ -591,7 +592,7 @@ func (cm *containerManagerImpl) Start(ctx context.Context, node *v1.Node,
}
// Initialize memory manager
err = cm.memoryManager.Start(memorymanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap.Clone())
err = cm.memoryManager.Start(ctx, memorymanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap.Clone())
if err != nil {
return fmt.Errorf("start memory manager error: %w", err)
}
@@ -954,7 +955,9 @@ func (cm *containerManagerImpl) GetMemory(podUID, containerName string) []*podre
return []*podresourcesapi.ContainerMemory{}
}
return containerMemoryFromBlock(cm.memoryManager.GetMemory(podUID, containerName))
// This is temporary as part of the migration of the memory manager to contextual logging.
// A direct context will be passed once the container manager is migrated.
return containerMemoryFromBlock(cm.memoryManager.GetMemory(context.TODO(), podUID, containerName))
}
func (cm *containerManagerImpl) GetAllocatableMemory() []*podresourcesapi.ContainerMemory {
@@ -962,7 +965,9 @@ func (cm *containerManagerImpl) GetAllocatableMemory() []*podresourcesapi.Contai
return []*podresourcesapi.ContainerMemory{}
}
return containerMemoryFromBlock(cm.memoryManager.GetAllocatableMemory())
// This is temporary as part of the migration of the memory manager to contextual logging.
// A direct context will be passed once the container manager is migrated.
return containerMemoryFromBlock(cm.memoryManager.GetAllocatableMemory(context.TODO()))
}
func (cm *containerManagerImpl) GetDynamicResources(pod *v1.Pod, container *v1.Container) []*podresourcesapi.DynamicResource {

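A hedged sketch of the interim convention visible above: callers that have not yet been migrated (the container manager itself) pass context.TODO() into the now context-aware memory manager methods, to be replaced by a plumbed ctx in a follow-up. The types and names below are hypothetical stand-ins; only the shape of the call matches the diff:

package main

import (
	"context"
	"fmt"
)

// memoryProvider stands in for the migrated memorymanager.Manager methods
// that now take a context as their first argument.
type memoryProvider interface {
	GetMemory(ctx context.Context, podUID, containerName string) []string
}

type fakeProvider struct{}

func (fakeProvider) GetMemory(_ context.Context, podUID, containerName string) []string {
	return []string{podUID + "/" + containerName}
}

// notYetMigratedCaller has no ctx parameter of its own, so it passes
// context.TODO(), exactly as container_manager_linux.go does for now.
func notYetMigratedCaller(p memoryProvider, podUID, containerName string) []string {
	return p.GetMemory(context.TODO(), podUID, containerName)
}

func main() {
	fmt.Println(notYetMigratedCaller(fakeProvider{}, "pod-123", "app"))
}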
View File

@@ -43,12 +43,14 @@ import (
type containerManagerStub struct {
shouldResetExtendedResourceCapacity bool
extendedPluginResources v1.ResourceList
memoryManager memorymanager.Manager
}
var _ ContainerManager = &containerManagerStub{}
func (cm *containerManagerStub) Start(_ context.Context, _ *v1.Node, _ ActivePodsFunc, _ GetNodeFunc, _ config.SourcesReady, _ status.PodStatusProvider, _ internalapi.RuntimeService, _ bool) error {
func (cm *containerManagerStub) Start(ctx context.Context, _ *v1.Node, _ ActivePodsFunc, _ GetNodeFunc, _ config.SourcesReady, _ status.PodStatusProvider, _ internalapi.RuntimeService, _ bool) error {
klog.V(2).InfoS("Starting stub container manager")
cm.memoryManager = memorymanager.NewFakeManager(ctx)
return nil
}
@@ -125,7 +127,7 @@ func (cm *containerManagerStub) UpdatePluginResources(*schedulerframework.NodeIn
}
func (cm *containerManagerStub) InternalContainerLifecycle() InternalContainerLifecycle {
return &internalContainerLifecycleImpl{cpumanager.NewFakeManager(), memorymanager.NewFakeManager(), topologymanager.NewFakeManager()}
return &internalContainerLifecycleImpl{cpumanager.NewFakeManager(), cm.memoryManager, topologymanager.NewFakeManager()}
}
func (cm *containerManagerStub) GetPodCgroupRoot() string {

View File

@@ -103,7 +103,7 @@ func (cm *containerManagerImpl) Start(ctx context.Context, node *v1.Node,
}
// Initialize memory manager
err = cm.memoryManager.Start(memorymanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap.Clone())
err = cm.memoryManager.Start(ctx, memorymanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap.Clone())
if err != nil {
return fmt.Errorf("start memory manager error: %v", err)
}
@@ -136,7 +136,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
cm.topologyManager = topologymanager.NewFakeManager()
cm.cpuManager = cpumanager.NewFakeManager()
cm.memoryManager = memorymanager.NewFakeManager()
cm.memoryManager = memorymanager.NewFakeManager(context.TODO())
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WindowsCPUAndMemoryAffinity) {
klog.InfoS("Creating topology manager")
@@ -168,6 +168,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
klog.InfoS("Creating memory manager")
cm.memoryManager, err = memorymanager.NewManager(
context.TODO(),
nodeConfig.MemoryManagerPolicy,
machineInfo,
cm.GetNodeAllocatableReservation(),

View File

@@ -45,6 +45,7 @@ type FakeContainerManager struct {
PodContainerManager *FakePodContainerManager
shouldResetExtendedResourceCapacity bool
nodeConfig NodeConfig
memoryManager memorymanager.Manager
}
var _ ContainerManager = &FakeContainerManager{}
@@ -52,6 +53,7 @@ var _ ContainerManager = &FakeContainerManager{}
func NewFakeContainerManager() *FakeContainerManager {
return &FakeContainerManager{
PodContainerManager: NewFakePodContainerManager(),
memoryManager: memorymanager.NewFakeManager(context.TODO()),
}
}
@@ -179,7 +181,7 @@ func (cm *FakeContainerManager) InternalContainerLifecycle() InternalContainerLi
cm.Lock()
defer cm.Unlock()
cm.CalledFunctions = append(cm.CalledFunctions, "InternalContainerLifecycle")
return &internalContainerLifecycleImpl{cpumanager.NewFakeManager(), memorymanager.NewFakeManager(), topologymanager.NewFakeManager()}
return &internalContainerLifecycleImpl{cpumanager.NewFakeManager(), cm.memoryManager, topologymanager.NewFakeManager()}
}
func (cm *FakeContainerManager) GetPodCgroupRoot() string {

View File

@@ -17,7 +17,9 @@ limitations under the License.
package cm
import (
"k8s.io/api/core/v1"
"context"
v1 "k8s.io/api/core/v1"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager"
@@ -43,7 +45,7 @@ func (i *internalContainerLifecycleImpl) PreStartContainer(pod *v1.Pod, containe
}
if i.memoryManager != nil {
i.memoryManager.AddContainer(pod, container, containerID)
i.memoryManager.AddContainer(context.TODO(), pod, container, containerID)
}
i.topologyManager.AddContainer(pod, container, containerID)

View File

@@ -20,6 +20,7 @@ limitations under the License.
package cm
import (
"context"
"strconv"
"strings"
@@ -37,7 +38,7 @@ func (i *internalContainerLifecycleImpl) PreCreateContainer(pod *v1.Pod, contain
}
if i.memoryManager != nil {
numaNodes := i.memoryManager.GetMemoryNUMANodes(pod, container)
numaNodes := i.memoryManager.GetMemoryNUMANodes(context.TODO(), pod, container)
if numaNodes.Len() > 0 {
var affinity []string
for _, numaNode := range sets.List(numaNodes) {

View File

@@ -20,6 +20,7 @@ limitations under the License.
package cm
import (
"context"
"fmt"
v1 "k8s.io/api/core/v1"
@@ -47,7 +48,7 @@ func (i *internalContainerLifecycleImpl) PreCreateContainer(pod *v1.Pod, contain
var numaNodes sets.Set[int]
if i.memoryManager != nil {
numaNodes = i.memoryManager.GetMemoryNUMANodes(pod, container)
numaNodes = i.memoryManager.GetMemoryNUMANodes(context.TODO(), pod, container)
}
// Gather all CPUs associated with the selected NUMA nodes

View File

@@ -17,6 +17,8 @@ limitations under the License.
package memorymanager
import (
"context"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
@@ -31,42 +33,53 @@ type fakeManager struct {
state state.State
}
func (m *fakeManager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error {
klog.InfoS("Start()")
func (m *fakeManager) Start(ctx context.Context, activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error {
logger := klog.FromContext(ctx)
logger.Info("Start()")
return nil
}
func (m *fakeManager) Policy() Policy {
klog.InfoS("Policy()")
return NewPolicyNone()
func (m *fakeManager) Policy(ctx context.Context) Policy {
logger := klog.FromContext(ctx)
logger.Info("Policy()")
return NewPolicyNone(ctx)
}
func (m *fakeManager) Allocate(pod *v1.Pod, container *v1.Container) error {
klog.InfoS("Allocate", "pod", klog.KObj(pod), "containerName", container.Name)
ctx := context.TODO()
logger := klog.FromContext(ctx)
logger.Info("Allocate", "pod", klog.KObj(pod), "containerName", container.Name)
return nil
}
func (m *fakeManager) AddContainer(pod *v1.Pod, container *v1.Container, containerID string) {
klog.InfoS("Add container", "pod", klog.KObj(pod), "containerName", container.Name, "containerID", containerID)
func (m *fakeManager) AddContainer(ctx context.Context, pod *v1.Pod, container *v1.Container, containerID string) {
logger := klog.FromContext(ctx)
logger.Info("Add container", "pod", klog.KObj(pod), "containerName", container.Name, "containerID", containerID)
}
func (m *fakeManager) GetMemoryNUMANodes(pod *v1.Pod, container *v1.Container) sets.Set[int] {
klog.InfoS("Get MemoryNUMANodes", "pod", klog.KObj(pod), "containerName", container.Name)
func (m *fakeManager) GetMemoryNUMANodes(ctx context.Context, pod *v1.Pod, container *v1.Container) sets.Set[int] {
logger := klog.FromContext(ctx)
logger.Info("Get MemoryNUMANodes", "pod", klog.KObj(pod), "containerName", container.Name)
return nil
}
func (m *fakeManager) RemoveContainer(containerID string) error {
klog.InfoS("RemoveContainer", "containerID", containerID)
func (m *fakeManager) RemoveContainer(ctx context.Context, containerID string) error {
logger := klog.FromContext(ctx)
logger.Info("RemoveContainer", "containerID", containerID)
return nil
}
func (m *fakeManager) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
klog.InfoS("Get Topology Hints", "pod", klog.KObj(pod), "containerName", container.Name)
ctx := context.TODO()
logger := klog.FromContext(ctx)
logger.Info("Get Topology Hints", "pod", klog.KObj(pod), "containerName", container.Name)
return map[string][]topologymanager.TopologyHint{}
}
func (m *fakeManager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint {
klog.InfoS("Get Pod Topology Hints", "pod", klog.KObj(pod))
ctx := context.TODO()
logger := klog.FromContext(ctx)
logger.Info("Get Pod Topology Hints", "pod", klog.KObj(pod))
return map[string][]topologymanager.TopologyHint{}
}
@@ -75,20 +88,24 @@ func (m *fakeManager) State() state.Reader {
}
// GetAllocatableMemory returns the amount of allocatable memory for each NUMA node
func (m *fakeManager) GetAllocatableMemory() []state.Block {
klog.InfoS("Get Allocatable Memory")
func (m *fakeManager) GetAllocatableMemory(ctx context.Context) []state.Block {
logger := klog.FromContext(ctx)
logger.Info("Get Allocatable Memory")
return []state.Block{}
}
// GetMemory returns the memory allocated by a container from NUMA nodes
func (m *fakeManager) GetMemory(podUID, containerName string) []state.Block {
klog.InfoS("Get Memory", "podUID", podUID, "containerName", containerName)
func (m *fakeManager) GetMemory(ctx context.Context, podUID, containerName string) []state.Block {
logger := klog.LoggerWithValues(klog.FromContext(ctx), "podUID", podUID, "containerName", containerName)
logger.Info("Get Memory")
return []state.Block{}
}
// NewFakeManager creates empty/fake memory manager
func NewFakeManager() Manager {
func NewFakeManager(ctx context.Context) Manager {
logger := klog.LoggerWithName(klog.FromContext(ctx), "memory-mgr.fake")
return &fakeManager{
state: state.NewMemoryState(),
// logger: logger,
state: state.NewMemoryState(logger),
}
}
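For context on NewFakeManager(ctx) above: the fake derives a named logger from whatever context it is given, so call sites that still construct it with context.TODO() simply fall back to the global klog logger. A minimal sketch of that behaviour (hypothetical helper name):

package main

import (
	"context"

	"k8s.io/klog/v2"
)

// newFake mirrors NewFakeManager: derive a component-scoped logger from the
// caller's context via klog.LoggerWithName.
func newFake(ctx context.Context) klog.Logger {
	return klog.LoggerWithName(klog.FromContext(ctx), "memory-mgr.fake")
}

func main() {
	// With context.TODO(), klog.FromContext returns the global klog logger,
	// so the fake still logs even before its callers are migrated.
	logger := newFake(context.TODO())
	logger.Info("Start()")
}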

View File

@@ -57,11 +57,11 @@ func (s *sourcesReadyStub) AllReady() bool { return true }
// Manager interface provides methods for Kubelet to manage pod memory.
type Manager interface {
// Start is called during Kubelet initialization.
Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error
Start(ctx context.Context, activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error
// AddContainer adds the mapping between container ID to pod UID and the container name
// The mapping used to remove the memory allocation during the container removal
AddContainer(p *v1.Pod, c *v1.Container, containerID string)
AddContainer(ctx context.Context, p *v1.Pod, c *v1.Container, containerID string)
// Allocate is called to pre-allocate memory resources during Pod admission.
// This must be called at some point prior to the AddContainer() call for a container, e.g. at pod admission time.
@@ -69,7 +69,7 @@ type Manager interface {
// RemoveContainer is called after Kubelet decides to kill or delete a
// container. After this call, any memory allocated to the container is freed.
RemoveContainer(containerID string) error
RemoveContainer(ctx context.Context, containerID string) error
// State returns a read-only interface to the internal memory manager state.
State() state.Reader
@@ -85,13 +85,13 @@ type Manager interface {
GetPodTopologyHints(*v1.Pod) map[string][]topologymanager.TopologyHint
// GetMemoryNUMANodes provides NUMA nodes that are used to allocate the container memory
GetMemoryNUMANodes(pod *v1.Pod, container *v1.Container) sets.Set[int]
GetMemoryNUMANodes(ctx context.Context, pod *v1.Pod, container *v1.Container) sets.Set[int]
// GetAllocatableMemory returns the amount of allocatable memory for each NUMA node
GetAllocatableMemory() []state.Block
GetAllocatableMemory(ctx context.Context) []state.Block
// GetMemory returns the memory allocated by a container from NUMA nodes
GetMemory(podUID, containerName string) []state.Block
GetMemory(ctx context.Context, podUID, containerName string) []state.Block
}
type manager struct {
@@ -132,13 +132,13 @@ type manager struct {
var _ Manager = &manager{}
// NewManager returns new instance of the memory manager
func NewManager(policyName string, machineInfo *cadvisorapi.MachineInfo, nodeAllocatableReservation v1.ResourceList, reservedMemory []kubeletconfig.MemoryReservation, stateFileDirectory string, affinity topologymanager.Store) (Manager, error) {
func NewManager(ctx context.Context, policyName string, machineInfo *cadvisorapi.MachineInfo, nodeAllocatableReservation v1.ResourceList, reservedMemory []kubeletconfig.MemoryReservation, stateFileDirectory string, affinity topologymanager.Store) (Manager, error) {
var policy Policy
switch policyType(policyName) {
case policyTypeNone:
policy = NewPolicyNone()
policy = NewPolicyNone(ctx)
case PolicyTypeStatic:
if runtime.GOOS == "windows" {
@@ -150,7 +150,7 @@ func NewManager(policyName string, machineInfo *cadvisorapi.MachineInfo, nodeAll
return nil, err
}
policy, err = NewPolicyStatic(machineInfo, systemReserved, affinity)
policy, err = NewPolicyStatic(ctx, machineInfo, systemReserved, affinity)
if err != nil {
return nil, err
}
@@ -161,7 +161,7 @@ func NewManager(policyName string, machineInfo *cadvisorapi.MachineInfo, nodeAll
if err != nil {
return nil, err
}
policy, err = NewPolicyBestEffort(machineInfo, systemReserved, affinity)
policy, err = NewPolicyBestEffort(ctx, machineInfo, systemReserved, affinity)
if err != nil {
return nil, err
}
@@ -182,35 +182,36 @@ func NewManager(policyName string, machineInfo *cadvisorapi.MachineInfo, nodeAll
}
// Start starts the memory manager under the kubelet and calls policy start
func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error {
klog.InfoS("Starting memorymanager", "policy", m.policy.Name())
func (m *manager) Start(ctx context.Context, activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error {
logger := klog.FromContext(ctx)
logger.Info("Starting memorymanager", "policy", m.policy.Name())
m.sourcesReady = sourcesReady
m.activePods = activePods
m.podStatusProvider = podStatusProvider
m.containerRuntime = containerRuntime
m.containerMap = initialContainers
stateImpl, err := state.NewCheckpointState(m.stateFileDirectory, memoryManagerStateFileName, m.policy.Name())
stateImpl, err := state.NewCheckpointState(logger, m.stateFileDirectory, memoryManagerStateFileName, m.policy.Name())
if err != nil {
klog.ErrorS(err, "Could not initialize checkpoint manager, please drain node and remove policy state file")
logger.Error(err, "Could not initialize checkpoint manager, please drain node and remove policy state file")
return err
}
m.state = stateImpl
err = m.policy.Start(m.state)
err = m.policy.Start(ctx, m.state)
if err != nil {
klog.ErrorS(err, "Policy start error")
logger.Error(err, "Policy start error")
return err
}
m.allocatableMemory = m.policy.GetAllocatableMemory(m.state)
m.allocatableMemory = m.policy.GetAllocatableMemory(ctx, m.state)
klog.V(4).InfoS("memorymanager started", "policy", m.policy.Name())
logger.V(4).Info("memorymanager started", "policy", m.policy.Name())
return nil
}
// AddContainer saves the value of requested memory for the guaranteed pod under the state and sets memory affinity according to the topology manager
func (m *manager) AddContainer(pod *v1.Pod, container *v1.Container, containerID string) {
func (m *manager) AddContainer(ctx context.Context, pod *v1.Pod, container *v1.Container, containerID string) {
m.Lock()
defer m.Unlock()
@@ -238,7 +239,9 @@ func (m *manager) AddContainer(pod *v1.Pod, container *v1.Container, containerID
}
// GetMemoryNUMANodes provides NUMA nodes that are used to allocate the container memory
func (m *manager) GetMemoryNUMANodes(pod *v1.Pod, container *v1.Container) sets.Set[int] {
func (m *manager) GetMemoryNUMANodes(ctx context.Context, pod *v1.Pod, container *v1.Container) sets.Set[int] {
logger := klog.LoggerWithValues(klog.FromContext(ctx), "pod", klog.KObj(pod), "containerName", container.Name)
// Get NUMA node affinity of blocks assigned to the container during Allocate()
numaNodes := sets.New[int]()
for _, block := range m.state.GetMemoryBlocks(string(pod.UID), container.Name) {
@@ -249,39 +252,44 @@ func (m *manager) GetMemoryNUMANodes(pod *v1.Pod, container *v1.Container) sets.
}
if numaNodes.Len() == 0 {
klog.V(5).InfoS("NUMA nodes not available for allocation", "pod", klog.KObj(pod), "containerName", container.Name)
logger.V(5).Info("NUMA nodes not available for allocation")
return nil
}
klog.InfoS("Memory affinity", "pod", klog.KObj(pod), "containerName", container.Name, "numaNodes", numaNodes)
logger.Info("Memory affinity", "numaNodes", numaNodes)
return numaNodes
}
// Allocate is called to pre-allocate memory resources during Pod admission.
func (m *manager) Allocate(pod *v1.Pod, container *v1.Container) error {
// Garbage collect any stranded resources before allocation
m.removeStaleState()
ctx := context.TODO()
logger := klog.FromContext(ctx)
m.removeStaleState(logger)
m.Lock()
defer m.Unlock()
// Call down into the policy to assign this container memory if required.
if err := m.policy.Allocate(m.state, pod, container); err != nil {
klog.ErrorS(err, "Allocate error", "pod", klog.KObj(pod), "containerName", container.Name)
if err := m.policy.Allocate(ctx, m.state, pod, container); err != nil {
logger.Error(err, "Allocate error", "pod", klog.KObj(pod), "containerName", container.Name)
return err
}
return nil
}
// RemoveContainer removes the container from the state
func (m *manager) RemoveContainer(containerID string) error {
func (m *manager) RemoveContainer(ctx context.Context, containerID string) error {
logger := klog.LoggerWithValues(klog.FromContext(ctx), "containerID", containerID)
m.Lock()
defer m.Unlock()
// if error appears it means container entry already does not exist under the container map
podUID, containerName, err := m.containerMap.GetContainerRef(containerID)
if err != nil {
klog.ErrorS(err, "Failed to get container from container map", "containerID", containerID)
logger.Error(err, "Failed to get container from container map")
return nil
}
@@ -297,22 +305,26 @@ func (m *manager) State() state.Reader {
// GetPodTopologyHints returns the topology hints for the topology manager
func (m *manager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint {
// Use context.TODO() because we currently do not have a proper context to pass in.
// This should be replaced with an appropriate context when refactoring this function to accept a context parameter.
ctx := context.TODO()
// Garbage collect any stranded resources before providing TopologyHints
m.removeStaleState()
m.removeStaleState(klog.FromContext(ctx))
// Delegate to active policy
return m.policy.GetPodTopologyHints(m.state, pod)
return m.policy.GetPodTopologyHints(context.TODO(), m.state, pod)
}
// GetTopologyHints returns the topology hints for the topology manager
func (m *manager) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
// Garbage collect any stranded resources before providing TopologyHints
m.removeStaleState()
m.removeStaleState(klog.TODO())
// Delegate to active policy
return m.policy.GetTopologyHints(m.state, pod, container)
return m.policy.GetTopologyHints(context.TODO(), m.state, pod, container)
}
// TODO: move the method to the upper level, to re-use it under the CPU and memory managers
func (m *manager) removeStaleState() {
func (m *manager) removeStaleState(logger klog.Logger) {
// Only once all sources are ready do we attempt to remove any stale state.
// This ensures that the call to `m.activePods()` below will succeed with
// the actual active pods list.
@@ -345,7 +357,7 @@ func (m *manager) removeStaleState() {
for podUID := range assignments {
for containerName := range assignments[podUID] {
if _, ok := activeContainers[podUID][containerName]; !ok {
klog.V(2).InfoS("RemoveStaleState removing state", "podUID", podUID, "containerName", containerName)
logger.V(2).Info("RemoveStaleState removing state", "podUID", podUID, "containerName", containerName)
m.policyRemoveContainerByRef(podUID, containerName)
}
}
@@ -353,14 +365,14 @@ func (m *manager) removeStaleState() {
m.containerMap.Visit(func(podUID, containerName, containerID string) {
if _, ok := activeContainers[podUID][containerName]; !ok {
klog.V(2).InfoS("RemoveStaleState removing state", "podUID", podUID, "containerName", containerName)
logger.V(2).Info("RemoveStaleState removing state", "podUID", podUID, "containerName", containerName)
m.policyRemoveContainerByRef(podUID, containerName)
}
})
}
func (m *manager) policyRemoveContainerByRef(podUID string, containerName string) {
m.policy.RemoveContainer(m.state, podUID, containerName)
m.policy.RemoveContainer(context.TODO(), m.state, podUID, containerName)
m.containerMap.RemoveByContainerRef(podUID, containerName)
}
@@ -458,11 +470,11 @@ func getSystemReservedMemory(machineInfo *cadvisorapi.MachineInfo, nodeAllocatab
}
// GetAllocatableMemory returns the amount of allocatable memory for each NUMA node
func (m *manager) GetAllocatableMemory() []state.Block {
func (m *manager) GetAllocatableMemory(ctx context.Context) []state.Block {
return m.allocatableMemory
}
// GetMemory returns the memory allocated by a container from NUMA nodes
func (m *manager) GetMemory(podUID, containerName string) []state.Block {
func (m *manager) GetMemory(ctx context.Context, podUID, containerName string) []state.Block {
return m.state.GetMemoryBlocks(podUID, containerName)
}
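A small sketch of the klog.LoggerWithValues pattern used in GetMemoryNUMANodes and RemoveContainer above: fixed key/value pairs are attached to the logger once, so every subsequent log line carries them without repeating the arguments. The function name and error below are hypothetical:

package main

import (
	"context"
	"errors"

	"k8s.io/klog/v2"
)

// removeByID illustrates per-call logger enrichment: "containerID" is added
// once and appears on every line emitted through this logger.
func removeByID(ctx context.Context, containerID string) {
	logger := klog.LoggerWithValues(klog.FromContext(ctx), "containerID", containerID)
	logger.V(2).Info("Removing container")
	logger.Error(errors.New("container not found"), "Failed to get container from container map")
}

func main() {
	removeByID(context.Background(), "fakeID")
}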

View File

@@ -25,8 +25,6 @@ import (
"strings"
"testing"
"k8s.io/klog/v2"
cadvisorapi "github.com/google/cadvisor/info/v1"
"github.com/stretchr/testify/assert"
@@ -39,6 +37,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/test/utils/ktesting"
)
const (
@@ -72,17 +71,17 @@ type testMemoryManager struct {
activePods []*v1.Pod
}
func returnPolicyByName(testCase testMemoryManager) Policy {
func returnPolicyByName(ctx context.Context, testCase testMemoryManager) Policy {
switch testCase.policyName {
case policyTypeMock:
return &mockPolicy{
err: fmt.Errorf("fake reg error"),
}
case PolicyTypeStatic:
policy, _ := NewPolicyStatic(&testCase.machineInfo, testCase.reserved, topologymanager.NewFakeManager())
policy, _ := NewPolicyStatic(ctx, &testCase.machineInfo, testCase.reserved, topologymanager.NewFakeManager())
return policy
case policyTypeNone:
return NewPolicyNone()
return NewPolicyNone(ctx)
}
return nil
}
@@ -95,27 +94,27 @@ func (p *mockPolicy) Name() string {
return string(policyTypeMock)
}
func (p *mockPolicy) Start(s state.State) error {
func (p *mockPolicy) Start(context.Context, state.State) error {
return p.err
}
func (p *mockPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error {
func (p *mockPolicy) Allocate(context.Context, state.State, *v1.Pod, *v1.Container) error {
return p.err
}
func (p *mockPolicy) RemoveContainer(s state.State, podUID string, containerName string) {
func (p *mockPolicy) RemoveContainer(context.Context, state.State, string, string) {
}
func (p *mockPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
func (p *mockPolicy) GetTopologyHints(context.Context, state.State, *v1.Pod, *v1.Container) map[string][]topologymanager.TopologyHint {
return nil
}
func (p *mockPolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint {
func (p *mockPolicy) GetPodTopologyHints(context.Context, state.State, *v1.Pod) map[string][]topologymanager.TopologyHint {
return nil
}
// GetAllocatableMemory returns the amount of allocatable memory for each NUMA node
func (p *mockPolicy) GetAllocatableMemory(s state.State) []state.Block {
func (p *mockPolicy) GetAllocatableMemory(context.Context, state.State) []state.Block {
return []state.Block{}
}
@@ -485,6 +484,7 @@ func TestGetSystemReservedMemory(t *testing.T) {
}
func TestRemoveStaleState(t *testing.T) {
logger, tCtx := ktesting.NewTestContext(t)
machineInfo := returnMachineInfo()
testCases := []testMemoryManager{
{
@@ -896,8 +896,8 @@ func TestRemoveStaleState(t *testing.T) {
for _, testCase := range testCases {
t.Run(testCase.description, func(t *testing.T) {
mgr := &manager{
policy: returnPolicyByName(testCase),
state: state.NewMemoryState(),
policy: returnPolicyByName(tCtx, testCase),
state: state.NewMemoryState(logger),
containerMap: containermap.NewContainerMap(),
containerRuntime: mockRuntimeService{
err: nil,
@@ -909,13 +909,13 @@ func TestRemoveStaleState(t *testing.T) {
mgr.state.SetMemoryAssignments(testCase.assignments)
mgr.state.SetMachineState(testCase.machineState)
mgr.removeStaleState()
mgr.removeStaleState(logger)
if !areContainerMemoryAssignmentsEqual(t, mgr.state.GetMemoryAssignments(), testCase.expectedAssignments) {
t.Errorf("Memory Manager removeStaleState() error, expected assignments %v, but got: %v",
testCase.expectedAssignments, mgr.state.GetMemoryAssignments())
}
if !areMachineStatesEqual(mgr.state.GetMachineState(), testCase.expectedMachineState) {
if !areMachineStatesEqual(logger, mgr.state.GetMachineState(), testCase.expectedMachineState) {
t.Fatalf("The actual machine state: %v is different from the expected one: %v", mgr.state.GetMachineState(), testCase.expectedMachineState)
}
})
@@ -924,6 +924,7 @@ func TestRemoveStaleState(t *testing.T) {
}
func TestAddContainer(t *testing.T) {
logger, tCtx := ktesting.NewTestContext(t)
machineInfo := returnMachineInfo()
reserved := systemReservedMemory{
0: map[v1.ResourceName]uint64{
@@ -1388,8 +1389,8 @@ func TestAddContainer(t *testing.T) {
for _, testCase := range testCases {
t.Run(testCase.description, func(t *testing.T) {
mgr := &manager{
policy: returnPolicyByName(testCase),
state: state.NewMemoryState(),
policy: returnPolicyByName(tCtx, testCase),
state: state.NewMemoryState(logger),
containerMap: containermap.NewContainerMap(),
containerRuntime: mockRuntimeService{
err: testCase.updateError,
@@ -1410,14 +1411,14 @@ func TestAddContainer(t *testing.T) {
t.Errorf("Memory Manager Allocate() error (%v), expected error: %v, but got: %v",
testCase.description, testCase.expectedAllocateError, err)
}
mgr.AddContainer(pod, container, "fakeID")
mgr.AddContainer(tCtx, pod, container, "fakeID")
_, _, err = mgr.containerMap.GetContainerRef("fakeID")
if !reflect.DeepEqual(err, testCase.expectedAddContainerError) {
t.Errorf("Memory Manager AddContainer() error (%v), expected error: %v, but got: %v",
testCase.description, testCase.expectedAddContainerError, err)
}
if !areMachineStatesEqual(mgr.state.GetMachineState(), testCase.expectedMachineState) {
if !areMachineStatesEqual(logger, mgr.state.GetMachineState(), testCase.expectedMachineState) {
t.Errorf("[test] %+v", mgr.state.GetMemoryAssignments())
t.Fatalf("The actual machine state: %v is different from the expected one: %v", mgr.state.GetMachineState(), testCase.expectedMachineState)
}
@@ -1427,6 +1428,7 @@ func TestAddContainer(t *testing.T) {
}
func TestRemoveContainer(t *testing.T) {
logger, tCtx := ktesting.NewTestContext(t)
machineInfo := returnMachineInfo()
reserved := systemReservedMemory{
0: map[v1.ResourceName]uint64{
@@ -1864,8 +1866,8 @@ func TestRemoveContainer(t *testing.T) {
iniContainerMap.Add("fakePod1", "fakeContainer1", "fakeID1")
iniContainerMap.Add("fakePod1", "fakeContainer2", "fakeID2")
mgr := &manager{
policy: returnPolicyByName(testCase),
state: state.NewMemoryState(),
policy: returnPolicyByName(tCtx, testCase),
state: state.NewMemoryState(logger),
containerMap: iniContainerMap,
containerRuntime: mockRuntimeService{
err: testCase.expectedError,
@@ -1877,7 +1879,7 @@ func TestRemoveContainer(t *testing.T) {
mgr.state.SetMemoryAssignments(testCase.assignments)
mgr.state.SetMachineState(testCase.machineState)
err := mgr.RemoveContainer(testCase.removeContainerID)
err := mgr.RemoveContainer(tCtx, testCase.removeContainerID)
if !reflect.DeepEqual(err, testCase.expectedError) {
t.Errorf("Memory Manager RemoveContainer() error (%v), expected error: %v, but got: %v",
testCase.description, testCase.expectedError, err)
@@ -1888,7 +1890,7 @@ func TestRemoveContainer(t *testing.T) {
testCase.expectedAssignments, mgr.state.GetMemoryAssignments(), testCase.expectedAssignments)
}
if !areMachineStatesEqual(mgr.state.GetMachineState(), testCase.expectedMachineState) {
if !areMachineStatesEqual(logger, mgr.state.GetMachineState(), testCase.expectedMachineState) {
t.Errorf("[test] %+v", mgr.state.GetMemoryAssignments())
t.Errorf("[test] %+v, %+v", mgr.state.GetMachineState()[0].MemoryMap["memory"], mgr.state.GetMachineState()[1].MemoryMap["memory"])
t.Fatalf("The actual machine state: %v is different from the expected one: %v", mgr.state.GetMachineState(), testCase.expectedMachineState)
@@ -1905,6 +1907,7 @@ func getPolicyNameForOs() policyType {
}
func TestNewManager(t *testing.T) {
tCtx := ktesting.Init(t)
machineInfo := returnMachineInfo()
expectedReserved := systemReservedMemory{
0: map[v1.ResourceName]uint64{
@@ -1996,7 +1999,7 @@ func TestNewManager(t *testing.T) {
}
defer os.RemoveAll(stateFileDirectory)
mgr, err := NewManager(string(testCase.policyName), &testCase.machineInfo, testCase.nodeAllocatableReservation, testCase.systemReservedMemory, stateFileDirectory, testCase.affinity)
mgr, err := NewManager(tCtx, string(testCase.policyName), &testCase.machineInfo, testCase.nodeAllocatableReservation, testCase.systemReservedMemory, stateFileDirectory, testCase.affinity)
if !reflect.DeepEqual(err, testCase.expectedError) {
t.Errorf("Could not create the Memory Manager. Expected error: '%v', but got: '%v'",
@@ -2026,6 +2029,7 @@ func TestNewManager(t *testing.T) {
}
func TestGetTopologyHints(t *testing.T) {
logger, tCtx := ktesting.NewTestContext(t)
testCases := []testMemoryManager{
{
description: "Successful hint generation",
@@ -2146,8 +2150,8 @@ func TestGetTopologyHints(t *testing.T) {
for _, testCase := range testCases {
t.Run(testCase.description, func(t *testing.T) {
mgr := &manager{
policy: returnPolicyByName(testCase),
state: state.NewMemoryState(),
policy: returnPolicyByName(tCtx, testCase),
state: state.NewMemoryState(logger),
containerMap: containermap.NewContainerMap(),
containerRuntime: mockRuntimeService{
err: nil,
@@ -2171,6 +2175,7 @@ func TestGetTopologyHints(t *testing.T) {
}
func TestAllocateAndAddPodWithInitContainers(t *testing.T) {
logger, tCtx := ktesting.NewTestContext(t)
testCases := []testMemoryManager{
{
description: "should remove init containers from the state file, once app container started",
@@ -2321,10 +2326,10 @@ func TestAllocateAndAddPodWithInitContainers(t *testing.T) {
for _, testCase := range testCases {
t.Run(testCase.description, func(t *testing.T) {
klog.InfoS("TestAllocateAndAddPodWithInitContainers", "name", testCase.description)
logger.Info("TestAllocateAndAddPodWithInitContainers", "name", testCase.description)
mgr := &manager{
policy: returnPolicyByName(testCase),
state: state.NewMemoryState(),
policy: returnPolicyByName(tCtx, testCase),
state: state.NewMemoryState(logger),
containerMap: containermap.NewContainerMap(),
containerRuntime: mockRuntimeService{
err: nil,
@@ -2354,12 +2359,12 @@ func TestAllocateAndAddPodWithInitContainers(t *testing.T) {
// Calls AddContainer for init containers
for i, initContainer := range testCase.podAllocate.Spec.InitContainers {
mgr.AddContainer(testCase.podAllocate, &testCase.podAllocate.Spec.InitContainers[i], initContainer.Name)
mgr.AddContainer(tCtx, testCase.podAllocate, &testCase.podAllocate.Spec.InitContainers[i], initContainer.Name)
}
// Calls AddContainer for apps containers
for i, appContainer := range testCase.podAllocate.Spec.Containers {
mgr.AddContainer(testCase.podAllocate, &testCase.podAllocate.Spec.Containers[i], appContainer.Name)
mgr.AddContainer(tCtx, testCase.podAllocate, &testCase.podAllocate.Spec.Containers[i], appContainer.Name)
}
assignments := mgr.state.GetMemoryAssignments()
@@ -2368,7 +2373,7 @@ func TestAllocateAndAddPodWithInitContainers(t *testing.T) {
}
machineState := mgr.state.GetMachineState()
if !areMachineStatesEqual(machineState, testCase.expectedMachineState) {
if !areMachineStatesEqual(logger, machineState, testCase.expectedMachineState) {
t.Fatalf("The actual machine state %v is different from the expected %v", machineState, testCase.expectedMachineState)
}
})
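For readers unfamiliar with the test helpers adopted above: k8s.io/klog/v2/ktesting (which the repo's test/utils/ktesting package builds on) returns a logger wired to the testing.T plus a context carrying that logger, so code under test that calls klog.FromContext(ctx) logs through the test output. A minimal sketch using only the klog helper:

package memorymanager_test

import (
	"testing"

	"k8s.io/klog/v2"
	"k8s.io/klog/v2/ktesting"
)

func TestContextualLoggingSketch(t *testing.T) {
	// logger and ctx are linked: klog.FromContext(ctx) yields this logger.
	logger, ctx := ktesting.NewTestContext(t)

	logger.Info("direct use, e.g. state.NewMemoryState(logger)")
	klog.FromContext(ctx).Info("indirect use, e.g. mgr.AddContainer(ctx, ...)")
}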

View File

@@ -17,6 +17,8 @@ limitations under the License.
package memorymanager
import (
"context"
v1 "k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
@@ -28,19 +30,19 @@ type policyType string
// Policy implements logic for pod container to a memory assignment.
type Policy interface {
Name() string
Start(s state.State) error
Start(ctx context.Context, s state.State) error
// Allocate call is idempotent
Allocate(s state.State, pod *v1.Pod, container *v1.Container) error
Allocate(ctx context.Context, s state.State, pod *v1.Pod, container *v1.Container) error
// RemoveContainer call is idempotent
RemoveContainer(s state.State, podUID string, containerName string)
RemoveContainer(ctx context.Context, s state.State, podUID string, containerName string)
// GetTopologyHints implements the topologymanager.HintProvider Interface
// and is consulted to achieve NUMA aware resource alignment among this
// and other resource controllers.
GetTopologyHints(s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint
GetTopologyHints(ctx context.Context, s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint
// GetPodTopologyHints implements the topologymanager.HintProvider Interface
// and is consulted to achieve NUMA aware resource alignment among this
// and other resource controllers.
GetPodTopologyHints(s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint
GetPodTopologyHints(ctx context.Context, s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint
// GetAllocatableMemory returns the amount of allocatable memory for each NUMA node
GetAllocatableMemory(s state.State) []state.Block
GetAllocatableMemory(ctx context.Context, s state.State) []state.Block
}

View File

@@ -17,6 +17,8 @@ limitations under the License.
package memorymanager
import (
"context"
cadvisorapi "github.com/google/cadvisor/info/v1"
v1 "k8s.io/api/core/v1"
@@ -39,8 +41,8 @@ type bestEffortPolicy struct {
var _ Policy = &bestEffortPolicy{}
func NewPolicyBestEffort(machineInfo *cadvisorapi.MachineInfo, reserved systemReservedMemory, affinity topologymanager.Store) (Policy, error) {
p, err := NewPolicyStatic(machineInfo, reserved, affinity)
func NewPolicyBestEffort(ctx context.Context, machineInfo *cadvisorapi.MachineInfo, reserved systemReservedMemory, affinity topologymanager.Store) (Policy, error) {
p, err := NewPolicyStatic(ctx, machineInfo, reserved, affinity)
if err != nil {
return nil, err
@@ -55,26 +57,26 @@ func (p *bestEffortPolicy) Name() string {
return string(policyTypeBestEffort)
}
func (p *bestEffortPolicy) Start(s state.State) error {
return p.static.Start(s)
func (p *bestEffortPolicy) Start(ctx context.Context, s state.State) error {
return p.static.Start(ctx, s)
}
func (p *bestEffortPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) (rerr error) {
return p.static.Allocate(s, pod, container)
func (p *bestEffortPolicy) Allocate(ctx context.Context, s state.State, pod *v1.Pod, container *v1.Container) (rerr error) {
return p.static.Allocate(ctx, s, pod, container)
}
func (p *bestEffortPolicy) RemoveContainer(s state.State, podUID string, containerName string) {
p.static.RemoveContainer(s, podUID, containerName)
func (p *bestEffortPolicy) RemoveContainer(ctx context.Context, s state.State, podUID string, containerName string) {
p.static.RemoveContainer(ctx, s, podUID, containerName)
}
func (p *bestEffortPolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint {
return p.static.GetPodTopologyHints(s, pod)
func (p *bestEffortPolicy) GetPodTopologyHints(ctx context.Context, s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint {
return p.static.GetPodTopologyHints(ctx, s, pod)
}
func (p *bestEffortPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
return p.static.GetTopologyHints(s, pod, container)
func (p *bestEffortPolicy) GetTopologyHints(ctx context.Context, s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
return p.static.GetTopologyHints(ctx, s, pod, container)
}
func (p *bestEffortPolicy) GetAllocatableMemory(s state.State) []state.Block {
return p.static.GetAllocatableMemory(s)
func (p *bestEffortPolicy) GetAllocatableMemory(ctx context.Context, s state.State) []state.Block {
return p.static.GetAllocatableMemory(ctx, s)
}

View File

@@ -17,7 +17,10 @@ limitations under the License.
package memorymanager
import (
"context"
v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
)
@@ -31,7 +34,7 @@ type none struct{}
var _ Policy = &none{}
// NewPolicyNone returns new none policy instance
func NewPolicyNone() Policy {
func NewPolicyNone(ctx context.Context) Policy {
return &none{}
}
@@ -39,34 +42,36 @@ func (p *none) Name() string {
return string(policyTypeNone)
}
func (p *none) Start(s state.State) error {
func (p *none) Start(ctx context.Context, s state.State) error {
logger := klog.FromContext(ctx)
logger.Info("Start")
return nil
}
// Allocate call is idempotent
func (p *none) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error {
func (p *none) Allocate(_ context.Context, s state.State, pod *v1.Pod, container *v1.Container) error {
return nil
}
// RemoveContainer call is idempotent
func (p *none) RemoveContainer(s state.State, podUID string, containerName string) {
func (p *none) RemoveContainer(_ context.Context, s state.State, podUID string, containerName string) {
}
// GetTopologyHints implements the topologymanager.HintProvider Interface
// and is consulted to achieve NUMA aware resource alignment among this
// and other resource controllers.
func (p *none) GetTopologyHints(s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
func (p *none) GetTopologyHints(_ context.Context, s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
return nil
}
// GetPodTopologyHints implements the topologymanager.HintProvider Interface
// and is consulted to achieve NUMA aware resource alignment among this
// and other resource controllers.
func (p *none) GetPodTopologyHints(s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint {
func (p *none) GetPodTopologyHints(_ context.Context, s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint {
return nil
}
// GetAllocatableMemory returns the amount of allocatable memory for each NUMA node
func (p *none) GetAllocatableMemory(s state.State) []state.Block {
func (p *none) GetAllocatableMemory(_ context.Context, s state.State) []state.Block {
return []state.Block{}
}

View File

@@ -17,9 +17,11 @@ limitations under the License.
package memorymanager
import (
"context"
"fmt"
"sort"
"github.com/go-logr/logr"
cadvisorapi "github.com/google/cadvisor/info/v1"
v1 "k8s.io/api/core/v1"
@@ -59,7 +61,7 @@ type staticPolicy struct {
var _ Policy = &staticPolicy{}
// NewPolicyStatic returns new static policy instance
func NewPolicyStatic(machineInfo *cadvisorapi.MachineInfo, reserved systemReservedMemory, affinity topologymanager.Store) (Policy, error) {
func NewPolicyStatic(ctx context.Context, machineInfo *cadvisorapi.MachineInfo, reserved systemReservedMemory, affinity topologymanager.Store) (Policy, error) {
var totalSystemReserved uint64
for _, node := range reserved {
if _, ok := node[v1.ResourceMemory]; !ok {
@@ -85,25 +87,28 @@ func (p *staticPolicy) Name() string {
return string(PolicyTypeStatic)
}
func (p *staticPolicy) Start(s state.State) error {
if err := p.validateState(s); err != nil {
klog.ErrorS(err, "Invalid state, please drain node and remove policy state file")
func (p *staticPolicy) Start(ctx context.Context, s state.State) error {
logger := klog.FromContext(ctx)
if err := p.validateState(logger, s); err != nil {
logger.Error(err, "Invalid state, please drain node and remove policy state file")
return err
}
return nil
}
// Allocate call is idempotent
func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) (rerr error) {
func (p *staticPolicy) Allocate(ctx context.Context, s state.State, pod *v1.Pod, container *v1.Container) (rerr error) {
// allocate the memory only for guaranteed pods
logger := klog.FromContext(ctx)
logger = klog.LoggerWithValues(logger, "pod", klog.KObj(pod), "containerName", container.Name)
qos := v1qos.GetPodQOS(pod)
if qos != v1.PodQOSGuaranteed {
klog.V(5).InfoS("Exclusive memory allocation skipped, pod QoS is not guaranteed", "pod", klog.KObj(pod), "containerName", container.Name, "qos", qos)
logger.V(5).Info("Exclusive memory allocation skipped, pod QoS is not guaranteed", "qos", qos)
return nil
}
podUID := string(pod.UID)
klog.InfoS("Allocate", "pod", klog.KObj(pod), "containerName", container.Name)
logger.Info("Allocate")
// container belongs in an exclusively allocated pool
metrics.MemoryManagerPinningRequestTotal.Inc()
defer func() {
@@ -114,13 +119,13 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai
if blocks := s.GetMemoryBlocks(podUID, container.Name); blocks != nil {
p.updatePodReusableMemory(pod, container, blocks)
klog.InfoS("Container already present in state, skipping", "pod", klog.KObj(pod), "containerName", container.Name)
logger.Info("Container already present in state, skipping")
return nil
}
// Call Topology Manager to get the aligned affinity across all hint providers.
hint := p.affinity.GetAffinity(podUID, container.Name)
klog.InfoS("Got topology affinity", "pod", klog.KObj(pod), "podUID", pod.UID, "containerName", container.Name, "hint", hint)
logger.Info("Got topology affinity", "hint", hint)
requestedResources, err := getRequestedResources(pod, container)
if err != nil {
@@ -196,9 +201,9 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai
// we only do this so that the sum(memory_for_all_containers) == total amount of allocated memory to the pod, even
// though the final state here doesn't accurately reflect what was (in reality) allocated to each container
// TODO: we should refactor our state structs to reflect the amount of the re-used memory
p.updateInitContainersMemoryBlocks(s, pod, container, containerBlocks)
p.updateInitContainersMemoryBlocks(logger, s, pod, container, containerBlocks)
klog.V(4).InfoS("Allocated exclusive memory", "pod", klog.KObj(pod), "containerName", container.Name)
logger.V(4).Info("Allocated exclusive memory")
return nil
}
@@ -248,13 +253,15 @@ func (p *staticPolicy) getPodReusableMemory(pod *v1.Pod, numaAffinity bitmask.Bi
}
// RemoveContainer call is idempotent
func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerName string) {
func (p *staticPolicy) RemoveContainer(ctx context.Context, s state.State, podUID string, containerName string) {
logger := klog.LoggerWithValues(klog.FromContext(ctx), "podUID", podUID, "containerName", containerName)
blocks := s.GetMemoryBlocks(podUID, containerName)
if blocks == nil {
return
}
klog.InfoS("RemoveContainer", "podUID", podUID, "containerName", containerName)
logger.Info("RemoveContainer", "podUID", podUID, "containerName", containerName)
s.Delete(podUID, containerName)
// Mutate machine memory state to update free and reserved memory
@@ -300,35 +307,35 @@ func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerNa
s.SetMachineState(machineState)
}
func regenerateHints(pod *v1.Pod, ctn *v1.Container, ctnBlocks []state.Block, reqRsrc map[v1.ResourceName]uint64) map[string][]topologymanager.TopologyHint {
func regenerateHints(logger logr.Logger, pod *v1.Pod, ctn *v1.Container, ctnBlocks []state.Block, reqRsrc map[v1.ResourceName]uint64) map[string][]topologymanager.TopologyHint {
hints := map[string][]topologymanager.TopologyHint{}
for resourceName := range reqRsrc {
hints[string(resourceName)] = []topologymanager.TopologyHint{}
}
if len(ctnBlocks) != len(reqRsrc) {
klog.InfoS("The number of requested resources by the container differs from the number of memory blocks", "pod", klog.KObj(pod), "containerName", ctn.Name)
logger.Info("The number of requested resources by the container differs from the number of memory blocks", "containerName", ctn.Name)
return nil
}
for _, b := range ctnBlocks {
if _, ok := reqRsrc[b.Type]; !ok {
klog.InfoS("Container requested resources but none available of this type", "pod", klog.KObj(pod), "containerName", ctn.Name, "type", b.Type)
logger.Info("Container requested resources but none available of this type", "containerName", ctn.Name, "type", b.Type)
return nil
}
if b.Size != reqRsrc[b.Type] {
klog.InfoS("Memory already allocated with different numbers than requested", "pod", klog.KObj(pod), "containerName", ctn.Name, "type", b.Type, "requestedResource", reqRsrc[b.Type], "allocatedSize", b.Size)
logger.Info("Memory already allocated with different numbers than requested", "containerName", ctn.Name, "type", b.Type, "requestedResource", reqRsrc[b.Type], "allocatedSize", b.Size)
return nil
}
containerNUMAAffinity, err := bitmask.NewBitMask(b.NUMAAffinity...)
if err != nil {
klog.ErrorS(err, "Failed to generate NUMA bitmask", "pod", klog.KObj(pod), "containerName", ctn.Name, "type", b.Type)
logger.Error(err, "Failed to generate NUMA bitmask", "containerName", ctn.Name, "type", b.Type)
return nil
}
klog.InfoS("Regenerating TopologyHints, resource was already allocated to pod", "resourceName", b.Type, "pod", klog.KObj(pod), "podUID", pod.UID, "containerName", ctn.Name)
logger.Info("Regenerating TopologyHints, resource was already allocated to pod", "resourceName", b.Type, "podUID", pod.UID, "containerName", ctn.Name)
hints[string(b.Type)] = append(hints[string(b.Type)], topologymanager.TopologyHint{
NUMANodeAffinity: containerNUMAAffinity,
Preferred: true,
@@ -392,14 +399,16 @@ func getPodRequestedResources(pod *v1.Pod) (map[v1.ResourceName]uint64, error) {
return reqRsrcs, nil
}
func (p *staticPolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint {
func (p *staticPolicy) GetPodTopologyHints(ctx context.Context, s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint {
logger := klog.LoggerWithValues(klog.FromContext(ctx), "pod", klog.KObj(pod))
if v1qos.GetPodQOS(pod) != v1.PodQOSGuaranteed {
return nil
}
reqRsrcs, err := getPodRequestedResources(pod)
if err != nil {
klog.ErrorS(err, "Failed to get pod requested resources", "pod", klog.KObj(pod), "podUID", pod.UID)
logger.Error(err, "Failed to get pod requested resources", "podUID", pod.UID)
return nil
}
@@ -409,7 +418,7 @@ func (p *staticPolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[strin
// memory allocated for the container. This might happen after a
// kubelet restart, for example.
if containerBlocks != nil {
return regenerateHints(pod, &ctn, containerBlocks, reqRsrcs)
return regenerateHints(logger, pod, &ctn, containerBlocks, reqRsrcs)
}
}
@@ -420,14 +429,16 @@ func (p *staticPolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[strin
// GetTopologyHints implements the topologymanager.HintProvider Interface
// and is consulted to achieve NUMA aware resource alignment among this
// and other resource controllers.
func (p *staticPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
func (p *staticPolicy) GetTopologyHints(ctx context.Context, s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
logger := klog.LoggerWithValues(klog.FromContext(ctx), "pod", klog.KObj(pod))
if v1qos.GetPodQOS(pod) != v1.PodQOSGuaranteed {
return nil
}
requestedResources, err := getRequestedResources(pod, container)
if err != nil {
klog.ErrorS(err, "Failed to get container requested resources", "pod", klog.KObj(pod), "podUID", pod.UID, "containerName", container.Name)
logger.Error(err, "Failed to get container requested resources", "podUID", pod.UID, "containerName", container.Name)
return nil
}
@@ -436,7 +447,7 @@ func (p *staticPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v
// memory allocated for the container. This might happen after a
// kubelet restart, for example.
if containerBlocks != nil {
return regenerateHints(pod, container, containerBlocks, requestedResources)
return regenerateHints(logger, pod, container, containerBlocks, requestedResources)
}
return p.calculateHints(s.GetMachineState(), pod, requestedResources)
@@ -588,7 +599,7 @@ func areGroupsEqual(group1, group2 []int) bool {
return true
}
func (p *staticPolicy) validateState(s state.State) error {
func (p *staticPolicy) validateState(logger logr.Logger, s state.State) error {
machineState := s.GetMachineState()
memoryAssignments := s.GetMemoryAssignments()
@@ -654,49 +665,49 @@ func (p *staticPolicy) validateState(s state.State) error {
// Validate that total size, system reserved and reserved memory not changed, it can happen, when:
// - adding or removing physical memory bank from the node
// - change of kubelet system-reserved, kube-reserved or pre-reserved-memory-zone parameters
if !areMachineStatesEqual(machineState, expectedMachineState) {
if !areMachineStatesEqual(logger, machineState, expectedMachineState) {
return fmt.Errorf("[memorymanager] the expected machine state is different from the real one")
}
return nil
}
func areMachineStatesEqual(ms1, ms2 state.NUMANodeMap) bool {
func areMachineStatesEqual(logger logr.Logger, ms1, ms2 state.NUMANodeMap) bool {
if len(ms1) != len(ms2) {
klog.InfoS("Node states were different", "lengthNode1", len(ms1), "lengthNode2", len(ms2))
logger.Info("Node states were different", "lengthNode1", len(ms1), "lengthNode2", len(ms2))
return false
}
for nodeID, nodeState1 := range ms1 {
nodeState2, ok := ms2[nodeID]
if !ok {
klog.InfoS("Node state didn't have node ID", "nodeID", nodeID)
logger.Info("Node state didn't have node ID", "nodeID", nodeID)
return false
}
if nodeState1.NumberOfAssignments != nodeState2.NumberOfAssignments {
klog.InfoS("Node state had a different number of memory assignments.", "assignment1", nodeState1.NumberOfAssignments, "assignment2", nodeState2.NumberOfAssignments)
logger.Info("Node state had a different number of memory assignments.", "assignment1", nodeState1.NumberOfAssignments, "assignment2", nodeState2.NumberOfAssignments)
return false
}
if !areGroupsEqual(nodeState1.Cells, nodeState2.Cells) {
klog.InfoS("Node states had different groups", "stateNode1", nodeState1.Cells, "stateNode2", nodeState2.Cells)
logger.Info("Node states had different groups", "stateNode1", nodeState1.Cells, "stateNode2", nodeState2.Cells)
return false
}
if len(nodeState1.MemoryMap) != len(nodeState2.MemoryMap) {
klog.InfoS("Node state had memory maps of different lengths", "lengthNode1", len(nodeState1.MemoryMap), "lengthNode2", len(nodeState2.MemoryMap))
logger.Info("Node state had memory maps of different lengths", "lengthNode1", len(nodeState1.MemoryMap), "lengthNode2", len(nodeState2.MemoryMap))
return false
}
for resourceName, memoryState1 := range nodeState1.MemoryMap {
memoryState2, ok := nodeState2.MemoryMap[resourceName]
if !ok {
klog.InfoS("Memory state didn't have resource", "resource", resourceName)
logger.Info("Memory state didn't have resource", "resource", resourceName)
return false
}
if !areMemoryStatesEqual(memoryState1, memoryState2, nodeID, resourceName) {
if !areMemoryStatesEqual(logger, memoryState1, memoryState2, nodeID, resourceName) {
return false
}
@@ -710,11 +721,11 @@ func areMachineStatesEqual(ms1, ms2 state.NUMANodeMap) bool {
}
if tmpState1.Free != tmpState2.Free {
klog.InfoS("NUMA node and resource had different memory states", "node", nodeID, "resource", resourceName, "field", "free", "free1", tmpState1.Free, "free2", tmpState2.Free, "memoryState1", *memoryState1, "memoryState2", *memoryState2)
logger.Info("NUMA node and resource had different memory states", "node", nodeID, "resource", resourceName, "field", "free", "free1", tmpState1.Free, "free2", tmpState2.Free, "memoryState1", *memoryState1, "memoryState2", *memoryState2)
return false
}
if tmpState1.Reserved != tmpState2.Reserved {
klog.InfoS("NUMA node and resource had different memory states", "node", nodeID, "resource", resourceName, "field", "reserved", "reserved1", tmpState1.Reserved, "reserved2", tmpState2.Reserved, "memoryState1", *memoryState1, "memoryState2", *memoryState2)
logger.Info("NUMA node and resource had different memory states", "node", nodeID, "resource", resourceName, "field", "reserved", "reserved1", tmpState1.Reserved, "reserved2", tmpState2.Reserved, "memoryState1", *memoryState1, "memoryState2", *memoryState2)
return false
}
}
@@ -722,19 +733,20 @@ func areMachineStatesEqual(ms1, ms2 state.NUMANodeMap) bool {
return true
}
func areMemoryStatesEqual(memoryState1, memoryState2 *state.MemoryTable, nodeID int, resourceName v1.ResourceName) bool {
func areMemoryStatesEqual(logger logr.Logger, memoryState1, memoryState2 *state.MemoryTable, nodeID int, resourceName v1.ResourceName) bool {
loggerWithValues := klog.LoggerWithValues(logger, "node", nodeID, "resource", resourceName, "memoryState1", *memoryState1, "memoryState2", *memoryState2)
if memoryState1.TotalMemSize != memoryState2.TotalMemSize {
klog.InfoS("Memory states for the NUMA node and resource are different", "node", nodeID, "resource", resourceName, "field", "TotalMemSize", "TotalMemSize1", memoryState1.TotalMemSize, "TotalMemSize2", memoryState2.TotalMemSize, "memoryState1", *memoryState1, "memoryState2", *memoryState2)
logger.Info("Memory states for the NUMA node and resource are different", "field", "TotalMemSize", "TotalMemSize1", memoryState1.TotalMemSize, "TotalMemSize2", memoryState2.TotalMemSize)
return false
}
if memoryState1.SystemReserved != memoryState2.SystemReserved {
klog.InfoS("Memory states for the NUMA node and resource are different", "node", nodeID, "resource", resourceName, "field", "SystemReserved", "SystemReserved1", memoryState1.SystemReserved, "SystemReserved2", memoryState2.SystemReserved, "memoryState1", *memoryState1, "memoryState2", *memoryState2)
loggerWithValues.Info("Memory states for the NUMA node and resource are different", "field", "SystemReserved", "SystemReserved1", memoryState1.SystemReserved, "SystemReserved2", memoryState2.SystemReserved)
return false
}
if memoryState1.Allocatable != memoryState2.Allocatable {
klog.InfoS("Memory states for the NUMA node and resource are different", "node", nodeID, "resource", resourceName, "field", "Allocatable", "Allocatable1", memoryState1.Allocatable, "Allocatable2", memoryState2.Allocatable, "memoryState1", *memoryState1, "memoryState2", *memoryState2)
loggerWithValues.Info("Memory states for the NUMA node and resource are different", "field", "Allocatable", "Allocatable1", memoryState1.Allocatable, "Allocatable2", memoryState2.Allocatable)
return false
}
return true
@@ -898,7 +910,7 @@ func findBestHint(hints []topologymanager.TopologyHint) *topologymanager.Topolog
}
// GetAllocatableMemory returns the amount of allocatable memory for each NUMA node
func (p *staticPolicy) GetAllocatableMemory(s state.State) []state.Block {
func (p *staticPolicy) GetAllocatableMemory(_ context.Context, s state.State) []state.Block {
var allocatableMemory []state.Block
machineState := s.GetMachineState()
for numaNodeID, numaNodeState := range machineState {
@@ -962,7 +974,7 @@ func (p *staticPolicy) updatePodReusableMemory(pod *v1.Pod, container *v1.Contai
}
}
func (p *staticPolicy) updateInitContainersMemoryBlocks(s state.State, pod *v1.Pod, container *v1.Container, containerMemoryBlocks []state.Block) {
func (p *staticPolicy) updateInitContainersMemoryBlocks(logger logr.Logger, s state.State, pod *v1.Pod, container *v1.Container, containerMemoryBlocks []state.Block) {
podUID := string(pod.UID)
for _, containerBlock := range containerMemoryBlocks {
@@ -998,7 +1010,7 @@ func (p *staticPolicy) updateInitContainersMemoryBlocks(s state.State, pod *v1.P
continue
}
if !isNUMAAffinitiesEqual(initContainerBlock.NUMAAffinity, containerBlock.NUMAAffinity) {
if !isNUMAAffinitiesEqual(logger, initContainerBlock.NUMAAffinity, containerBlock.NUMAAffinity) {
continue
}
@@ -1026,16 +1038,16 @@ func isRegularInitContainer(pod *v1.Pod, container *v1.Container) bool {
return false
}
func isNUMAAffinitiesEqual(numaAffinity1, numaAffinity2 []int) bool {
func isNUMAAffinitiesEqual(logger logr.Logger, numaAffinity1, numaAffinity2 []int) bool {
bitMask1, err := bitmask.NewBitMask(numaAffinity1...)
if err != nil {
klog.ErrorS(err, "failed to create bit mask", "numaAffinity1", numaAffinity1)
logger.Error(err, "failed to create bit mask", "numaAffinity1", numaAffinity1)
return false
}
bitMask2, err := bitmask.NewBitMask(numaAffinity2...)
if err != nil {
klog.ErrorS(err, "failed to create bit mask", "numaAffinity2", numaAffinity2)
logger.Error(err, "failed to create bit mask", "numaAffinity2", numaAffinity2)
return false
}
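
The helpers above all follow the same shape: each one now receives the caller's logr.Logger instead of calling klog's package-level functions, and areMemoryStatesEqual binds the key/value pairs shared by its messages once via klog.LoggerWithValues. Below is a minimal sketch of that shape, not code from the PR; compareCounts and its parameters are hypothetical names used only for illustration.

```go
package main

import (
	"errors"

	"github.com/go-logr/logr"
	"k8s.io/klog/v2"
)

// compareCounts is a hypothetical helper in the style of areMemoryStatesEqual:
// it accepts the caller's logger and attaches the key/value pairs shared by
// all of its messages up front instead of repeating them on every call.
func compareCounts(logger logr.Logger, nodeID, want, got int) bool {
	logger = klog.LoggerWithValues(logger, "node", nodeID, "want", want, "got", got)
	if want != got {
		logger.Info("Counts differ")
		return false
	}
	return true
}

func main() {
	logger := klog.Background()
	if !compareCounts(logger, 0, 2, 3) {
		logger.Error(errors.New("count mismatch"), "Validation failed")
	}
}
```

Because the key/values are bound to the logger rather than listed on each call, every message emitted by the helper carries the same context, which is what the LoggerWithValues line added above achieves for the NUMA node and resource fields.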

View File

@@ -21,8 +21,6 @@ import (
"reflect"
"testing"
"k8s.io/klog/v2"
cadvisorapi "github.com/google/cadvisor/info/v1"
"github.com/google/go-cmp/cmp"
@@ -31,6 +29,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
"k8s.io/kubernetes/test/utils/ktesting"
)
const (
@@ -135,19 +134,20 @@ type testStaticPolicy struct {
}
func initTests(t *testing.T, testCase *testStaticPolicy, hint *topologymanager.TopologyHint, initContainersReusableMemory reusableMemory) (Policy, state.State, error) {
logger, tCtx := ktesting.NewTestContext(t)
manager := topologymanager.NewFakeManager()
if hint != nil {
manager = topologymanager.NewFakeManagerWithHint(hint)
}
p, err := NewPolicyStatic(testCase.machineInfo, testCase.systemReserved, manager)
p, err := NewPolicyStatic(tCtx, testCase.machineInfo, testCase.systemReserved, manager)
if err != nil {
return nil, nil, err
}
if initContainersReusableMemory != nil {
p.(*staticPolicy).initContainersReusableMemory = initContainersReusableMemory
}
s := state.NewMemoryState()
s := state.NewMemoryState(logger)
s.SetMachineState(testCase.machineState)
s.SetMemoryAssignments(testCase.assignments)
return p, s, nil
@@ -213,6 +213,7 @@ func TestStaticPolicyName(t *testing.T) {
}
func TestStaticPolicyStart(t *testing.T) {
logger, tCtx := ktesting.NewTestContext(t)
testCases := []testStaticPolicy{
{
description: "should fail, if machine state is empty, but it has memory assignments",
@@ -1153,7 +1154,7 @@ func TestStaticPolicyStart(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}
err = p.Start(s)
err = p.Start(tCtx, s)
if !reflect.DeepEqual(err, testCase.expectedError) {
t.Fatalf("The actual error: %v is different from the expected one: %v", err, testCase.expectedError)
}
@@ -1168,7 +1169,7 @@ func TestStaticPolicyStart(t *testing.T) {
}
machineState := s.GetMachineState()
if !areMachineStatesEqual(machineState, testCase.expectedMachineState) {
if !areMachineStatesEqual(logger, machineState, testCase.expectedMachineState) {
t.Fatalf("The actual machine state: %v is different from the expected one: %v", machineState, testCase.expectedMachineState)
}
})
@@ -1176,6 +1177,7 @@ func TestStaticPolicyStart(t *testing.T) {
}
func TestStaticPolicyAllocate(t *testing.T) {
logger, tCtx := ktesting.NewTestContext(t)
testCases := []testStaticPolicy{
{
description: "should do nothing for non-guaranteed pods",
@@ -2014,7 +2016,7 @@ func TestStaticPolicyAllocate(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}
err = p.Allocate(s, testCase.pod, &testCase.pod.Spec.Containers[0])
err = p.Allocate(tCtx, s, testCase.pod, &testCase.pod.Spec.Containers[0])
if !reflect.DeepEqual(err, testCase.expectedError) {
t.Fatalf("The actual error %v is different from the expected one %v", err, testCase.expectedError)
}
@@ -2029,7 +2031,7 @@ func TestStaticPolicyAllocate(t *testing.T) {
}
machineState := s.GetMachineState()
if !areMachineStatesEqual(machineState, testCase.expectedMachineState) {
if !areMachineStatesEqual(logger, machineState, testCase.expectedMachineState) {
t.Fatalf("The actual machine state %v is different from the expected %v", machineState, testCase.expectedMachineState)
}
})
@@ -2037,6 +2039,7 @@ func TestStaticPolicyAllocate(t *testing.T) {
}
func TestStaticPolicyAllocateWithInitContainers(t *testing.T) {
logger, tCtx := ktesting.NewTestContext(t)
testCases := []testStaticPolicy{
{
description: "should re-use init containers memory, init containers requests 1Gi and 2Gi, apps containers 3Gi and 4Gi",
@@ -2731,21 +2734,21 @@ func TestStaticPolicyAllocateWithInitContainers(t *testing.T) {
for _, testCase := range testCases {
t.Run(testCase.description, func(t *testing.T) {
klog.InfoS("TestStaticPolicyAllocateWithInitContainers", "name", testCase.description)
logger.Info("TestStaticPolicyAllocateWithInitContainers", "name", testCase.description)
p, s, err := initTests(t, &testCase, testCase.topologyHint, testCase.initContainersReusableMemory)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
for i := range testCase.pod.Spec.InitContainers {
err = p.Allocate(s, testCase.pod, &testCase.pod.Spec.InitContainers[i])
err = p.Allocate(tCtx, s, testCase.pod, &testCase.pod.Spec.InitContainers[i])
if !reflect.DeepEqual(err, testCase.expectedError) {
t.Fatalf("The actual error %v is different from the expected one %v", err, testCase.expectedError)
}
}
for i := range testCase.pod.Spec.Containers {
err = p.Allocate(s, testCase.pod, &testCase.pod.Spec.Containers[i])
err = p.Allocate(tCtx, s, testCase.pod, &testCase.pod.Spec.Containers[i])
if !reflect.DeepEqual(err, testCase.expectedError) {
t.Fatalf("The actual error %v is different from the expected one %v", err, testCase.expectedError)
}
@@ -2757,7 +2760,7 @@ func TestStaticPolicyAllocateWithInitContainers(t *testing.T) {
}
machineState := s.GetMachineState()
if !areMachineStatesEqual(machineState, testCase.expectedMachineState) {
if !areMachineStatesEqual(logger, machineState, testCase.expectedMachineState) {
t.Fatalf("The actual machine state %v is different from the expected %v", machineState, testCase.expectedMachineState)
}
})
@@ -2765,6 +2768,7 @@ func TestStaticPolicyAllocateWithInitContainers(t *testing.T) {
}
func TestStaticPolicyAllocateWithRestartableInitContainers(t *testing.T) {
logger, tCtx := ktesting.NewTestContext(t)
testCases := []testStaticPolicy{
{
description: "should do nothing once containers already exist under the state file",
@@ -3064,14 +3068,14 @@ func TestStaticPolicyAllocateWithRestartableInitContainers(t *testing.T) {
for _, testCase := range testCases {
t.Run(testCase.description, func(t *testing.T) {
klog.InfoS("TestStaticPolicyAllocateWithRestartableInitContainers", "name", testCase.description)
logger.Info("TestStaticPolicyAllocateWithRestartableInitContainers", "name", testCase.description)
p, s, err := initTests(t, &testCase, testCase.topologyHint, testCase.initContainersReusableMemory)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
for i := range testCase.pod.Spec.InitContainers {
err = p.Allocate(s, testCase.pod, &testCase.pod.Spec.InitContainers[i])
err = p.Allocate(tCtx, s, testCase.pod, &testCase.pod.Spec.InitContainers[i])
if !reflect.DeepEqual(err, testCase.expectedError) {
t.Fatalf("The actual error %v is different from the expected one %v", err, testCase.expectedError)
}
@@ -3082,7 +3086,7 @@ func TestStaticPolicyAllocateWithRestartableInitContainers(t *testing.T) {
}
for i := range testCase.pod.Spec.Containers {
err = p.Allocate(s, testCase.pod, &testCase.pod.Spec.Containers[i])
err = p.Allocate(tCtx, s, testCase.pod, &testCase.pod.Spec.Containers[i])
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@@ -3094,7 +3098,7 @@ func TestStaticPolicyAllocateWithRestartableInitContainers(t *testing.T) {
}
machineState := s.GetMachineState()
if !areMachineStatesEqual(machineState, testCase.expectedMachineState) {
if !areMachineStatesEqual(logger, machineState, testCase.expectedMachineState) {
t.Fatalf("The actual machine state %v is different from the expected %v", machineState, testCase.expectedMachineState)
}
})
@@ -3102,6 +3106,7 @@ func TestStaticPolicyAllocateWithRestartableInitContainers(t *testing.T) {
}
func TestStaticPolicyRemoveContainer(t *testing.T) {
logger, tCtx := ktesting.NewTestContext(t)
testCases := []testStaticPolicy{
{
description: "should do nothing when the container does not exist under the state",
@@ -3344,14 +3349,14 @@ func TestStaticPolicyRemoveContainer(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}
p.RemoveContainer(s, "pod1", "container1")
p.RemoveContainer(tCtx, s, "pod1", "container1")
assignments := s.GetMemoryAssignments()
if !areContainerMemoryAssignmentsEqual(t, assignments, testCase.expectedAssignments) {
t.Fatalf("Actual assignments %v are different from the expected %v", assignments, testCase.expectedAssignments)
}
machineState := s.GetMachineState()
if !areMachineStatesEqual(machineState, testCase.expectedMachineState) {
if !areMachineStatesEqual(logger, machineState, testCase.expectedMachineState) {
t.Fatalf("The actual machine state %v is different from the expected %v", machineState, testCase.expectedMachineState)
}
})
@@ -3359,6 +3364,7 @@ func TestStaticPolicyRemoveContainer(t *testing.T) {
}
func TestStaticPolicyGetTopologyHints(t *testing.T) {
tCtx := ktesting.Init(t)
testCases := []testStaticPolicy{
{
description: "should not provide topology hints for non-guaranteed pods",
@@ -3735,7 +3741,7 @@ func TestStaticPolicyGetTopologyHints(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}
topologyHints := p.GetTopologyHints(s, testCase.pod, &testCase.pod.Spec.Containers[0])
topologyHints := p.GetTopologyHints(tCtx, s, testCase.pod, &testCase.pod.Spec.Containers[0])
if !reflect.DeepEqual(topologyHints, testCase.expectedTopologyHints) {
t.Fatalf("The actual topology hints: '%+v' are different from the expected one: '%+v'", topologyHints, testCase.expectedTopologyHints)
}
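
The test changes above repeat one recipe: obtain a per-test logger and context from the ktesting helpers, log through them, and hand the context to the code under test. The sketch below shows that recipe with the klog/v2 ktesting package, which the kubernetes test/utils/ktesting wrapper used in this PR builds on; doWork is a hypothetical function under test, not part of the PR.

```go
package example

import (
	"context"
	"testing"

	"k8s.io/klog/v2"
	"k8s.io/klog/v2/ktesting"
)

// doWork is a hypothetical function under test: it pulls the contextual
// logger back out of the context it was given.
func doWork(ctx context.Context) {
	klog.FromContext(ctx).Info("doing work")
}

func TestDoWork(t *testing.T) {
	// NewTestContext returns a logger and a context wired to t, so log
	// output is attached to this test case instead of the global stream.
	logger, ctx := ktesting.NewTestContext(t)
	logger.Info("starting test case")
	doWork(ctx)
}
```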

View File

@@ -30,6 +30,7 @@ var _ State = &stateCheckpoint{}
type stateCheckpoint struct {
sync.RWMutex
logger klog.Logger
cache State
policyName string
checkpointManager checkpointmanager.CheckpointManager
@@ -37,13 +38,15 @@ type stateCheckpoint struct {
}
// NewCheckpointState creates new State for keeping track of memory/pod assignment with checkpoint backend
func NewCheckpointState(stateDir, checkpointName, policyName string) (State, error) {
func NewCheckpointState(logger klog.Logger, stateDir, checkpointName, policyName string) (State, error) {
logger = klog.LoggerWithName(logger, "Memory Manager state checkpoint")
checkpointManager, err := checkpointmanager.NewCheckpointManager(stateDir)
if err != nil {
return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
}
stateCheckpoint := &stateCheckpoint{
cache: NewMemoryState(),
logger: logger,
cache: NewMemoryState(logger),
policyName: policyName,
checkpointManager: checkpointManager,
checkpointName: checkpointName,
@@ -79,7 +82,7 @@ func (sc *stateCheckpoint) restoreState() error {
sc.cache.SetMachineState(checkpoint.MachineState)
sc.cache.SetMemoryAssignments(checkpoint.Entries)
klog.V(2).InfoS("State checkpoint: restored state from checkpoint")
sc.logger.V(2).Info("State checkpoint: restored state from checkpoint")
return nil
}
@@ -93,7 +96,7 @@ func (sc *stateCheckpoint) storeState() error {
err := sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint)
if err != nil {
klog.ErrorS(err, "Could not save checkpoint")
sc.logger.Error(err, "Could not save checkpoint")
return err
}
return nil
@@ -131,7 +134,7 @@ func (sc *stateCheckpoint) SetMachineState(memoryMap NUMANodeMap) {
sc.cache.SetMachineState(memoryMap)
err := sc.storeState()
if err != nil {
klog.ErrorS(err, "Failed to store state to checkpoint")
sc.logger.Error(err, "Failed to store state to checkpoint")
}
}
@@ -143,7 +146,7 @@ func (sc *stateCheckpoint) SetMemoryBlocks(podUID string, containerName string,
sc.cache.SetMemoryBlocks(podUID, containerName, blocks)
err := sc.storeState()
if err != nil {
klog.ErrorS(err, "Failed to store state to checkpoint", "podUID", podUID, "containerName", containerName)
sc.logger.Error(err, "Failed to store state to checkpoint", "podUID", podUID, "containerName", containerName)
}
}
@@ -155,7 +158,7 @@ func (sc *stateCheckpoint) SetMemoryAssignments(assignments ContainerMemoryAssig
sc.cache.SetMemoryAssignments(assignments)
err := sc.storeState()
if err != nil {
klog.ErrorS(err, "Failed to store state to checkpoint")
sc.logger.Error(err, "Failed to store state to checkpoint")
}
}
@@ -167,7 +170,7 @@ func (sc *stateCheckpoint) Delete(podUID string, containerName string) {
sc.cache.Delete(podUID, containerName)
err := sc.storeState()
if err != nil {
klog.ErrorS(err, "Failed to store state to checkpoint", "podUID", podUID, "containerName", containerName)
sc.logger.Error(err, "Failed to store state to checkpoint", "podUID", podUID, "containerName", containerName)
}
}
@@ -179,6 +182,6 @@ func (sc *stateCheckpoint) ClearState() {
sc.cache.ClearState()
err := sc.storeState()
if err != nil {
klog.ErrorS(err, "Failed to store state to checkpoint")
sc.logger.Error(err, "Failed to store state to checkpoint")
}
}
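
Rather than threading a context through every method, the checkpoint-backed state derives a named logger once in its constructor and stores it on the struct, so the Set*/Delete/ClearState methods above keep their existing signatures. A minimal sketch of that construction follows; fileStore and its methods are illustrative names, not part of the PR.

```go
package example

import (
	"os"

	"k8s.io/klog/v2"
)

// fileStore is a hypothetical component in the style of stateCheckpoint:
// it keeps a named logger so every later message is scoped consistently
// without passing a context into each method.
type fileStore struct {
	logger klog.Logger
	path   string
}

func newFileStore(logger klog.Logger, path string) *fileStore {
	return &fileStore{
		logger: klog.LoggerWithName(logger, "file store"),
		path:   path,
	}
}

func (f *fileStore) Save(data []byte) {
	if err := os.WriteFile(f.path, data, 0o600); err != nil {
		f.logger.Error(err, "Failed to store state", "path", f.path)
		return
	}
	f.logger.V(2).Info("State stored", "path", f.path)
}
```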

View File

@@ -26,6 +26,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
testutil "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state/testing"
"k8s.io/kubernetes/test/utils/ktesting"
)
const testingCheckpoint = "memorymanager_checkpoint_test"
@@ -42,6 +43,7 @@ func assertStateEqual(t *testing.T, restoredState, expectedState State) {
}
func TestCheckpointStateRestore(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
testCases := []struct {
description string
checkpointContent string
@@ -131,7 +133,7 @@ func TestCheckpointStateRestore(t *testing.T) {
assert.NoError(t, cpm.CreateCheckpoint(testingCheckpoint, checkpoint), "could not create testing checkpoint")
}
restoredState, err := NewCheckpointState(testingDir, testingCheckpoint, "static")
restoredState, err := NewCheckpointState(logger, testingDir, testingCheckpoint, "static")
if strings.TrimSpace(tc.expectedError) != "" {
assert.Error(t, err)
assert.ErrorContains(t, err, "could not restore state from checkpoint: "+tc.expectedError)
@@ -145,6 +147,7 @@ func TestCheckpointStateRestore(t *testing.T) {
}
func TestCheckpointStateStore(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
expectedState := &stateMemory{
assignments: ContainerMemoryAssignments{
"pod": map[string][]Block{
@@ -184,7 +187,7 @@ func TestCheckpointStateStore(t *testing.T) {
assert.NoError(t, cpm.RemoveCheckpoint(testingCheckpoint), "could not remove testing checkpoint")
cs1, err := NewCheckpointState(testingDir, testingCheckpoint, "static")
cs1, err := NewCheckpointState(logger, testingDir, testingCheckpoint, "static")
assert.NoError(t, err, "could not create testing checkpointState instance")
// set values of cs1 instance so they are stored in checkpoint and can be read by cs2
@@ -192,13 +195,14 @@ func TestCheckpointStateStore(t *testing.T) {
cs1.SetMemoryAssignments(expectedState.assignments)
// restore checkpoint with previously stored values
cs2, err := NewCheckpointState(testingDir, testingCheckpoint, "static")
cs2, err := NewCheckpointState(logger, testingDir, testingCheckpoint, "static")
assert.NoError(t, err, "could not create testing checkpointState instance")
assertStateEqual(t, cs2, expectedState)
}
func TestCheckpointStateHelpers(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
testCases := []struct {
description string
machineState NUMANodeMap
@@ -306,7 +310,7 @@ func TestCheckpointStateHelpers(t *testing.T) {
// ensure there is no previous checkpoint
assert.NoError(t, cpm.RemoveCheckpoint(testingCheckpoint), "could not remove testing checkpoint")
state, err := NewCheckpointState(testingDir, testingCheckpoint, "static")
state, err := NewCheckpointState(logger, testingDir, testingCheckpoint, "static")
assert.NoError(t, err, "could not create testing checkpoint manager")
state.SetMachineState(tc.machineState)
@@ -326,6 +330,7 @@ func TestCheckpointStateHelpers(t *testing.T) {
}
func TestCheckpointStateClear(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
testCases := []struct {
description string
machineState NUMANodeMap
@@ -369,7 +374,7 @@ func TestCheckpointStateClear(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
state, err := NewCheckpointState(testingDir, testingCheckpoint, "static")
state, err := NewCheckpointState(logger, testingDir, testingCheckpoint, "static")
assert.NoError(t, err, "could not create testing checkpoint manager")
state.SetMachineState(tc.machineState)

View File

@@ -24,6 +24,7 @@ import (
type stateMemory struct {
sync.RWMutex
logger klog.Logger
assignments ContainerMemoryAssignments
machineState NUMANodeMap
}
@@ -31,9 +32,10 @@ type stateMemory struct {
var _ State = &stateMemory{}
// NewMemoryState creates new State for keeping track of memory/pod assignment
func NewMemoryState() State {
klog.InfoS("Initializing new in-memory state store")
func NewMemoryState(logger klog.Logger) State {
logger.Info("Initializing new in-memory state store")
return &stateMemory{
logger: logger,
assignments: ContainerMemoryAssignments{},
machineState: NUMANodeMap{},
}
@@ -72,7 +74,7 @@ func (s *stateMemory) SetMachineState(nodeMap NUMANodeMap) {
defer s.Unlock()
s.machineState = nodeMap.Clone()
klog.InfoS("Updated machine memory state")
s.logger.Info("Updated machine memory state")
}
// SetMemoryBlocks stores memory assignments of container
@@ -85,7 +87,7 @@ func (s *stateMemory) SetMemoryBlocks(podUID string, containerName string, block
}
s.assignments[podUID][containerName] = append([]Block{}, blocks...)
klog.InfoS("Updated memory state", "podUID", podUID, "containerName", containerName)
s.logger.Info("Updated memory state", "podUID", podUID, "containerName", containerName)
}
// SetMemoryAssignments sets ContainerMemoryAssignments by using the passed parameter
@@ -94,7 +96,7 @@ func (s *stateMemory) SetMemoryAssignments(assignments ContainerMemoryAssignment
defer s.Unlock()
s.assignments = assignments.Clone()
klog.V(5).InfoS("Updated Memory assignments", "assignments", assignments)
s.logger.V(5).Info("Updated Memory assignments", "assignments", assignments)
}
// Delete deletes corresponding Blocks from ContainerMemoryAssignments
@@ -110,7 +112,7 @@ func (s *stateMemory) Delete(podUID string, containerName string) {
if len(s.assignments[podUID]) == 0 {
delete(s.assignments, podUID)
}
klog.V(2).InfoS("Deleted memory assignment", "podUID", podUID, "containerName", containerName)
s.logger.V(2).Info("Deleted memory assignment", "podUID", podUID, "containerName", containerName)
}
// ClearState clears machineState and ContainerMemoryAssignments
@@ -120,5 +122,5 @@ func (s *stateMemory) ClearState() {
s.machineState = NUMANodeMap{}
s.assignments = make(ContainerMemoryAssignments)
klog.V(2).InfoS("Cleared state")
s.logger.V(2).Info("Cleared state")
}
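
Callers of the in-memory state sit at the boundary between the two styles: code that only holds a context can recover its logger with klog.FromContext and pass it to constructors such as NewMemoryState. The sketch below illustrates that bridge under the same assumption as the earlier examples; store and newStore are hypothetical stand-ins for the real types.

```go
package example

import (
	"context"

	"k8s.io/klog/v2"
)

// store is a hypothetical in-memory store in the style of stateMemory:
// it logs through the logger it was constructed with.
type store struct {
	logger klog.Logger
	data   map[string]string
}

func newStore(logger klog.Logger) *store {
	logger.Info("Initializing new in-memory store")
	return &store{logger: logger, data: map[string]string{}}
}

func (s *store) Set(key, value string) {
	s.data[key] = value
	s.logger.V(5).Info("Updated entry", "key", key)
}

// newStoreFromContext bridges context-based callers to the logger-based
// constructor, mirroring how a caller holding a context builds the state.
func newStoreFromContext(ctx context.Context) *store {
	return newStore(klog.FromContext(ctx))
}
```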