From b8758ac31b76fdc6fb3c01f7177c17ddce090aca Mon Sep 17 00:00:00 2001 From: Swati Sehgal Date: Tue, 11 Mar 2025 13:41:46 +0000 Subject: [PATCH] node: mm-mgr: migrate to contextual logging Signed-off-by: Swati Sehgal --- hack/golangci-hints.yaml | 1 + hack/golangci.yaml | 1 + hack/logcheck.conf | 1 + pkg/kubelet/cm/container_manager_linux.go | 11 +- pkg/kubelet/cm/container_manager_stub.go | 6 +- pkg/kubelet/cm/container_manager_windows.go | 5 +- pkg/kubelet/cm/fake_container_manager.go | 4 +- .../cm/internal_container_lifecycle.go | 6 +- .../cm/internal_container_lifecycle_linux.go | 3 +- .../internal_container_lifecycle_windows.go | 3 +- .../cm/memorymanager/fake_memory_manager.go | 57 ++++++---- .../cm/memorymanager/memory_manager.go | 86 ++++++++------ .../cm/memorymanager/memory_manager_test.go | 69 ++++++------ pkg/kubelet/cm/memorymanager/policy.go | 14 ++- .../cm/memorymanager/policy_best_effort.go | 30 ++--- pkg/kubelet/cm/memorymanager/policy_none.go | 19 ++-- pkg/kubelet/cm/memorymanager/policy_static.go | 106 ++++++++++-------- .../cm/memorymanager/policy_static_test.go | 44 ++++---- .../memorymanager/state/state_checkpoint.go | 21 ++-- .../state/state_checkpoint_test.go | 15 ++- .../cm/memorymanager/state/state_mem.go | 16 +-- 21 files changed, 303 insertions(+), 215 deletions(-) diff --git a/hack/golangci-hints.yaml b/hack/golangci-hints.yaml index 5ade8f9950d..a910ecee54b 100644 --- a/hack/golangci-hints.yaml +++ b/hack/golangci-hints.yaml @@ -204,6 +204,7 @@ linters: contextual k8s.io/kubernetes/pkg/scheduler/.* contextual k8s.io/kubernetes/test/e2e/dra/.* contextual k8s.io/kubernetes/pkg/kubelet/cm/dra/.* + contextual k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/.* contextual k8s.io/kubernetes/pkg/kubelet/pleg/.* contextual k8s.io/kubernetes/pkg/kubelet/clustertrustbundle/.* contextual k8s.io/kubernetes/pkg/kubelet/token/.* diff --git a/hack/golangci.yaml b/hack/golangci.yaml index 0aedb239002..722b69ead88 100644 --- a/hack/golangci.yaml +++ b/hack/golangci.yaml @@ -218,6 +218,7 @@ linters: contextual k8s.io/kubernetes/pkg/scheduler/.* contextual k8s.io/kubernetes/test/e2e/dra/.* contextual k8s.io/kubernetes/pkg/kubelet/cm/dra/.* + contextual k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/.* contextual k8s.io/kubernetes/pkg/kubelet/pleg/.* contextual k8s.io/kubernetes/pkg/kubelet/clustertrustbundle/.* contextual k8s.io/kubernetes/pkg/kubelet/token/.* diff --git a/hack/logcheck.conf b/hack/logcheck.conf index 10bde408ce8..abac0b6f1ba 100644 --- a/hack/logcheck.conf +++ b/hack/logcheck.conf @@ -50,6 +50,7 @@ contextual k8s.io/kubernetes/pkg/controller/.* contextual k8s.io/kubernetes/pkg/scheduler/.* contextual k8s.io/kubernetes/test/e2e/dra/.* contextual k8s.io/kubernetes/pkg/kubelet/cm/dra/.* +contextual k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/.* contextual k8s.io/kubernetes/pkg/kubelet/pleg/.* contextual k8s.io/kubernetes/pkg/kubelet/clustertrustbundle/.* contextual k8s.io/kubernetes/pkg/kubelet/token/.* diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go index 2b51add2895..74636398701 100644 --- a/pkg/kubelet/cm/container_manager_linux.go +++ b/pkg/kubelet/cm/container_manager_linux.go @@ -337,6 +337,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I cm.topologyManager.AddHintProvider(cm.cpuManager) cm.memoryManager, err = memorymanager.NewManager( + context.TODO(), nodeConfig.MemoryManagerPolicy, machineInfo, cm.GetNodeAllocatableReservation(), @@ -591,7 +592,7 @@ func (cm 
*containerManagerImpl) Start(ctx context.Context, node *v1.Node,
 	}
 
 	// Initialize memory manager
-	err = cm.memoryManager.Start(memorymanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap.Clone())
+	err = cm.memoryManager.Start(ctx, memorymanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap.Clone())
 	if err != nil {
 		return fmt.Errorf("start memory manager error: %w", err)
 	}
@@ -954,7 +955,9 @@ func (cm *containerManagerImpl) GetMemory(podUID, containerName string) []*podre
 		return []*podresourcesapi.ContainerMemory{}
 	}
 
-	return containerMemoryFromBlock(cm.memoryManager.GetMemory(podUID, containerName))
+	// This is temporary as part of the migration of the memory manager to contextual logging.
+	// A context will be passed in directly once the container manager is migrated.
+	return containerMemoryFromBlock(cm.memoryManager.GetMemory(context.TODO(), podUID, containerName))
 }
 
 func (cm *containerManagerImpl) GetAllocatableMemory() []*podresourcesapi.ContainerMemory {
@@ -962,7 +965,9 @@ func (cm *containerManagerImpl) GetAllocatableMemory() []*podresourcesapi.Contai
 		return []*podresourcesapi.ContainerMemory{}
 	}
 
-	return containerMemoryFromBlock(cm.memoryManager.GetAllocatableMemory())
+	// This is temporary as part of the migration of the memory manager to contextual logging.
+	// A context will be passed in directly once the container manager is migrated.
+	return containerMemoryFromBlock(cm.memoryManager.GetAllocatableMemory(context.TODO()))
 }
 
 func (cm *containerManagerImpl) GetDynamicResources(pod *v1.Pod, container *v1.Container) []*podresourcesapi.DynamicResource {
diff --git a/pkg/kubelet/cm/container_manager_stub.go b/pkg/kubelet/cm/container_manager_stub.go
index 429ccf0b495..f17e7e77ff6 100644
--- a/pkg/kubelet/cm/container_manager_stub.go
+++ b/pkg/kubelet/cm/container_manager_stub.go
@@ -43,12 +43,14 @@ import (
 type containerManagerStub struct {
 	shouldResetExtendedResourceCapacity bool
 	extendedPluginResources             v1.ResourceList
+	memoryManager                       memorymanager.Manager
 }
 
 var _ ContainerManager = &containerManagerStub{}
 
-func (cm *containerManagerStub) Start(_ context.Context, _ *v1.Node, _ ActivePodsFunc, _ GetNodeFunc, _ config.SourcesReady, _ status.PodStatusProvider, _ internalapi.RuntimeService, _ bool) error {
+func (cm *containerManagerStub) Start(ctx context.Context, _ *v1.Node, _ ActivePodsFunc, _ GetNodeFunc, _ config.SourcesReady, _ status.PodStatusProvider, _ internalapi.RuntimeService, _ bool) error {
 	klog.V(2).InfoS("Starting stub container manager")
+	cm.memoryManager = memorymanager.NewFakeManager(ctx)
 	return nil
 }
 
@@ -125,7 +127,7 @@ func (cm *containerManagerStub) UpdatePluginResources(*schedulerframework.NodeIn
 }
 
 func (cm *containerManagerStub) InternalContainerLifecycle() InternalContainerLifecycle {
-	return &internalContainerLifecycleImpl{cpumanager.NewFakeManager(), memorymanager.NewFakeManager(), topologymanager.NewFakeManager()}
+	return &internalContainerLifecycleImpl{cpumanager.NewFakeManager(), cm.memoryManager, topologymanager.NewFakeManager()}
 }
 
 func (cm *containerManagerStub) GetPodCgroupRoot() string {
diff --git a/pkg/kubelet/cm/container_manager_windows.go b/pkg/kubelet/cm/container_manager_windows.go
index d3e080614fd..41f977168da 100644
--- a/pkg/kubelet/cm/container_manager_windows.go
+++ b/pkg/kubelet/cm/container_manager_windows.go
@@ -103,7 +103,7 @@ func (cm *containerManagerImpl) Start(ctx context.Context, node *v1.Node,
 	}
 
 	// Initialize memory manager
-	err = 
cm.memoryManager.Start(memorymanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap.Clone()) + err = cm.memoryManager.Start(ctx, memorymanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap.Clone()) if err != nil { return fmt.Errorf("start memory manager error: %v", err) } @@ -136,7 +136,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I cm.topologyManager = topologymanager.NewFakeManager() cm.cpuManager = cpumanager.NewFakeManager() - cm.memoryManager = memorymanager.NewFakeManager() + cm.memoryManager = memorymanager.NewFakeManager(context.TODO()) if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WindowsCPUAndMemoryAffinity) { klog.InfoS("Creating topology manager") @@ -168,6 +168,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I klog.InfoS("Creating memory manager") cm.memoryManager, err = memorymanager.NewManager( + context.TODO(), nodeConfig.MemoryManagerPolicy, machineInfo, cm.GetNodeAllocatableReservation(), diff --git a/pkg/kubelet/cm/fake_container_manager.go b/pkg/kubelet/cm/fake_container_manager.go index ed23869fb9e..0c76401885b 100644 --- a/pkg/kubelet/cm/fake_container_manager.go +++ b/pkg/kubelet/cm/fake_container_manager.go @@ -45,6 +45,7 @@ type FakeContainerManager struct { PodContainerManager *FakePodContainerManager shouldResetExtendedResourceCapacity bool nodeConfig NodeConfig + memoryManager memorymanager.Manager } var _ ContainerManager = &FakeContainerManager{} @@ -52,6 +53,7 @@ var _ ContainerManager = &FakeContainerManager{} func NewFakeContainerManager() *FakeContainerManager { return &FakeContainerManager{ PodContainerManager: NewFakePodContainerManager(), + memoryManager: memorymanager.NewFakeManager(context.TODO()), } } @@ -179,7 +181,7 @@ func (cm *FakeContainerManager) InternalContainerLifecycle() InternalContainerLi cm.Lock() defer cm.Unlock() cm.CalledFunctions = append(cm.CalledFunctions, "InternalContainerLifecycle") - return &internalContainerLifecycleImpl{cpumanager.NewFakeManager(), memorymanager.NewFakeManager(), topologymanager.NewFakeManager()} + return &internalContainerLifecycleImpl{cpumanager.NewFakeManager(), cm.memoryManager, topologymanager.NewFakeManager()} } func (cm *FakeContainerManager) GetPodCgroupRoot() string { diff --git a/pkg/kubelet/cm/internal_container_lifecycle.go b/pkg/kubelet/cm/internal_container_lifecycle.go index 5e50fd16662..068022051ac 100644 --- a/pkg/kubelet/cm/internal_container_lifecycle.go +++ b/pkg/kubelet/cm/internal_container_lifecycle.go @@ -17,7 +17,9 @@ limitations under the License. package cm import ( - "k8s.io/api/core/v1" + "context" + + v1 "k8s.io/api/core/v1" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager" @@ -43,7 +45,7 @@ func (i *internalContainerLifecycleImpl) PreStartContainer(pod *v1.Pod, containe } if i.memoryManager != nil { - i.memoryManager.AddContainer(pod, container, containerID) + i.memoryManager.AddContainer(context.TODO(), pod, container, containerID) } i.topologyManager.AddContainer(pod, container, containerID) diff --git a/pkg/kubelet/cm/internal_container_lifecycle_linux.go b/pkg/kubelet/cm/internal_container_lifecycle_linux.go index 0c3bb2e4999..43e2717ccdd 100644 --- a/pkg/kubelet/cm/internal_container_lifecycle_linux.go +++ b/pkg/kubelet/cm/internal_container_lifecycle_linux.go @@ -20,6 +20,7 @@ limitations under the License. 
package cm import ( + "context" "strconv" "strings" @@ -37,7 +38,7 @@ func (i *internalContainerLifecycleImpl) PreCreateContainer(pod *v1.Pod, contain } if i.memoryManager != nil { - numaNodes := i.memoryManager.GetMemoryNUMANodes(pod, container) + numaNodes := i.memoryManager.GetMemoryNUMANodes(context.TODO(), pod, container) if numaNodes.Len() > 0 { var affinity []string for _, numaNode := range sets.List(numaNodes) { diff --git a/pkg/kubelet/cm/internal_container_lifecycle_windows.go b/pkg/kubelet/cm/internal_container_lifecycle_windows.go index 939658d846f..35c7f48f95d 100644 --- a/pkg/kubelet/cm/internal_container_lifecycle_windows.go +++ b/pkg/kubelet/cm/internal_container_lifecycle_windows.go @@ -20,6 +20,7 @@ limitations under the License. package cm import ( + "context" "fmt" v1 "k8s.io/api/core/v1" @@ -47,7 +48,7 @@ func (i *internalContainerLifecycleImpl) PreCreateContainer(pod *v1.Pod, contain var numaNodes sets.Set[int] if i.memoryManager != nil { - numaNodes = i.memoryManager.GetMemoryNUMANodes(pod, container) + numaNodes = i.memoryManager.GetMemoryNUMANodes(context.TODO(), pod, container) } // Gather all CPUs associated with the selected NUMA nodes diff --git a/pkg/kubelet/cm/memorymanager/fake_memory_manager.go b/pkg/kubelet/cm/memorymanager/fake_memory_manager.go index 46874e50050..6664730abcc 100644 --- a/pkg/kubelet/cm/memorymanager/fake_memory_manager.go +++ b/pkg/kubelet/cm/memorymanager/fake_memory_manager.go @@ -17,6 +17,8 @@ limitations under the License. package memorymanager import ( + "context" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" @@ -31,42 +33,53 @@ type fakeManager struct { state state.State } -func (m *fakeManager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error { - klog.InfoS("Start()") +func (m *fakeManager) Start(ctx context.Context, activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error { + logger := klog.FromContext(ctx) + logger.Info("Start()") return nil } -func (m *fakeManager) Policy() Policy { - klog.InfoS("Policy()") - return NewPolicyNone() +func (m *fakeManager) Policy(ctx context.Context) Policy { + logger := klog.FromContext(ctx) + logger.Info("Policy()") + return NewPolicyNone(ctx) } func (m *fakeManager) Allocate(pod *v1.Pod, container *v1.Container) error { - klog.InfoS("Allocate", "pod", klog.KObj(pod), "containerName", container.Name) + ctx := context.TODO() + logger := klog.FromContext(ctx) + logger.Info("Allocate", "pod", klog.KObj(pod), "containerName", container.Name) return nil } -func (m *fakeManager) AddContainer(pod *v1.Pod, container *v1.Container, containerID string) { - klog.InfoS("Add container", "pod", klog.KObj(pod), "containerName", container.Name, "containerID", containerID) +func (m *fakeManager) AddContainer(ctx context.Context, pod *v1.Pod, container *v1.Container, containerID string) { + logger := klog.FromContext(ctx) + logger.Info("Add container", "pod", klog.KObj(pod), "containerName", container.Name, "containerID", containerID) } -func (m *fakeManager) GetMemoryNUMANodes(pod *v1.Pod, container *v1.Container) sets.Set[int] { - klog.InfoS("Get MemoryNUMANodes", "pod", klog.KObj(pod), "containerName", container.Name) +func (m *fakeManager) GetMemoryNUMANodes(ctx context.Context, pod *v1.Pod, container 
*v1.Container) sets.Set[int] { + logger := klog.FromContext(ctx) + logger.Info("Get MemoryNUMANodes", "pod", klog.KObj(pod), "containerName", container.Name) return nil } -func (m *fakeManager) RemoveContainer(containerID string) error { - klog.InfoS("RemoveContainer", "containerID", containerID) +func (m *fakeManager) RemoveContainer(ctx context.Context, containerID string) error { + logger := klog.FromContext(ctx) + logger.Info("RemoveContainer", "containerID", containerID) return nil } func (m *fakeManager) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint { - klog.InfoS("Get Topology Hints", "pod", klog.KObj(pod), "containerName", container.Name) + ctx := context.TODO() + logger := klog.FromContext(ctx) + logger.Info("Get Topology Hints", "pod", klog.KObj(pod), "containerName", container.Name) return map[string][]topologymanager.TopologyHint{} } func (m *fakeManager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint { - klog.InfoS("Get Pod Topology Hints", "pod", klog.KObj(pod)) + ctx := context.TODO() + logger := klog.FromContext(ctx) + logger.Info("Get Pod Topology Hints", "pod", klog.KObj(pod)) return map[string][]topologymanager.TopologyHint{} } @@ -75,20 +88,24 @@ func (m *fakeManager) State() state.Reader { } // GetAllocatableMemory returns the amount of allocatable memory for each NUMA node -func (m *fakeManager) GetAllocatableMemory() []state.Block { - klog.InfoS("Get Allocatable Memory") +func (m *fakeManager) GetAllocatableMemory(ctx context.Context) []state.Block { + logger := klog.FromContext(ctx) + logger.Info("Get Allocatable Memory") return []state.Block{} } // GetMemory returns the memory allocated by a container from NUMA nodes -func (m *fakeManager) GetMemory(podUID, containerName string) []state.Block { - klog.InfoS("Get Memory", "podUID", podUID, "containerName", containerName) +func (m *fakeManager) GetMemory(ctx context.Context, podUID, containerName string) []state.Block { + logger := klog.LoggerWithValues(klog.FromContext(ctx), "podUID", podUID, "containerName", containerName) + logger.Info("Get Memory") return []state.Block{} } // NewFakeManager creates empty/fake memory manager -func NewFakeManager() Manager { +func NewFakeManager(ctx context.Context) Manager { + logger := klog.LoggerWithName(klog.FromContext(ctx), "memory-mgr.fake") return &fakeManager{ - state: state.NewMemoryState(), + // logger: logger, + state: state.NewMemoryState(logger), } } diff --git a/pkg/kubelet/cm/memorymanager/memory_manager.go b/pkg/kubelet/cm/memorymanager/memory_manager.go index 0e42d9eb86a..572f58599a0 100644 --- a/pkg/kubelet/cm/memorymanager/memory_manager.go +++ b/pkg/kubelet/cm/memorymanager/memory_manager.go @@ -57,11 +57,11 @@ func (s *sourcesReadyStub) AllReady() bool { return true } // Manager interface provides methods for Kubelet to manage pod memory. type Manager interface { // Start is called during Kubelet initialization. 
- Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error + Start(ctx context.Context, activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error // AddContainer adds the mapping between container ID to pod UID and the container name // The mapping used to remove the memory allocation during the container removal - AddContainer(p *v1.Pod, c *v1.Container, containerID string) + AddContainer(ctx context.Context, p *v1.Pod, c *v1.Container, containerID string) // Allocate is called to pre-allocate memory resources during Pod admission. // This must be called at some point prior to the AddContainer() call for a container, e.g. at pod admission time. @@ -69,7 +69,7 @@ type Manager interface { // RemoveContainer is called after Kubelet decides to kill or delete a // container. After this call, any memory allocated to the container is freed. - RemoveContainer(containerID string) error + RemoveContainer(ctx context.Context, containerID string) error // State returns a read-only interface to the internal memory manager state. State() state.Reader @@ -85,13 +85,13 @@ type Manager interface { GetPodTopologyHints(*v1.Pod) map[string][]topologymanager.TopologyHint // GetMemoryNUMANodes provides NUMA nodes that are used to allocate the container memory - GetMemoryNUMANodes(pod *v1.Pod, container *v1.Container) sets.Set[int] + GetMemoryNUMANodes(ctx context.Context, pod *v1.Pod, container *v1.Container) sets.Set[int] // GetAllocatableMemory returns the amount of allocatable memory for each NUMA node - GetAllocatableMemory() []state.Block + GetAllocatableMemory(ctx context.Context) []state.Block // GetMemory returns the memory allocated by a container from NUMA nodes - GetMemory(podUID, containerName string) []state.Block + GetMemory(ctx context.Context, podUID, containerName string) []state.Block } type manager struct { @@ -132,13 +132,13 @@ type manager struct { var _ Manager = &manager{} // NewManager returns new instance of the memory manager -func NewManager(policyName string, machineInfo *cadvisorapi.MachineInfo, nodeAllocatableReservation v1.ResourceList, reservedMemory []kubeletconfig.MemoryReservation, stateFileDirectory string, affinity topologymanager.Store) (Manager, error) { +func NewManager(ctx context.Context, policyName string, machineInfo *cadvisorapi.MachineInfo, nodeAllocatableReservation v1.ResourceList, reservedMemory []kubeletconfig.MemoryReservation, stateFileDirectory string, affinity topologymanager.Store) (Manager, error) { var policy Policy switch policyType(policyName) { case policyTypeNone: - policy = NewPolicyNone() + policy = NewPolicyNone(ctx) case PolicyTypeStatic: if runtime.GOOS == "windows" { @@ -150,7 +150,7 @@ func NewManager(policyName string, machineInfo *cadvisorapi.MachineInfo, nodeAll return nil, err } - policy, err = NewPolicyStatic(machineInfo, systemReserved, affinity) + policy, err = NewPolicyStatic(ctx, machineInfo, systemReserved, affinity) if err != nil { return nil, err } @@ -161,7 +161,7 @@ func NewManager(policyName string, machineInfo *cadvisorapi.MachineInfo, nodeAll if err != nil { return nil, err } - policy, err = NewPolicyBestEffort(machineInfo, systemReserved, affinity) + policy, err = NewPolicyBestEffort(ctx, machineInfo, systemReserved, affinity) if err != nil { return nil, err } 
@@ -182,35 +182,36 @@ func NewManager(policyName string, machineInfo *cadvisorapi.MachineInfo, nodeAll } // Start starts the memory manager under the kubelet and calls policy start -func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error { - klog.InfoS("Starting memorymanager", "policy", m.policy.Name()) +func (m *manager) Start(ctx context.Context, activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error { + logger := klog.FromContext(ctx) + logger.Info("Starting memorymanager", "policy", m.policy.Name()) m.sourcesReady = sourcesReady m.activePods = activePods m.podStatusProvider = podStatusProvider m.containerRuntime = containerRuntime m.containerMap = initialContainers - stateImpl, err := state.NewCheckpointState(m.stateFileDirectory, memoryManagerStateFileName, m.policy.Name()) + stateImpl, err := state.NewCheckpointState(logger, m.stateFileDirectory, memoryManagerStateFileName, m.policy.Name()) if err != nil { - klog.ErrorS(err, "Could not initialize checkpoint manager, please drain node and remove policy state file") + logger.Error(err, "Could not initialize checkpoint manager, please drain node and remove policy state file") return err } m.state = stateImpl - err = m.policy.Start(m.state) + err = m.policy.Start(ctx, m.state) if err != nil { - klog.ErrorS(err, "Policy start error") + logger.Error(err, "Policy start error") return err } - m.allocatableMemory = m.policy.GetAllocatableMemory(m.state) + m.allocatableMemory = m.policy.GetAllocatableMemory(ctx, m.state) - klog.V(4).InfoS("memorymanager started", "policy", m.policy.Name()) + logger.V(4).Info("memorymanager started", "policy", m.policy.Name()) return nil } // AddContainer saves the value of requested memory for the guaranteed pod under the state and set memory affinity according to the topolgy manager -func (m *manager) AddContainer(pod *v1.Pod, container *v1.Container, containerID string) { +func (m *manager) AddContainer(ctx context.Context, pod *v1.Pod, container *v1.Container, containerID string) { m.Lock() defer m.Unlock() @@ -238,7 +239,9 @@ func (m *manager) AddContainer(pod *v1.Pod, container *v1.Container, containerID } // GetMemoryNUMANodes provides NUMA nodes that used to allocate the container memory -func (m *manager) GetMemoryNUMANodes(pod *v1.Pod, container *v1.Container) sets.Set[int] { +func (m *manager) GetMemoryNUMANodes(ctx context.Context, pod *v1.Pod, container *v1.Container) sets.Set[int] { + logger := klog.LoggerWithValues(klog.FromContext(ctx), "pod", klog.KObj(pod), "containerName", container.Name) + // Get NUMA node affinity of blocks assigned to the container during Allocate() numaNodes := sets.New[int]() for _, block := range m.state.GetMemoryBlocks(string(pod.UID), container.Name) { @@ -249,39 +252,44 @@ func (m *manager) GetMemoryNUMANodes(pod *v1.Pod, container *v1.Container) sets. 
} if numaNodes.Len() == 0 { - klog.V(5).InfoS("NUMA nodes not available for allocation", "pod", klog.KObj(pod), "containerName", container.Name) + logger.V(5).Info("NUMA nodes not available for allocation") return nil } - klog.InfoS("Memory affinity", "pod", klog.KObj(pod), "containerName", container.Name, "numaNodes", numaNodes) + logger.Info("Memory affinity", "numaNodes", numaNodes) return numaNodes } // Allocate is called to pre-allocate memory resources during Pod admission. func (m *manager) Allocate(pod *v1.Pod, container *v1.Container) error { // Garbage collect any stranded resources before allocation - m.removeStaleState() + ctx := context.TODO() + logger := klog.FromContext(ctx) + + m.removeStaleState(logger) m.Lock() defer m.Unlock() // Call down into the policy to assign this container memory if required. - if err := m.policy.Allocate(m.state, pod, container); err != nil { - klog.ErrorS(err, "Allocate error", "pod", klog.KObj(pod), "containerName", container.Name) + if err := m.policy.Allocate(ctx, m.state, pod, container); err != nil { + logger.Error(err, "Allocate error", "pod", klog.KObj(pod), "containerName", container.Name) return err } return nil } // RemoveContainer removes the container from the state -func (m *manager) RemoveContainer(containerID string) error { +func (m *manager) RemoveContainer(ctx context.Context, containerID string) error { + logger := klog.LoggerWithValues(klog.FromContext(ctx), "containerID", containerID) + m.Lock() defer m.Unlock() // if error appears it means container entry already does not exist under the container map podUID, containerName, err := m.containerMap.GetContainerRef(containerID) if err != nil { - klog.ErrorS(err, "Failed to get container from container map", "containerID", containerID) + logger.Error(err, "Failed to get container from container map") return nil } @@ -297,22 +305,26 @@ func (m *manager) State() state.Reader { // GetPodTopologyHints returns the topology hints for the topology manager func (m *manager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint { + // Use context.TODO() because we currently do not have a proper context to pass in. + // This should be replaced with an appropriate context when refactoring this function to accept a context parameter. + ctx := context.TODO() // Garbage collect any stranded resources before providing TopologyHints - m.removeStaleState() + m.removeStaleState(klog.FromContext(ctx)) // Delegate to active policy - return m.policy.GetPodTopologyHints(m.state, pod) + return m.policy.GetPodTopologyHints(context.TODO(), m.state, pod) } // GetTopologyHints returns the topology hints for the topology manager func (m *manager) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint { // Garbage collect any stranded resources before providing TopologyHints - m.removeStaleState() + m.removeStaleState(klog.TODO()) // Delegate to active policy - return m.policy.GetTopologyHints(m.state, pod, container) + return m.policy.GetTopologyHints(context.TODO(), m.state, pod, container) } // TODO: move the method to the upper level, to re-use it under the CPU and memory managers -func (m *manager) removeStaleState() { +func (m *manager) removeStaleState(logger klog.Logger) { + // Only once all sources are ready do we attempt to remove any stale state. // This ensures that the call to `m.activePods()` below will succeed with // the actual active pods list. 
@@ -345,7 +357,7 @@ func (m *manager) removeStaleState() { for podUID := range assignments { for containerName := range assignments[podUID] { if _, ok := activeContainers[podUID][containerName]; !ok { - klog.V(2).InfoS("RemoveStaleState removing state", "podUID", podUID, "containerName", containerName) + logger.V(2).Info("RemoveStaleState removing state", "podUID", podUID, "containerName", containerName) m.policyRemoveContainerByRef(podUID, containerName) } } @@ -353,14 +365,14 @@ func (m *manager) removeStaleState() { m.containerMap.Visit(func(podUID, containerName, containerID string) { if _, ok := activeContainers[podUID][containerName]; !ok { - klog.V(2).InfoS("RemoveStaleState removing state", "podUID", podUID, "containerName", containerName) + logger.V(2).Info("RemoveStaleState removing state", "podUID", podUID, "containerName", containerName) m.policyRemoveContainerByRef(podUID, containerName) } }) } func (m *manager) policyRemoveContainerByRef(podUID string, containerName string) { - m.policy.RemoveContainer(m.state, podUID, containerName) + m.policy.RemoveContainer(context.TODO(), m.state, podUID, containerName) m.containerMap.RemoveByContainerRef(podUID, containerName) } @@ -458,11 +470,11 @@ func getSystemReservedMemory(machineInfo *cadvisorapi.MachineInfo, nodeAllocatab } // GetAllocatableMemory returns the amount of allocatable memory for each NUMA node -func (m *manager) GetAllocatableMemory() []state.Block { +func (m *manager) GetAllocatableMemory(ctx context.Context) []state.Block { return m.allocatableMemory } // GetMemory returns the memory allocated by a container from NUMA nodes -func (m *manager) GetMemory(podUID, containerName string) []state.Block { +func (m *manager) GetMemory(ctx context.Context, podUID, containerName string) []state.Block { return m.state.GetMemoryBlocks(podUID, containerName) } diff --git a/pkg/kubelet/cm/memorymanager/memory_manager_test.go b/pkg/kubelet/cm/memorymanager/memory_manager_test.go index bb7d970cf3c..8bcfa3fd3d7 100644 --- a/pkg/kubelet/cm/memorymanager/memory_manager_test.go +++ b/pkg/kubelet/cm/memorymanager/memory_manager_test.go @@ -25,8 +25,6 @@ import ( "strings" "testing" - "k8s.io/klog/v2" - cadvisorapi "github.com/google/cadvisor/info/v1" "github.com/stretchr/testify/assert" @@ -39,6 +37,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cm/containermap" "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" + "k8s.io/kubernetes/test/utils/ktesting" ) const ( @@ -72,17 +71,17 @@ type testMemoryManager struct { activePods []*v1.Pod } -func returnPolicyByName(testCase testMemoryManager) Policy { +func returnPolicyByName(ctx context.Context, testCase testMemoryManager) Policy { switch testCase.policyName { case policyTypeMock: return &mockPolicy{ err: fmt.Errorf("fake reg error"), } case PolicyTypeStatic: - policy, _ := NewPolicyStatic(&testCase.machineInfo, testCase.reserved, topologymanager.NewFakeManager()) + policy, _ := NewPolicyStatic(ctx, &testCase.machineInfo, testCase.reserved, topologymanager.NewFakeManager()) return policy case policyTypeNone: - return NewPolicyNone() + return NewPolicyNone(ctx) } return nil } @@ -95,27 +94,27 @@ func (p *mockPolicy) Name() string { return string(policyTypeMock) } -func (p *mockPolicy) Start(s state.State) error { +func (p *mockPolicy) Start(context.Context, state.State) error { return p.err } -func (p *mockPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error { +func (p *mockPolicy) Allocate(context.Context, 
state.State, *v1.Pod, *v1.Container) error { return p.err } -func (p *mockPolicy) RemoveContainer(s state.State, podUID string, containerName string) { +func (p *mockPolicy) RemoveContainer(context.Context, state.State, string, string) { } -func (p *mockPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint { +func (p *mockPolicy) GetTopologyHints(context.Context, state.State, *v1.Pod, *v1.Container) map[string][]topologymanager.TopologyHint { return nil } -func (p *mockPolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint { +func (p *mockPolicy) GetPodTopologyHints(context.Context, state.State, *v1.Pod) map[string][]topologymanager.TopologyHint { return nil } // GetAllocatableMemory returns the amount of allocatable memory for each NUMA node -func (p *mockPolicy) GetAllocatableMemory(s state.State) []state.Block { +func (p *mockPolicy) GetAllocatableMemory(context.Context, state.State) []state.Block { return []state.Block{} } @@ -485,6 +484,7 @@ func TestGetSystemReservedMemory(t *testing.T) { } func TestRemoveStaleState(t *testing.T) { + logger, tCtx := ktesting.NewTestContext(t) machineInfo := returnMachineInfo() testCases := []testMemoryManager{ { @@ -896,8 +896,8 @@ func TestRemoveStaleState(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.description, func(t *testing.T) { mgr := &manager{ - policy: returnPolicyByName(testCase), - state: state.NewMemoryState(), + policy: returnPolicyByName(tCtx, testCase), + state: state.NewMemoryState(logger), containerMap: containermap.NewContainerMap(), containerRuntime: mockRuntimeService{ err: nil, @@ -909,13 +909,13 @@ func TestRemoveStaleState(t *testing.T) { mgr.state.SetMemoryAssignments(testCase.assignments) mgr.state.SetMachineState(testCase.machineState) - mgr.removeStaleState() + mgr.removeStaleState(logger) if !areContainerMemoryAssignmentsEqual(t, mgr.state.GetMemoryAssignments(), testCase.expectedAssignments) { t.Errorf("Memory Manager removeStaleState() error, expected assignments %v, but got: %v", testCase.expectedAssignments, mgr.state.GetMemoryAssignments()) } - if !areMachineStatesEqual(mgr.state.GetMachineState(), testCase.expectedMachineState) { + if !areMachineStatesEqual(logger, mgr.state.GetMachineState(), testCase.expectedMachineState) { t.Fatalf("The actual machine state: %v is different from the expected one: %v", mgr.state.GetMachineState(), testCase.expectedMachineState) } }) @@ -924,6 +924,7 @@ func TestRemoveStaleState(t *testing.T) { } func TestAddContainer(t *testing.T) { + logger, tCtx := ktesting.NewTestContext(t) machineInfo := returnMachineInfo() reserved := systemReservedMemory{ 0: map[v1.ResourceName]uint64{ @@ -1388,8 +1389,8 @@ func TestAddContainer(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.description, func(t *testing.T) { mgr := &manager{ - policy: returnPolicyByName(testCase), - state: state.NewMemoryState(), + policy: returnPolicyByName(tCtx, testCase), + state: state.NewMemoryState(logger), containerMap: containermap.NewContainerMap(), containerRuntime: mockRuntimeService{ err: testCase.updateError, @@ -1410,14 +1411,14 @@ func TestAddContainer(t *testing.T) { t.Errorf("Memory Manager Allocate() error (%v), expected error: %v, but got: %v", testCase.description, testCase.expectedAllocateError, err) } - mgr.AddContainer(pod, container, "fakeID") + mgr.AddContainer(tCtx, pod, container, "fakeID") _, _, err = mgr.containerMap.GetContainerRef("fakeID") if 
!reflect.DeepEqual(err, testCase.expectedAddContainerError) { t.Errorf("Memory Manager AddContainer() error (%v), expected error: %v, but got: %v", testCase.description, testCase.expectedAddContainerError, err) } - if !areMachineStatesEqual(mgr.state.GetMachineState(), testCase.expectedMachineState) { + if !areMachineStatesEqual(logger, mgr.state.GetMachineState(), testCase.expectedMachineState) { t.Errorf("[test] %+v", mgr.state.GetMemoryAssignments()) t.Fatalf("The actual machine state: %v is different from the expected one: %v", mgr.state.GetMachineState(), testCase.expectedMachineState) } @@ -1427,6 +1428,7 @@ func TestAddContainer(t *testing.T) { } func TestRemoveContainer(t *testing.T) { + logger, tCtx := ktesting.NewTestContext(t) machineInfo := returnMachineInfo() reserved := systemReservedMemory{ 0: map[v1.ResourceName]uint64{ @@ -1864,8 +1866,8 @@ func TestRemoveContainer(t *testing.T) { iniContainerMap.Add("fakePod1", "fakeContainer1", "fakeID1") iniContainerMap.Add("fakePod1", "fakeContainer2", "fakeID2") mgr := &manager{ - policy: returnPolicyByName(testCase), - state: state.NewMemoryState(), + policy: returnPolicyByName(tCtx, testCase), + state: state.NewMemoryState(logger), containerMap: iniContainerMap, containerRuntime: mockRuntimeService{ err: testCase.expectedError, @@ -1877,7 +1879,7 @@ func TestRemoveContainer(t *testing.T) { mgr.state.SetMemoryAssignments(testCase.assignments) mgr.state.SetMachineState(testCase.machineState) - err := mgr.RemoveContainer(testCase.removeContainerID) + err := mgr.RemoveContainer(tCtx, testCase.removeContainerID) if !reflect.DeepEqual(err, testCase.expectedError) { t.Errorf("Memory Manager RemoveContainer() error (%v), expected error: %v, but got: %v", testCase.description, testCase.expectedError, err) @@ -1888,7 +1890,7 @@ func TestRemoveContainer(t *testing.T) { testCase.expectedAssignments, mgr.state.GetMemoryAssignments(), testCase.expectedAssignments) } - if !areMachineStatesEqual(mgr.state.GetMachineState(), testCase.expectedMachineState) { + if !areMachineStatesEqual(logger, mgr.state.GetMachineState(), testCase.expectedMachineState) { t.Errorf("[test] %+v", mgr.state.GetMemoryAssignments()) t.Errorf("[test] %+v, %+v", mgr.state.GetMachineState()[0].MemoryMap["memory"], mgr.state.GetMachineState()[1].MemoryMap["memory"]) t.Fatalf("The actual machine state: %v is different from the expected one: %v", mgr.state.GetMachineState(), testCase.expectedMachineState) @@ -1905,6 +1907,7 @@ func getPolicyNameForOs() policyType { } func TestNewManager(t *testing.T) { + tCtx := ktesting.Init(t) machineInfo := returnMachineInfo() expectedReserved := systemReservedMemory{ 0: map[v1.ResourceName]uint64{ @@ -1996,7 +1999,7 @@ func TestNewManager(t *testing.T) { } defer os.RemoveAll(stateFileDirectory) - mgr, err := NewManager(string(testCase.policyName), &testCase.machineInfo, testCase.nodeAllocatableReservation, testCase.systemReservedMemory, stateFileDirectory, testCase.affinity) + mgr, err := NewManager(tCtx, string(testCase.policyName), &testCase.machineInfo, testCase.nodeAllocatableReservation, testCase.systemReservedMemory, stateFileDirectory, testCase.affinity) if !reflect.DeepEqual(err, testCase.expectedError) { t.Errorf("Could not create the Memory Manager. 
Expected error: '%v', but got: '%v'", @@ -2026,6 +2029,7 @@ func TestNewManager(t *testing.T) { } func TestGetTopologyHints(t *testing.T) { + logger, tCtx := ktesting.NewTestContext(t) testCases := []testMemoryManager{ { description: "Successful hint generation", @@ -2146,8 +2150,8 @@ func TestGetTopologyHints(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.description, func(t *testing.T) { mgr := &manager{ - policy: returnPolicyByName(testCase), - state: state.NewMemoryState(), + policy: returnPolicyByName(tCtx, testCase), + state: state.NewMemoryState(logger), containerMap: containermap.NewContainerMap(), containerRuntime: mockRuntimeService{ err: nil, @@ -2171,6 +2175,7 @@ func TestGetTopologyHints(t *testing.T) { } func TestAllocateAndAddPodWithInitContainers(t *testing.T) { + logger, tCtx := ktesting.NewTestContext(t) testCases := []testMemoryManager{ { description: "should remove init containers from the state file, once app container started", @@ -2321,10 +2326,10 @@ func TestAllocateAndAddPodWithInitContainers(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.description, func(t *testing.T) { - klog.InfoS("TestAllocateAndAddPodWithInitContainers", "name", testCase.description) + logger.Info("TestAllocateAndAddPodWithInitContainers", "name", testCase.description) mgr := &manager{ - policy: returnPolicyByName(testCase), - state: state.NewMemoryState(), + policy: returnPolicyByName(tCtx, testCase), + state: state.NewMemoryState(logger), containerMap: containermap.NewContainerMap(), containerRuntime: mockRuntimeService{ err: nil, @@ -2354,12 +2359,12 @@ func TestAllocateAndAddPodWithInitContainers(t *testing.T) { // Calls AddContainer for init containers for i, initContainer := range testCase.podAllocate.Spec.InitContainers { - mgr.AddContainer(testCase.podAllocate, &testCase.podAllocate.Spec.InitContainers[i], initContainer.Name) + mgr.AddContainer(tCtx, testCase.podAllocate, &testCase.podAllocate.Spec.InitContainers[i], initContainer.Name) } // Calls AddContainer for apps containers for i, appContainer := range testCase.podAllocate.Spec.Containers { - mgr.AddContainer(testCase.podAllocate, &testCase.podAllocate.Spec.Containers[i], appContainer.Name) + mgr.AddContainer(tCtx, testCase.podAllocate, &testCase.podAllocate.Spec.Containers[i], appContainer.Name) } assignments := mgr.state.GetMemoryAssignments() @@ -2368,7 +2373,7 @@ func TestAllocateAndAddPodWithInitContainers(t *testing.T) { } machineState := mgr.state.GetMachineState() - if !areMachineStatesEqual(machineState, testCase.expectedMachineState) { + if !areMachineStatesEqual(logger, machineState, testCase.expectedMachineState) { t.Fatalf("The actual machine state %v is different from the expected %v", machineState, testCase.expectedMachineState) } }) diff --git a/pkg/kubelet/cm/memorymanager/policy.go b/pkg/kubelet/cm/memorymanager/policy.go index a65a90dca98..69e79cdb7f2 100644 --- a/pkg/kubelet/cm/memorymanager/policy.go +++ b/pkg/kubelet/cm/memorymanager/policy.go @@ -17,6 +17,8 @@ limitations under the License. package memorymanager import ( + "context" + v1 "k8s.io/api/core/v1" "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" @@ -28,19 +30,19 @@ type policyType string // Policy implements logic for pod container to a memory assignment. 
type Policy interface { Name() string - Start(s state.State) error + Start(ctx context.Context, s state.State) error // Allocate call is idempotent - Allocate(s state.State, pod *v1.Pod, container *v1.Container) error + Allocate(ctx context.Context, s state.State, pod *v1.Pod, container *v1.Container) error // RemoveContainer call is idempotent - RemoveContainer(s state.State, podUID string, containerName string) + RemoveContainer(ctx context.Context, s state.State, podUID string, containerName string) // GetTopologyHints implements the topologymanager.HintProvider Interface // and is consulted to achieve NUMA aware resource alignment among this // and other resource controllers. - GetTopologyHints(s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint + GetTopologyHints(ctx context.Context, s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint // GetPodTopologyHints implements the topologymanager.HintProvider Interface // and is consulted to achieve NUMA aware resource alignment among this // and other resource controllers. - GetPodTopologyHints(s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint + GetPodTopologyHints(ctx context.Context, s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint // GetAllocatableMemory returns the amount of allocatable memory for each NUMA node - GetAllocatableMemory(s state.State) []state.Block + GetAllocatableMemory(ctx context.Context, s state.State) []state.Block } diff --git a/pkg/kubelet/cm/memorymanager/policy_best_effort.go b/pkg/kubelet/cm/memorymanager/policy_best_effort.go index 2a2eabc8263..76ca2c82608 100644 --- a/pkg/kubelet/cm/memorymanager/policy_best_effort.go +++ b/pkg/kubelet/cm/memorymanager/policy_best_effort.go @@ -17,6 +17,8 @@ limitations under the License. 
package memorymanager import ( + "context" + cadvisorapi "github.com/google/cadvisor/info/v1" v1 "k8s.io/api/core/v1" @@ -39,8 +41,8 @@ type bestEffortPolicy struct { var _ Policy = &bestEffortPolicy{} -func NewPolicyBestEffort(machineInfo *cadvisorapi.MachineInfo, reserved systemReservedMemory, affinity topologymanager.Store) (Policy, error) { - p, err := NewPolicyStatic(machineInfo, reserved, affinity) +func NewPolicyBestEffort(ctx context.Context, machineInfo *cadvisorapi.MachineInfo, reserved systemReservedMemory, affinity topologymanager.Store) (Policy, error) { + p, err := NewPolicyStatic(ctx, machineInfo, reserved, affinity) if err != nil { return nil, err @@ -55,26 +57,26 @@ func (p *bestEffortPolicy) Name() string { return string(policyTypeBestEffort) } -func (p *bestEffortPolicy) Start(s state.State) error { - return p.static.Start(s) +func (p *bestEffortPolicy) Start(ctx context.Context, s state.State) error { + return p.static.Start(ctx, s) } -func (p *bestEffortPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) (rerr error) { - return p.static.Allocate(s, pod, container) +func (p *bestEffortPolicy) Allocate(ctx context.Context, s state.State, pod *v1.Pod, container *v1.Container) (rerr error) { + return p.static.Allocate(ctx, s, pod, container) } -func (p *bestEffortPolicy) RemoveContainer(s state.State, podUID string, containerName string) { - p.static.RemoveContainer(s, podUID, containerName) +func (p *bestEffortPolicy) RemoveContainer(ctx context.Context, s state.State, podUID string, containerName string) { + p.static.RemoveContainer(ctx, s, podUID, containerName) } -func (p *bestEffortPolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint { - return p.static.GetPodTopologyHints(s, pod) +func (p *bestEffortPolicy) GetPodTopologyHints(ctx context.Context, s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint { + return p.static.GetPodTopologyHints(ctx, s, pod) } -func (p *bestEffortPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint { - return p.static.GetTopologyHints(s, pod, container) +func (p *bestEffortPolicy) GetTopologyHints(ctx context.Context, s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint { + return p.static.GetTopologyHints(ctx, s, pod, container) } -func (p *bestEffortPolicy) GetAllocatableMemory(s state.State) []state.Block { - return p.static.GetAllocatableMemory(s) +func (p *bestEffortPolicy) GetAllocatableMemory(ctx context.Context, s state.State) []state.Block { + return p.static.GetAllocatableMemory(ctx, s) } diff --git a/pkg/kubelet/cm/memorymanager/policy_none.go b/pkg/kubelet/cm/memorymanager/policy_none.go index 5bb52db7ed4..5b821f7d234 100644 --- a/pkg/kubelet/cm/memorymanager/policy_none.go +++ b/pkg/kubelet/cm/memorymanager/policy_none.go @@ -17,7 +17,10 @@ limitations under the License. 
package memorymanager import ( + "context" + v1 "k8s.io/api/core/v1" + "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" ) @@ -31,7 +34,7 @@ type none struct{} var _ Policy = &none{} // NewPolicyNone returns new none policy instance -func NewPolicyNone() Policy { +func NewPolicyNone(ctx context.Context) Policy { return &none{} } @@ -39,34 +42,36 @@ func (p *none) Name() string { return string(policyTypeNone) } -func (p *none) Start(s state.State) error { +func (p *none) Start(ctx context.Context, s state.State) error { + logger := klog.FromContext(ctx) + logger.Info("Start") return nil } // Allocate call is idempotent -func (p *none) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error { +func (p *none) Allocate(_ context.Context, s state.State, pod *v1.Pod, container *v1.Container) error { return nil } // RemoveContainer call is idempotent -func (p *none) RemoveContainer(s state.State, podUID string, containerName string) { +func (p *none) RemoveContainer(_ context.Context, s state.State, podUID string, containerName string) { } // GetTopologyHints implements the topologymanager.HintProvider Interface // and is consulted to achieve NUMA aware resource alignment among this // and other resource controllers. -func (p *none) GetTopologyHints(s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint { +func (p *none) GetTopologyHints(_ context.Context, s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint { return nil } // GetPodTopologyHints implements the topologymanager.HintProvider Interface // and is consulted to achieve NUMA aware resource alignment among this // and other resource controllers. -func (p *none) GetPodTopologyHints(s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint { +func (p *none) GetPodTopologyHints(_ context.Context, s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint { return nil } // GetAllocatableMemory returns the amount of allocatable memory for each NUMA node -func (p *none) GetAllocatableMemory(s state.State) []state.Block { +func (p *none) GetAllocatableMemory(_ context.Context, s state.State) []state.Block { return []state.Block{} } diff --git a/pkg/kubelet/cm/memorymanager/policy_static.go b/pkg/kubelet/cm/memorymanager/policy_static.go index 895a19ad582..ce85610b65d 100644 --- a/pkg/kubelet/cm/memorymanager/policy_static.go +++ b/pkg/kubelet/cm/memorymanager/policy_static.go @@ -17,9 +17,11 @@ limitations under the License. 
package memorymanager import ( + "context" "fmt" "sort" + "github.com/go-logr/logr" cadvisorapi "github.com/google/cadvisor/info/v1" v1 "k8s.io/api/core/v1" @@ -59,7 +61,7 @@ type staticPolicy struct { var _ Policy = &staticPolicy{} // NewPolicyStatic returns new static policy instance -func NewPolicyStatic(machineInfo *cadvisorapi.MachineInfo, reserved systemReservedMemory, affinity topologymanager.Store) (Policy, error) { +func NewPolicyStatic(ctx context.Context, machineInfo *cadvisorapi.MachineInfo, reserved systemReservedMemory, affinity topologymanager.Store) (Policy, error) { var totalSystemReserved uint64 for _, node := range reserved { if _, ok := node[v1.ResourceMemory]; !ok { @@ -85,25 +87,28 @@ func (p *staticPolicy) Name() string { return string(PolicyTypeStatic) } -func (p *staticPolicy) Start(s state.State) error { - if err := p.validateState(s); err != nil { - klog.ErrorS(err, "Invalid state, please drain node and remove policy state file") +func (p *staticPolicy) Start(ctx context.Context, s state.State) error { + logger := klog.FromContext(ctx) + if err := p.validateState(logger, s); err != nil { + logger.Error(err, "Invalid state, please drain node and remove policy state file") return err } return nil } // Allocate call is idempotent -func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) (rerr error) { +func (p *staticPolicy) Allocate(ctx context.Context, s state.State, pod *v1.Pod, container *v1.Container) (rerr error) { // allocate the memory only for guaranteed pods + logger := klog.FromContext(ctx) + logger = klog.LoggerWithValues(logger, "pod", klog.KObj(pod), "containerName", container.Name) qos := v1qos.GetPodQOS(pod) if qos != v1.PodQOSGuaranteed { - klog.V(5).InfoS("Exclusive memory allocation skipped, pod QoS is not guaranteed", "pod", klog.KObj(pod), "containerName", container.Name, "qos", qos) + logger.V(5).Info("Exclusive memory allocation skipped, pod QoS is not guaranteed", "qos", qos) return nil } podUID := string(pod.UID) - klog.InfoS("Allocate", "pod", klog.KObj(pod), "containerName", container.Name) + logger.Info("Allocate") // container belongs in an exclusively allocated pool metrics.MemoryManagerPinningRequestTotal.Inc() defer func() { @@ -114,13 +119,13 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai if blocks := s.GetMemoryBlocks(podUID, container.Name); blocks != nil { p.updatePodReusableMemory(pod, container, blocks) - klog.InfoS("Container already present in state, skipping", "pod", klog.KObj(pod), "containerName", container.Name) + logger.Info("Container already present in state, skipping") return nil } // Call Topology Manager to get the aligned affinity across all hint providers. 
hint := p.affinity.GetAffinity(podUID, container.Name) - klog.InfoS("Got topology affinity", "pod", klog.KObj(pod), "podUID", pod.UID, "containerName", container.Name, "hint", hint) + logger.Info("Got topology affinity", "hint", hint) requestedResources, err := getRequestedResources(pod, container) if err != nil { @@ -196,9 +201,9 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai // we only do this so that the sum(memory_for_all_containers) == total amount of allocated memory to the pod, even // though the final state here doesn't accurately reflect what was (in reality) allocated to each container // TODO: we should refactor our state structs to reflect the amount of the re-used memory - p.updateInitContainersMemoryBlocks(s, pod, container, containerBlocks) + p.updateInitContainersMemoryBlocks(logger, s, pod, container, containerBlocks) - klog.V(4).InfoS("Allocated exclusive memory", "pod", klog.KObj(pod), "containerName", container.Name) + logger.V(4).Info("Allocated exclusive memory") return nil } @@ -248,13 +253,15 @@ func (p *staticPolicy) getPodReusableMemory(pod *v1.Pod, numaAffinity bitmask.Bi } // RemoveContainer call is idempotent -func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerName string) { +func (p *staticPolicy) RemoveContainer(ctx context.Context, s state.State, podUID string, containerName string) { + logger := klog.LoggerWithValues(klog.FromContext(ctx), "podUID", podUID, "containerName", containerName) + blocks := s.GetMemoryBlocks(podUID, containerName) if blocks == nil { return } - klog.InfoS("RemoveContainer", "podUID", podUID, "containerName", containerName) + logger.Info("RemoveContainer", "podUID", podUID, "containerName", containerName) s.Delete(podUID, containerName) // Mutate machine memory state to update free and reserved memory @@ -300,35 +307,35 @@ func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerNa s.SetMachineState(machineState) } -func regenerateHints(pod *v1.Pod, ctn *v1.Container, ctnBlocks []state.Block, reqRsrc map[v1.ResourceName]uint64) map[string][]topologymanager.TopologyHint { +func regenerateHints(logger logr.Logger, pod *v1.Pod, ctn *v1.Container, ctnBlocks []state.Block, reqRsrc map[v1.ResourceName]uint64) map[string][]topologymanager.TopologyHint { hints := map[string][]topologymanager.TopologyHint{} for resourceName := range reqRsrc { hints[string(resourceName)] = []topologymanager.TopologyHint{} } if len(ctnBlocks) != len(reqRsrc) { - klog.InfoS("The number of requested resources by the container differs from the number of memory blocks", "pod", klog.KObj(pod), "containerName", ctn.Name) + logger.Info("The number of requested resources by the container differs from the number of memory blocks", "containerName", ctn.Name) return nil } for _, b := range ctnBlocks { if _, ok := reqRsrc[b.Type]; !ok { - klog.InfoS("Container requested resources but none available of this type", "pod", klog.KObj(pod), "containerName", ctn.Name, "type", b.Type) + logger.Info("Container requested resources but none available of this type", "containerName", ctn.Name, "type", b.Type) return nil } if b.Size != reqRsrc[b.Type] { - klog.InfoS("Memory already allocated with different numbers than requested", "pod", klog.KObj(pod), "containerName", ctn.Name, "type", b.Type, "requestedResource", reqRsrc[b.Type], "allocatedSize", b.Size) + logger.Info("Memory already allocated with different numbers than requested", "containerName", ctn.Name, "type", b.Type, "requestedResource", 
reqRsrc[b.Type], "allocatedSize", b.Size) return nil } containerNUMAAffinity, err := bitmask.NewBitMask(b.NUMAAffinity...) if err != nil { - klog.ErrorS(err, "Failed to generate NUMA bitmask", "pod", klog.KObj(pod), "containerName", ctn.Name, "type", b.Type) + logger.Error(err, "Failed to generate NUMA bitmask", "containerName", ctn.Name, "type", b.Type) return nil } - klog.InfoS("Regenerating TopologyHints, resource was already allocated to pod", "resourceName", b.Type, "pod", klog.KObj(pod), "podUID", pod.UID, "containerName", ctn.Name) + logger.Info("Regenerating TopologyHints, resource was already allocated to pod", "resourceName", b.Type, "podUID", pod.UID, "containerName", ctn.Name) hints[string(b.Type)] = append(hints[string(b.Type)], topologymanager.TopologyHint{ NUMANodeAffinity: containerNUMAAffinity, Preferred: true, @@ -392,14 +399,16 @@ func getPodRequestedResources(pod *v1.Pod) (map[v1.ResourceName]uint64, error) { return reqRsrcs, nil } -func (p *staticPolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint { +func (p *staticPolicy) GetPodTopologyHints(ctx context.Context, s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint { + logger := klog.LoggerWithValues(klog.FromContext(ctx), "pod", klog.KObj(pod)) + if v1qos.GetPodQOS(pod) != v1.PodQOSGuaranteed { return nil } reqRsrcs, err := getPodRequestedResources(pod) if err != nil { - klog.ErrorS(err, "Failed to get pod requested resources", "pod", klog.KObj(pod), "podUID", pod.UID) + logger.Error(err, "Failed to get pod requested resources", "podUID", pod.UID) return nil } @@ -409,7 +418,7 @@ func (p *staticPolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[strin // memory allocated for the container. This might happen after a // kubelet restart, for example. if containerBlocks != nil { - return regenerateHints(pod, &ctn, containerBlocks, reqRsrcs) + return regenerateHints(logger, pod, &ctn, containerBlocks, reqRsrcs) } } @@ -420,14 +429,16 @@ func (p *staticPolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[strin // GetTopologyHints implements the topologymanager.HintProvider Interface // and is consulted to achieve NUMA aware resource alignment among this // and other resource controllers. -func (p *staticPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint { +func (p *staticPolicy) GetTopologyHints(ctx context.Context, s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint { + logger := klog.LoggerWithValues(klog.FromContext(ctx), "pod", klog.KObj(pod)) + if v1qos.GetPodQOS(pod) != v1.PodQOSGuaranteed { return nil } requestedResources, err := getRequestedResources(pod, container) if err != nil { - klog.ErrorS(err, "Failed to get container requested resources", "pod", klog.KObj(pod), "podUID", pod.UID, "containerName", container.Name) + logger.Error(err, "Failed to get container requested resources", "podUID", pod.UID, "containerName", container.Name) return nil } @@ -436,7 +447,7 @@ func (p *staticPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v // memory allocated for the container. This might happen after a // kubelet restart, for example. 
if containerBlocks != nil { - return regenerateHints(pod, container, containerBlocks, requestedResources) + return regenerateHints(logger, pod, container, containerBlocks, requestedResources) } return p.calculateHints(s.GetMachineState(), pod, requestedResources) @@ -588,7 +599,7 @@ func areGroupsEqual(group1, group2 []int) bool { return true } -func (p *staticPolicy) validateState(s state.State) error { +func (p *staticPolicy) validateState(logger logr.Logger, s state.State) error { machineState := s.GetMachineState() memoryAssignments := s.GetMemoryAssignments() @@ -654,49 +665,49 @@ func (p *staticPolicy) validateState(s state.State) error { // Validate that total size, system reserved and reserved memory not changed, it can happen, when: // - adding or removing physical memory bank from the node // - change of kubelet system-reserved, kube-reserved or pre-reserved-memory-zone parameters - if !areMachineStatesEqual(machineState, expectedMachineState) { + if !areMachineStatesEqual(logger, machineState, expectedMachineState) { return fmt.Errorf("[memorymanager] the expected machine state is different from the real one") } return nil } -func areMachineStatesEqual(ms1, ms2 state.NUMANodeMap) bool { +func areMachineStatesEqual(logger logr.Logger, ms1, ms2 state.NUMANodeMap) bool { if len(ms1) != len(ms2) { - klog.InfoS("Node states were different", "lengthNode1", len(ms1), "lengthNode2", len(ms2)) + logger.Info("Node states were different", "lengthNode1", len(ms1), "lengthNode2", len(ms2)) return false } for nodeID, nodeState1 := range ms1 { nodeState2, ok := ms2[nodeID] if !ok { - klog.InfoS("Node state didn't have node ID", "nodeID", nodeID) + logger.Info("Node state didn't have node ID", "nodeID", nodeID) return false } if nodeState1.NumberOfAssignments != nodeState2.NumberOfAssignments { - klog.InfoS("Node state had a different number of memory assignments.", "assignment1", nodeState1.NumberOfAssignments, "assignment2", nodeState2.NumberOfAssignments) + logger.Info("Node state had a different number of memory assignments.", "assignment1", nodeState1.NumberOfAssignments, "assignment2", nodeState2.NumberOfAssignments) return false } if !areGroupsEqual(nodeState1.Cells, nodeState2.Cells) { - klog.InfoS("Node states had different groups", "stateNode1", nodeState1.Cells, "stateNode2", nodeState2.Cells) + logger.Info("Node states had different groups", "stateNode1", nodeState1.Cells, "stateNode2", nodeState2.Cells) return false } if len(nodeState1.MemoryMap) != len(nodeState2.MemoryMap) { - klog.InfoS("Node state had memory maps of different lengths", "lengthNode1", len(nodeState1.MemoryMap), "lengthNode2", len(nodeState2.MemoryMap)) + logger.Info("Node state had memory maps of different lengths", "lengthNode1", len(nodeState1.MemoryMap), "lengthNode2", len(nodeState2.MemoryMap)) return false } for resourceName, memoryState1 := range nodeState1.MemoryMap { memoryState2, ok := nodeState2.MemoryMap[resourceName] if !ok { - klog.InfoS("Memory state didn't have resource", "resource", resourceName) + logger.Info("Memory state didn't have resource", "resource", resourceName) return false } - if !areMemoryStatesEqual(memoryState1, memoryState2, nodeID, resourceName) { + if !areMemoryStatesEqual(logger, memoryState1, memoryState2, nodeID, resourceName) { return false } @@ -710,11 +721,11 @@ func areMachineStatesEqual(ms1, ms2 state.NUMANodeMap) bool { } if tmpState1.Free != tmpState2.Free { - klog.InfoS("NUMA node and resource had different memory states", "node", nodeID, "resource", resourceName, 
"field", "free", "free1", tmpState1.Free, "free2", tmpState2.Free, "memoryState1", *memoryState1, "memoryState2", *memoryState2) + logger.Info("NUMA node and resource had different memory states", "node", nodeID, "resource", resourceName, "field", "free", "free1", tmpState1.Free, "free2", tmpState2.Free, "memoryState1", *memoryState1, "memoryState2", *memoryState2) return false } if tmpState1.Reserved != tmpState2.Reserved { - klog.InfoS("NUMA node and resource had different memory states", "node", nodeID, "resource", resourceName, "field", "reserved", "reserved1", tmpState1.Reserved, "reserved2", tmpState2.Reserved, "memoryState1", *memoryState1, "memoryState2", *memoryState2) + logger.Info("NUMA node and resource had different memory states", "node", nodeID, "resource", resourceName, "field", "reserved", "reserved1", tmpState1.Reserved, "reserved2", tmpState2.Reserved, "memoryState1", *memoryState1, "memoryState2", *memoryState2) return false } } @@ -722,19 +733,20 @@ func areMachineStatesEqual(ms1, ms2 state.NUMANodeMap) bool { return true } -func areMemoryStatesEqual(memoryState1, memoryState2 *state.MemoryTable, nodeID int, resourceName v1.ResourceName) bool { +func areMemoryStatesEqual(logger logr.Logger, memoryState1, memoryState2 *state.MemoryTable, nodeID int, resourceName v1.ResourceName) bool { + loggerWithValues := klog.LoggerWithValues(logger, "node", nodeID, "resource", resourceName, "memoryState1", *memoryState1, "memoryState2", *memoryState2) if memoryState1.TotalMemSize != memoryState2.TotalMemSize { - klog.InfoS("Memory states for the NUMA node and resource are different", "node", nodeID, "resource", resourceName, "field", "TotalMemSize", "TotalMemSize1", memoryState1.TotalMemSize, "TotalMemSize2", memoryState2.TotalMemSize, "memoryState1", *memoryState1, "memoryState2", *memoryState2) + logger.Info("Memory states for the NUMA node and resource are different", "field", "TotalMemSize", "TotalMemSize1", memoryState1.TotalMemSize, "TotalMemSize2", memoryState2.TotalMemSize) return false } if memoryState1.SystemReserved != memoryState2.SystemReserved { - klog.InfoS("Memory states for the NUMA node and resource are different", "node", nodeID, "resource", resourceName, "field", "SystemReserved", "SystemReserved1", memoryState1.SystemReserved, "SystemReserved2", memoryState2.SystemReserved, "memoryState1", *memoryState1, "memoryState2", *memoryState2) + loggerWithValues.Info("Memory states for the NUMA node and resource are different", "field", "SystemReserved", "SystemReserved1", memoryState1.SystemReserved, "SystemReserved2", memoryState2.SystemReserved) return false } if memoryState1.Allocatable != memoryState2.Allocatable { - klog.InfoS("Memory states for the NUMA node and resource are different", "node", nodeID, "resource", resourceName, "field", "Allocatable", "Allocatable1", memoryState1.Allocatable, "Allocatable2", memoryState2.Allocatable, "memoryState1", *memoryState1, "memoryState2", *memoryState2) + loggerWithValues.Info("Memory states for the NUMA node and resource are different", "field", "Allocatable", "Allocatable1", memoryState1.Allocatable, "Allocatable2", memoryState2.Allocatable) return false } return true @@ -898,7 +910,7 @@ func findBestHint(hints []topologymanager.TopologyHint) *topologymanager.Topolog } // GetAllocatableMemory returns the amount of allocatable memory for each NUMA node -func (p *staticPolicy) GetAllocatableMemory(s state.State) []state.Block { +func (p *staticPolicy) GetAllocatableMemory(_ context.Context, s state.State) []state.Block { 
var allocatableMemory []state.Block machineState := s.GetMachineState() for numaNodeID, numaNodeState := range machineState { @@ -962,7 +974,7 @@ func (p *staticPolicy) updatePodReusableMemory(pod *v1.Pod, container *v1.Contai } } -func (p *staticPolicy) updateInitContainersMemoryBlocks(s state.State, pod *v1.Pod, container *v1.Container, containerMemoryBlocks []state.Block) { +func (p *staticPolicy) updateInitContainersMemoryBlocks(logger logr.Logger, s state.State, pod *v1.Pod, container *v1.Container, containerMemoryBlocks []state.Block) { podUID := string(pod.UID) for _, containerBlock := range containerMemoryBlocks { @@ -998,7 +1010,7 @@ func (p *staticPolicy) updateInitContainersMemoryBlocks(s state.State, pod *v1.P continue } - if !isNUMAAffinitiesEqual(initContainerBlock.NUMAAffinity, containerBlock.NUMAAffinity) { + if !isNUMAAffinitiesEqual(logger, initContainerBlock.NUMAAffinity, containerBlock.NUMAAffinity) { continue } @@ -1026,16 +1038,16 @@ func isRegularInitContainer(pod *v1.Pod, container *v1.Container) bool { return false } -func isNUMAAffinitiesEqual(numaAffinity1, numaAffinity2 []int) bool { +func isNUMAAffinitiesEqual(logger logr.Logger, numaAffinity1, numaAffinity2 []int) bool { bitMask1, err := bitmask.NewBitMask(numaAffinity1...) if err != nil { - klog.ErrorS(err, "failed to create bit mask", "numaAffinity1", numaAffinity1) + logger.Error(err, "failed to create bit mask", "numaAffinity1", numaAffinity1) return false } bitMask2, err := bitmask.NewBitMask(numaAffinity2...) if err != nil { - klog.ErrorS(err, "failed to create bit mask", "numaAffinity2", numaAffinity2) + logger.Error(err, "failed to create bit mask", "numaAffinity2", numaAffinity2) return false } diff --git a/pkg/kubelet/cm/memorymanager/policy_static_test.go b/pkg/kubelet/cm/memorymanager/policy_static_test.go index aba82b6525f..cf85d75ddb8 100644 --- a/pkg/kubelet/cm/memorymanager/policy_static_test.go +++ b/pkg/kubelet/cm/memorymanager/policy_static_test.go @@ -21,8 +21,6 @@ import ( "reflect" "testing" - "k8s.io/klog/v2" - cadvisorapi "github.com/google/cadvisor/info/v1" "github.com/google/go-cmp/cmp" @@ -31,6 +29,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask" + "k8s.io/kubernetes/test/utils/ktesting" ) const ( @@ -135,19 +134,20 @@ type testStaticPolicy struct { } func initTests(t *testing.T, testCase *testStaticPolicy, hint *topologymanager.TopologyHint, initContainersReusableMemory reusableMemory) (Policy, state.State, error) { + logger, tCtx := ktesting.NewTestContext(t) manager := topologymanager.NewFakeManager() if hint != nil { manager = topologymanager.NewFakeManagerWithHint(hint) } - p, err := NewPolicyStatic(testCase.machineInfo, testCase.systemReserved, manager) + p, err := NewPolicyStatic(tCtx, testCase.machineInfo, testCase.systemReserved, manager) if err != nil { return nil, nil, err } if initContainersReusableMemory != nil { p.(*staticPolicy).initContainersReusableMemory = initContainersReusableMemory } - s := state.NewMemoryState() + s := state.NewMemoryState(logger) s.SetMachineState(testCase.machineState) s.SetMemoryAssignments(testCase.assignments) return p, s, nil @@ -213,6 +213,7 @@ func TestStaticPolicyName(t *testing.T) { } func TestStaticPolicyStart(t *testing.T) { + logger, tCtx := ktesting.NewTestContext(t) testCases := []testStaticPolicy{ { description: "should fail, if machine state is empty, but it has memory assignments", @@ -1153,7 +1154,7 
@@ func TestStaticPolicyStart(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - err = p.Start(s) + err = p.Start(tCtx, s) if !reflect.DeepEqual(err, testCase.expectedError) { t.Fatalf("The actual error: %v is different from the expected one: %v", err, testCase.expectedError) } @@ -1168,7 +1169,7 @@ func TestStaticPolicyStart(t *testing.T) { } machineState := s.GetMachineState() - if !areMachineStatesEqual(machineState, testCase.expectedMachineState) { + if !areMachineStatesEqual(logger, machineState, testCase.expectedMachineState) { t.Fatalf("The actual machine state: %v is different from the expected one: %v", machineState, testCase.expectedMachineState) } }) @@ -1176,6 +1177,7 @@ func TestStaticPolicyStart(t *testing.T) { } func TestStaticPolicyAllocate(t *testing.T) { + logger, tCtx := ktesting.NewTestContext(t) testCases := []testStaticPolicy{ { description: "should do nothing for non-guaranteed pods", @@ -2014,7 +2016,7 @@ func TestStaticPolicyAllocate(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - err = p.Allocate(s, testCase.pod, &testCase.pod.Spec.Containers[0]) + err = p.Allocate(tCtx, s, testCase.pod, &testCase.pod.Spec.Containers[0]) if !reflect.DeepEqual(err, testCase.expectedError) { t.Fatalf("The actual error %v is different from the expected one %v", err, testCase.expectedError) } @@ -2029,7 +2031,7 @@ func TestStaticPolicyAllocate(t *testing.T) { } machineState := s.GetMachineState() - if !areMachineStatesEqual(machineState, testCase.expectedMachineState) { + if !areMachineStatesEqual(logger, machineState, testCase.expectedMachineState) { t.Fatalf("The actual machine state %v is different from the expected %v", machineState, testCase.expectedMachineState) } }) @@ -2037,6 +2039,7 @@ func TestStaticPolicyAllocate(t *testing.T) { } func TestStaticPolicyAllocateWithInitContainers(t *testing.T) { + logger, tCtx := ktesting.NewTestContext(t) testCases := []testStaticPolicy{ { description: "should re-use init containers memory, init containers requests 1Gi and 2Gi, apps containers 3Gi and 4Gi", @@ -2731,21 +2734,21 @@ func TestStaticPolicyAllocateWithInitContainers(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.description, func(t *testing.T) { - klog.InfoS("TestStaticPolicyAllocateWithInitContainers", "name", testCase.description) + logger.Info("TestStaticPolicyAllocateWithInitContainers", "name", testCase.description) p, s, err := initTests(t, &testCase, testCase.topologyHint, testCase.initContainersReusableMemory) if err != nil { t.Fatalf("Unexpected error: %v", err) } for i := range testCase.pod.Spec.InitContainers { - err = p.Allocate(s, testCase.pod, &testCase.pod.Spec.InitContainers[i]) + err = p.Allocate(tCtx, s, testCase.pod, &testCase.pod.Spec.InitContainers[i]) if !reflect.DeepEqual(err, testCase.expectedError) { t.Fatalf("The actual error %v is different from the expected one %v", err, testCase.expectedError) } } for i := range testCase.pod.Spec.Containers { - err = p.Allocate(s, testCase.pod, &testCase.pod.Spec.Containers[i]) + err = p.Allocate(tCtx, s, testCase.pod, &testCase.pod.Spec.Containers[i]) if !reflect.DeepEqual(err, testCase.expectedError) { t.Fatalf("The actual error %v is different from the expected one %v", err, testCase.expectedError) } @@ -2757,7 +2760,7 @@ func TestStaticPolicyAllocateWithInitContainers(t *testing.T) { } machineState := s.GetMachineState() - if !areMachineStatesEqual(machineState, testCase.expectedMachineState) { + if !areMachineStatesEqual(logger, machineState, testCase.expectedMachineState) { 
t.Fatalf("The actual machine state %v is different from the expected %v", machineState, testCase.expectedMachineState) } }) @@ -2765,6 +2768,7 @@ func TestStaticPolicyAllocateWithInitContainers(t *testing.T) { } func TestStaticPolicyAllocateWithRestartableInitContainers(t *testing.T) { + logger, tCtx := ktesting.NewTestContext(t) testCases := []testStaticPolicy{ { description: "should do nothing once containers already exist under the state file", @@ -3064,14 +3068,14 @@ func TestStaticPolicyAllocateWithRestartableInitContainers(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.description, func(t *testing.T) { - klog.InfoS("TestStaticPolicyAllocateWithRestartableInitContainers", "name", testCase.description) + logger.Info("TestStaticPolicyAllocateWithRestartableInitContainers", "name", testCase.description) p, s, err := initTests(t, &testCase, testCase.topologyHint, testCase.initContainersReusableMemory) if err != nil { t.Fatalf("Unexpected error: %v", err) } for i := range testCase.pod.Spec.InitContainers { - err = p.Allocate(s, testCase.pod, &testCase.pod.Spec.InitContainers[i]) + err = p.Allocate(tCtx, s, testCase.pod, &testCase.pod.Spec.InitContainers[i]) if !reflect.DeepEqual(err, testCase.expectedError) { t.Fatalf("The actual error %v is different from the expected one %v", err, testCase.expectedError) } @@ -3082,7 +3086,7 @@ func TestStaticPolicyAllocateWithRestartableInitContainers(t *testing.T) { } for i := range testCase.pod.Spec.Containers { - err = p.Allocate(s, testCase.pod, &testCase.pod.Spec.Containers[i]) + err = p.Allocate(tCtx, s, testCase.pod, &testCase.pod.Spec.Containers[i]) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -3094,7 +3098,7 @@ func TestStaticPolicyAllocateWithRestartableInitContainers(t *testing.T) { } machineState := s.GetMachineState() - if !areMachineStatesEqual(machineState, testCase.expectedMachineState) { + if !areMachineStatesEqual(logger, machineState, testCase.expectedMachineState) { t.Fatalf("The actual machine state %v is different from the expected %v", machineState, testCase.expectedMachineState) } }) @@ -3102,6 +3106,7 @@ func TestStaticPolicyAllocateWithRestartableInitContainers(t *testing.T) { } func TestStaticPolicyRemoveContainer(t *testing.T) { + logger, tCtx := ktesting.NewTestContext(t) testCases := []testStaticPolicy{ { description: "should do nothing when the container does not exist under the state", @@ -3344,14 +3349,14 @@ func TestStaticPolicyRemoveContainer(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - p.RemoveContainer(s, "pod1", "container1") + p.RemoveContainer(tCtx, s, "pod1", "container1") assignments := s.GetMemoryAssignments() if !areContainerMemoryAssignmentsEqual(t, assignments, testCase.expectedAssignments) { t.Fatalf("Actual assignments %v are different from the expected %v", assignments, testCase.expectedAssignments) } machineState := s.GetMachineState() - if !areMachineStatesEqual(machineState, testCase.expectedMachineState) { + if !areMachineStatesEqual(logger, machineState, testCase.expectedMachineState) { t.Fatalf("The actual machine state %v is different from the expected %v", machineState, testCase.expectedMachineState) } }) @@ -3359,6 +3364,7 @@ func TestStaticPolicyRemoveContainer(t *testing.T) { } func TestStaticPolicyGetTopologyHints(t *testing.T) { + tCtx := ktesting.Init(t) testCases := []testStaticPolicy{ { description: "should not provide topology hints for non-guaranteed pods", @@ -3735,7 +3741,7 @@ func TestStaticPolicyGetTopologyHints(t *testing.T) { 
t.Fatalf("Unexpected error: %v", err) } - topologyHints := p.GetTopologyHints(s, testCase.pod, &testCase.pod.Spec.Containers[0]) + topologyHints := p.GetTopologyHints(tCtx, s, testCase.pod, &testCase.pod.Spec.Containers[0]) if !reflect.DeepEqual(topologyHints, testCase.expectedTopologyHints) { t.Fatalf("The actual topology hints: '%+v' are different from the expected one: '%+v'", topologyHints, testCase.expectedTopologyHints) } diff --git a/pkg/kubelet/cm/memorymanager/state/state_checkpoint.go b/pkg/kubelet/cm/memorymanager/state/state_checkpoint.go index dce0bda55a3..a23833f26c0 100644 --- a/pkg/kubelet/cm/memorymanager/state/state_checkpoint.go +++ b/pkg/kubelet/cm/memorymanager/state/state_checkpoint.go @@ -30,6 +30,7 @@ var _ State = &stateCheckpoint{} type stateCheckpoint struct { sync.RWMutex + logger klog.Logger cache State policyName string checkpointManager checkpointmanager.CheckpointManager @@ -37,13 +38,15 @@ type stateCheckpoint struct { } // NewCheckpointState creates new State for keeping track of memory/pod assignment with checkpoint backend -func NewCheckpointState(stateDir, checkpointName, policyName string) (State, error) { +func NewCheckpointState(logger klog.Logger, stateDir, checkpointName, policyName string) (State, error) { + logger = klog.LoggerWithName(logger, "Memory Manager state checkpoint") checkpointManager, err := checkpointmanager.NewCheckpointManager(stateDir) if err != nil { return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err) } stateCheckpoint := &stateCheckpoint{ - cache: NewMemoryState(), + logger: logger, + cache: NewMemoryState(logger), policyName: policyName, checkpointManager: checkpointManager, checkpointName: checkpointName, @@ -79,7 +82,7 @@ func (sc *stateCheckpoint) restoreState() error { sc.cache.SetMachineState(checkpoint.MachineState) sc.cache.SetMemoryAssignments(checkpoint.Entries) - klog.V(2).InfoS("State checkpoint: restored state from checkpoint") + sc.logger.V(2).Info("State checkpoint: restored state from checkpoint") return nil } @@ -93,7 +96,7 @@ func (sc *stateCheckpoint) storeState() error { err := sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint) if err != nil { - klog.ErrorS(err, "Could not save checkpoint") + sc.logger.Error(err, "Could not save checkpoint") return err } return nil @@ -131,7 +134,7 @@ func (sc *stateCheckpoint) SetMachineState(memoryMap NUMANodeMap) { sc.cache.SetMachineState(memoryMap) err := sc.storeState() if err != nil { - klog.ErrorS(err, "Failed to store state to checkpoint") + sc.logger.Error(err, "Failed to store state to checkpoint") } } @@ -143,7 +146,7 @@ func (sc *stateCheckpoint) SetMemoryBlocks(podUID string, containerName string, sc.cache.SetMemoryBlocks(podUID, containerName, blocks) err := sc.storeState() if err != nil { - klog.ErrorS(err, "Failed to store state to checkpoint", "podUID", podUID, "containerName", containerName) + sc.logger.Error(err, "Failed to store state to checkpoint", "podUID", podUID, "containerName", containerName) } } @@ -155,7 +158,7 @@ func (sc *stateCheckpoint) SetMemoryAssignments(assignments ContainerMemoryAssig sc.cache.SetMemoryAssignments(assignments) err := sc.storeState() if err != nil { - klog.ErrorS(err, "Failed to store state to checkpoint") + sc.logger.Error(err, "Failed to store state to checkpoint") } } @@ -167,7 +170,7 @@ func (sc *stateCheckpoint) Delete(podUID string, containerName string) { sc.cache.Delete(podUID, containerName) err := sc.storeState() if err != nil { - klog.ErrorS(err, "Failed to store state 
to checkpoint", "podUID", podUID, "containerName", containerName) + sc.logger.Error(err, "Failed to store state to checkpoint", "podUID", podUID, "containerName", containerName) } } @@ -179,6 +182,6 @@ func (sc *stateCheckpoint) ClearState() { sc.cache.ClearState() err := sc.storeState() if err != nil { - klog.ErrorS(err, "Failed to store state to checkpoint") + sc.logger.Error(err, "Failed to store state to checkpoint") } } diff --git a/pkg/kubelet/cm/memorymanager/state/state_checkpoint_test.go b/pkg/kubelet/cm/memorymanager/state/state_checkpoint_test.go index 5b24ca2cca2..ae9be6f3ec0 100644 --- a/pkg/kubelet/cm/memorymanager/state/state_checkpoint_test.go +++ b/pkg/kubelet/cm/memorymanager/state/state_checkpoint_test.go @@ -26,6 +26,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" testutil "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state/testing" + "k8s.io/kubernetes/test/utils/ktesting" ) const testingCheckpoint = "memorymanager_checkpoint_test" @@ -42,6 +43,7 @@ func assertStateEqual(t *testing.T, restoredState, expectedState State) { } func TestCheckpointStateRestore(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) testCases := []struct { description string checkpointContent string @@ -131,7 +133,7 @@ func TestCheckpointStateRestore(t *testing.T) { assert.NoError(t, cpm.CreateCheckpoint(testingCheckpoint, checkpoint), "could not create testing checkpoint") } - restoredState, err := NewCheckpointState(testingDir, testingCheckpoint, "static") + restoredState, err := NewCheckpointState(logger, testingDir, testingCheckpoint, "static") if strings.TrimSpace(tc.expectedError) != "" { assert.Error(t, err) assert.ErrorContains(t, err, "could not restore state from checkpoint: "+tc.expectedError) @@ -145,6 +147,7 @@ func TestCheckpointStateRestore(t *testing.T) { } func TestCheckpointStateStore(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) expectedState := &stateMemory{ assignments: ContainerMemoryAssignments{ "pod": map[string][]Block{ @@ -184,7 +187,7 @@ func TestCheckpointStateStore(t *testing.T) { assert.NoError(t, cpm.RemoveCheckpoint(testingCheckpoint), "could not remove testing checkpoint") - cs1, err := NewCheckpointState(testingDir, testingCheckpoint, "static") + cs1, err := NewCheckpointState(logger, testingDir, testingCheckpoint, "static") assert.NoError(t, err, "could not create testing checkpointState instance") // set values of cs1 instance so they are stored in checkpoint and can be read by cs2 @@ -192,13 +195,14 @@ func TestCheckpointStateStore(t *testing.T) { cs1.SetMemoryAssignments(expectedState.assignments) // restore checkpoint with previously stored values - cs2, err := NewCheckpointState(testingDir, testingCheckpoint, "static") + cs2, err := NewCheckpointState(logger, testingDir, testingCheckpoint, "static") assert.NoError(t, err, "could not create testing checkpointState instance") assertStateEqual(t, cs2, expectedState) } func TestCheckpointStateHelpers(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) testCases := []struct { description string machineState NUMANodeMap @@ -306,7 +310,7 @@ func TestCheckpointStateHelpers(t *testing.T) { // ensure there is no previous checkpoint assert.NoError(t, cpm.RemoveCheckpoint(testingCheckpoint), "could not remove testing checkpoint") - state, err := NewCheckpointState(testingDir, testingCheckpoint, "static") + state, err := NewCheckpointState(logger, testingDir, testingCheckpoint, "static") assert.NoError(t, err, "could not create testing checkpoint manager") 
state.SetMachineState(tc.machineState) @@ -326,6 +330,7 @@ func TestCheckpointStateHelpers(t *testing.T) { } func TestCheckpointStateClear(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) testCases := []struct { description string machineState NUMANodeMap @@ -369,7 +374,7 @@ func TestCheckpointStateClear(t *testing.T) { for _, tc := range testCases { t.Run(tc.description, func(t *testing.T) { - state, err := NewCheckpointState(testingDir, testingCheckpoint, "static") + state, err := NewCheckpointState(logger, testingDir, testingCheckpoint, "static") assert.NoError(t, err, "could not create testing checkpoint manager") state.SetMachineState(tc.machineState) diff --git a/pkg/kubelet/cm/memorymanager/state/state_mem.go b/pkg/kubelet/cm/memorymanager/state/state_mem.go index 19861c6e35a..59aec925608 100644 --- a/pkg/kubelet/cm/memorymanager/state/state_mem.go +++ b/pkg/kubelet/cm/memorymanager/state/state_mem.go @@ -24,6 +24,7 @@ import ( type stateMemory struct { sync.RWMutex + logger klog.Logger assignments ContainerMemoryAssignments machineState NUMANodeMap } @@ -31,9 +32,10 @@ type stateMemory struct { var _ State = &stateMemory{} // NewMemoryState creates new State for keeping track of cpu/pod assignment -func NewMemoryState() State { - klog.InfoS("Initializing new in-memory state store") +func NewMemoryState(logger klog.Logger) State { + logger.Info("Initializing new in-memory state store") return &stateMemory{ + logger: logger, assignments: ContainerMemoryAssignments{}, machineState: NUMANodeMap{}, } @@ -72,7 +74,7 @@ func (s *stateMemory) SetMachineState(nodeMap NUMANodeMap) { defer s.Unlock() s.machineState = nodeMap.Clone() - klog.InfoS("Updated machine memory state") + s.logger.Info("Updated machine memory state") } // SetMemoryBlocks stores memory assignments of container @@ -85,7 +87,7 @@ func (s *stateMemory) SetMemoryBlocks(podUID string, containerName string, block } s.assignments[podUID][containerName] = append([]Block{}, blocks...) - klog.InfoS("Updated memory state", "podUID", podUID, "containerName", containerName) + s.logger.Info("Updated memory state", "podUID", podUID, "containerName", containerName) } // SetMemoryAssignments sets ContainerMemoryAssignments by using the passed parameter @@ -94,7 +96,7 @@ func (s *stateMemory) SetMemoryAssignments(assignments ContainerMemoryAssignment defer s.Unlock() s.assignments = assignments.Clone() - klog.V(5).InfoS("Updated Memory assignments", "assignments", assignments) + s.logger.V(5).Info("Updated Memory assignments", "assignments", assignments) } // Delete deletes corresponding Blocks from ContainerMemoryAssignments @@ -110,7 +112,7 @@ func (s *stateMemory) Delete(podUID string, containerName string) { if len(s.assignments[podUID]) == 0 { delete(s.assignments, podUID) } - klog.V(2).InfoS("Deleted memory assignment", "podUID", podUID, "containerName", containerName) + s.logger.V(2).Info("Deleted memory assignment", "podUID", podUID, "containerName", containerName) } // ClearState clears machineState and ContainerMemoryAssignments @@ -120,5 +122,5 @@ func (s *stateMemory) ClearState() { s.machineState = NUMANodeMap{} s.assignments = make(ContainerMemoryAssignments) - klog.V(2).InfoS("Cleared state") + s.logger.V(2).Info("Cleared state") }
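
Note (illustration only, not part of the patch): the sketch below shows the contextual logging pattern this change applies across the memory manager. klog.FromContext and klog.LoggerWithValues are the k8s.io/klog/v2 APIs the hunks above use; klog.NewContext and klog.Background come from the same package and appear here only to make the example runnable. The names removeContainer and deleteAssignment are hypothetical stand-ins for staticPolicy.RemoveContainer and state.Delete.

    package main

    import (
        "context"

        "k8s.io/klog/v2"
    )

    // removeContainer mirrors the pattern used in the RemoveContainer hunk above:
    // pull the logger out of the incoming context, attach the identifying
    // key/value pairs once, and hand that logger to helpers instead of calling
    // the global klog functions.
    func removeContainer(ctx context.Context, podUID, containerName string) {
        logger := klog.LoggerWithValues(klog.FromContext(ctx), "podUID", podUID, "containerName", containerName)
        // No need to repeat podUID/containerName here; the logger already carries them.
        logger.Info("RemoveContainer")
        deleteAssignment(logger)
    }

    // deleteAssignment stands in for a helper that only needs the logger; the
    // attached key/value pairs appear in its output automatically.
    func deleteAssignment(logger klog.Logger) {
        logger.V(2).Info("Deleted memory assignment")
    }

    func main() {
        // Production code receives the context from its caller; the context.TODO()
        // in container_manager_linux.go is only a placeholder until the container
        // manager itself is migrated.
        ctx := klog.NewContext(context.Background(), klog.Background())
        removeContainer(ctx, "pod1", "container1")
    }

In tests, the same wiring comes from k8s.io/kubernetes/test/utils/ktesting: NewTestContext(t) returns a per-test logger plus a context that carries it, and Init(t) returns a TContext usable directly as a context.Context, which is why the updated tests above pass tCtx into Allocate, RemoveContainer, and GetTopologyHints and hand the returned logger to areMachineStatesEqual and NewMemoryState.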