From b965502c495ecdfdc0b24cbcdeff9bba24174cc1 Mon Sep 17 00:00:00 2001
From: Artyom Lukianov
Date: Wed, 17 Feb 2021 17:19:16 +0200
Subject: [PATCH] memory manager: re-use the memory allocated for init containers

The idea is that during the allocation phase:
- during calls to `Allocate` and `GetTopologyHints`, we take the init containers'
  reusable memory into account, which means that we re-use that memory and update
  the container memory blocks accordingly.

For example, for a pod with two init containers that requested 1Gi and 2Gi, and an
app container that requested 4Gi, we can re-use 2Gi of memory.

Signed-off-by: Artyom Lukianov
---
 pkg/kubelet/cm/memorymanager/policy_static.go | 258 ++++++++++++++----
 1 file changed, 210 insertions(+), 48 deletions(-)

diff --git a/pkg/kubelet/cm/memorymanager/policy_static.go b/pkg/kubelet/cm/memorymanager/policy_static.go
index 1767e77bc35..333860188c8 100644
--- a/pkg/kubelet/cm/memorymanager/policy_static.go
+++ b/pkg/kubelet/cm/memorymanager/policy_static.go
@@ -36,6 +36,7 @@ import (
 const policyTypeStatic policyType = "Static"
 
 type systemReservedMemory map[int]map[v1.ResourceName]uint64
+type reusableMemory map[string]map[string]map[v1.ResourceName]uint64
 
 // staticPolicy is implementation of the policy interface for the static policy
 type staticPolicy struct {
@@ -45,6 +46,8 @@ type staticPolicy struct {
     systemReserved systemReservedMemory
     // topology manager reference to get container Topology affinity
     affinity topologymanager.Store
+    // initContainersReusableMemory contains the memory allocated for init containers that can be reused
+    initContainersReusableMemory reusableMemory
 }
 
 var _ Policy = &staticPolicy{}
@@ -65,9 +68,10 @@ func NewPolicyStatic(machineInfo *cadvisorapi.MachineInfo, reserved systemReserv
     }
 
     return &staticPolicy{
-        machineInfo:    machineInfo,
-        systemReserved: reserved,
-        affinity:       affinity,
+        machineInfo:                  machineInfo,
+        systemReserved:               reserved,
+        affinity:                     affinity,
+        initContainersReusableMemory: reusableMemory{},
     }, nil
 }
 
@@ -90,14 +94,17 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai
         return nil
     }
 
+    podUID := string(pod.UID)
     klog.InfoS("Allocate", "pod", klog.KObj(pod), "containerName", container.Name)
-    if blocks := s.GetMemoryBlocks(string(pod.UID), container.Name); blocks != nil {
+    if blocks := s.GetMemoryBlocks(podUID, container.Name); blocks != nil {
+        p.updatePodReusableMemory(pod, container, blocks)
+
         klog.InfoS("Container already present in state, skipping", "pod", klog.KObj(pod), "containerName", container.Name)
         return nil
     }
 
     // Call Topology Manager to get the aligned affinity across all hint providers.
-    hint := p.affinity.GetAffinity(string(pod.UID), container.Name)
+    hint := p.affinity.GetAffinity(podUID, container.Name)
     klog.InfoS("Got topology affinity", "pod", klog.KObj(pod), "podUID", pod.UID, "containerName", container.Name, "hint", hint)
 
     requestedResources, err := getRequestedResources(container)
@@ -105,11 +112,12 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai
         return err
     }
 
+    machineState := s.GetMachineState()
     bestHint := &hint
     // topology manager returned the hint with NUMA affinity nil
     // we should use the default NUMA affinity calculated the same way as for the topology manager
     if hint.NUMANodeAffinity == nil {
-        defaultHint, err := p.getDefaultHint(s, requestedResources)
+        defaultHint, err := p.getDefaultHint(machineState, pod, requestedResources)
         if err != nil {
             return err
         }
@@ -120,12 +128,10 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai
         bestHint = defaultHint
     }
 
-    machineState := s.GetMachineState()
-
     // topology manager returns the hint that does not satisfy completely the container request
     // we should extend this hint to the one who will satisfy the request and include the current hint
     if !isAffinitySatisfyRequest(machineState, bestHint.NUMANodeAffinity, requestedResources) {
-        extendedHint, err := p.extendTopologyManagerHint(s, requestedResources, bestHint.NUMANodeAffinity)
+        extendedHint, err := p.extendTopologyManagerHint(machineState, pod, requestedResources, bestHint.NUMANodeAffinity)
         if err != nil {
             return err
         }
@@ -146,43 +152,78 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai
             Type:         resourceName,
         })
 
-        // Update nodes memory state
-        for _, nodeID := range maskBits {
-            machineState[nodeID].NumberOfAssignments++
-            machineState[nodeID].Cells = maskBits
-
-            // we need to continue to update all affinity mask nodes
-            if requestedSize == 0 {
-                continue
-            }
-
-            // update the node memory state
-            nodeResourceMemoryState := machineState[nodeID].MemoryMap[resourceName]
-            if nodeResourceMemoryState.Free <= 0 {
-                continue
-            }
-
-            // the node has enough memory to satisfy the request
-            if nodeResourceMemoryState.Free >= requestedSize {
-                nodeResourceMemoryState.Reserved += requestedSize
-                nodeResourceMemoryState.Free -= requestedSize
-                requestedSize = 0
-                continue
-            }
-
-            // the node does not have enough memory, use the node remaining memory and move to the next node
-            requestedSize -= nodeResourceMemoryState.Free
-            nodeResourceMemoryState.Reserved += nodeResourceMemoryState.Free
-            nodeResourceMemoryState.Free = 0
+        podReusableMemory := p.getPodReusableMemory(pod, bestHint.NUMANodeAffinity, resourceName)
+        if podReusableMemory >= requestedSize {
+            requestedSize = 0
+        } else {
+            requestedSize -= podReusableMemory
         }
+
+        // Update nodes memory state
+        p.updateMachineState(machineState, maskBits, resourceName, requestedSize)
     }
 
+    p.updatePodReusableMemory(pod, container, containerBlocks)
+
     s.SetMachineState(machineState)
-    s.SetMemoryBlocks(string(pod.UID), container.Name, containerBlocks)
+    s.SetMemoryBlocks(podUID, container.Name, containerBlocks)
+
+    // update init containers memory blocks to reflect the fact that we re-used init containers memory
+    // it is possible that the size of the init container memory block will have a 0 value, when all memory
+    // allocated for it was re-used
+    // we only do this so that the sum(memory_for_all_containers) == total amount of allocated memory to the pod, even
+    // though the final state here doesn't accurately reflect what was (in reality) allocated to each container
+    // TODO: we should refactor our state structs to reflect the amount of the re-used memory
+    p.updateInitContainersMemoryBlocks(s, pod, container, containerBlocks)
 
     return nil
 }
 
+func (p *staticPolicy) updateMachineState(machineState state.NUMANodeMap, numaAffinity []int, resourceName v1.ResourceName, requestedSize uint64) {
+    for _, nodeID := range numaAffinity {
+        machineState[nodeID].NumberOfAssignments++
+        machineState[nodeID].Cells = numaAffinity
+
+        // we need to continue to update all affinity mask nodes
+        if requestedSize == 0 {
+            continue
+        }
+
+        // update the node memory state
+        nodeResourceMemoryState := machineState[nodeID].MemoryMap[resourceName]
+        if nodeResourceMemoryState.Free <= 0 {
+            continue
+        }
+
+        // the node has enough memory to satisfy the request
+        if nodeResourceMemoryState.Free >= requestedSize {
+            nodeResourceMemoryState.Reserved += requestedSize
+            nodeResourceMemoryState.Free -= requestedSize
+            requestedSize = 0
+            continue
+        }
+
+        // the node does not have enough memory, use the node remaining memory and move to the next node
+        requestedSize -= nodeResourceMemoryState.Free
+        nodeResourceMemoryState.Reserved += nodeResourceMemoryState.Free
+        nodeResourceMemoryState.Free = 0
+    }
+}
+
+func (p *staticPolicy) getPodReusableMemory(pod *v1.Pod, numaAffinity bitmask.BitMask, resourceName v1.ResourceName) uint64 {
+    podReusableMemory, ok := p.initContainersReusableMemory[string(pod.UID)]
+    if !ok {
+        return 0
+    }
+
+    numaReusableMemory, ok := podReusableMemory[numaAffinity.String()]
+    if !ok {
+        return 0
+    }
+
+    return numaReusableMemory[resourceName]
+}
+
 // RemoveContainer call is idempotent
 func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerName string) error {
     klog.InfoS("RemoveContainer", "podUID", podUID, "containerName", containerName)
@@ -339,7 +380,9 @@ func (p *staticPolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[strin
             return regenerateHints(pod, &ctn, containerBlocks, reqRsrcs)
         }
     }
-    return p.calculateHints(s, reqRsrcs)
+
+    // the pod topology hints are calculated only once for all containers, so there is no need to pass the re-usable state
+    return p.calculateHints(s.GetMachineState(), pod, reqRsrcs)
 }
 
 // GetTopologyHints implements the topologymanager.HintProvider Interface
@@ -364,7 +407,7 @@ func (p *staticPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v
         return regenerateHints(pod, container, containerBlocks, requestedResources)
     }
 
-    return p.calculateHints(s, requestedResources)
+    return p.calculateHints(s.GetMachineState(), pod, requestedResources)
 }
 
 func getRequestedResources(container *v1.Container) (map[v1.ResourceName]uint64, error) {
@@ -382,8 +425,7 @@
     return requestedResources, nil
 }
 
-func (p *staticPolicy) calculateHints(s state.State, requestedResources map[v1.ResourceName]uint64) map[string][]topologymanager.TopologyHint {
-    machineState := s.GetMachineState()
+func (p *staticPolicy) calculateHints(machineState state.NUMANodeMap, pod *v1.Pod, requestedResources map[v1.ResourceName]uint64) map[string][]topologymanager.TopologyHint {
     var numaNodes []int
     for n := range machineState {
         numaNodes = append(numaNodes, n)
@@ -447,7 +489,8 @@
         // verify that for all memory types the node mask has enough free resources
         for resourceName, requestedSize := range requestedResources {
-            if totalFreeSize[resourceName] < requestedSize {
+            podReusableMemory := p.getPodReusableMemory(pod, mask, resourceName)
+            if totalFreeSize[resourceName]+podReusableMemory < requestedSize {
                 return
             }
         }
@@ -671,8 +714,8 @@ func (p *staticPolicy) getResourceSystemReserved(nodeID int, resourceName v1.Res
     return systemReserved
 }
 
-func (p *staticPolicy) getDefaultHint(s state.State, requestedResources map[v1.ResourceName]uint64) (*topologymanager.TopologyHint, error) {
-    hints := p.calculateHints(s, requestedResources)
+func (p *staticPolicy) getDefaultHint(machineState state.NUMANodeMap, pod *v1.Pod, requestedResources map[v1.ResourceName]uint64) (*topologymanager.TopologyHint, error) {
+    hints := p.calculateHints(machineState, pod, requestedResources)
     if len(hints) < 1 {
         return nil, fmt.Errorf("[memorymanager] failed to get the default NUMA affinity, no NUMA nodes with enough memory is available")
     }
@@ -706,8 +749,8 @@ func isAffinitySatisfyRequest(machineState state.NUMANodeMap, mask bitmask.BitMa
     return true
 }
 
 // the topology manager uses bitwise AND to merge all topology hints into the best one, so in case of the restricted policy,
 // it possible that we will get the subset of hint that we provided to the topology manager, in this case we want to extend
 // it to the original one
-func (p *staticPolicy) extendTopologyManagerHint(s state.State, requestedResources map[v1.ResourceName]uint64, mask bitmask.BitMask) (*topologymanager.TopologyHint, error) {
-    hints := p.calculateHints(s, requestedResources)
+func (p *staticPolicy) extendTopologyManagerHint(machineState state.NUMANodeMap, pod *v1.Pod, requestedResources map[v1.ResourceName]uint64, mask bitmask.BitMask) (*topologymanager.TopologyHint, error) {
+    hints := p.calculateHints(machineState, pod, requestedResources)
 
     var filteredHints []topologymanager.TopologyHint
     // hints for all memory types should be the same, so we will check hints only for regular memory type
@@ -742,7 +785,8 @@ func isHintInGroup(hint []int, group []int) bool {
         }
         hintIndex++
     }
-    return false
+
+    return hintIndex == len(hint)
 }
 
 func findBestHint(hints []topologymanager.TopologyHint) *topologymanager.TopologyHint {
@@ -788,3 +832,121 @@ func (p *staticPolicy) GetAllocatableMemory(s state.State) []state.Block {
     }
     return allocatableMemory
 }
+
+func (p *staticPolicy) updatePodReusableMemory(pod *v1.Pod, container *v1.Container, memoryBlocks []state.Block) {
+    podUID := string(pod.UID)
+
+    // If pod entries to p.initContainersReusableMemory other than the current pod exist, delete them.
+    for uid := range p.initContainersReusableMemory {
+        if podUID != uid {
+            delete(p.initContainersReusableMemory, uid)
+        }
+    }
+
+    if isInitContainer(pod, container) {
+        if _, ok := p.initContainersReusableMemory[podUID]; !ok {
+            p.initContainersReusableMemory[podUID] = map[string]map[v1.ResourceName]uint64{}
+        }
+
+        for _, block := range memoryBlocks {
+            blockBitMask, _ := bitmask.NewBitMask(block.NUMAAffinity...)
+            blockBitMaskString := blockBitMask.String()
+
+            if _, ok := p.initContainersReusableMemory[podUID][blockBitMaskString]; !ok {
+                p.initContainersReusableMemory[podUID][blockBitMaskString] = map[v1.ResourceName]uint64{}
+            }
+
+            if blockReusableMemory := p.initContainersReusableMemory[podUID][blockBitMaskString][block.Type]; block.Size > blockReusableMemory {
+                p.initContainersReusableMemory[podUID][blockBitMaskString][block.Type] = block.Size
+            }
+        }
+
+        return
+    }
+
+    // update re-usable memory once it is used by the app container
+    for _, block := range memoryBlocks {
+        blockBitMask, _ := bitmask.NewBitMask(block.NUMAAffinity...)
+        if podReusableMemory := p.getPodReusableMemory(pod, blockBitMask, block.Type); podReusableMemory != 0 {
+            if block.Size >= podReusableMemory {
+                p.initContainersReusableMemory[podUID][blockBitMask.String()][block.Type] = 0
+            } else {
+                p.initContainersReusableMemory[podUID][blockBitMask.String()][block.Type] -= block.Size
+            }
+        }
+    }
+}
+
+func (p *staticPolicy) updateInitContainersMemoryBlocks(s state.State, pod *v1.Pod, container *v1.Container, containerMemoryBlocks []state.Block) {
+    podUID := string(pod.UID)
+
+    for _, containerBlock := range containerMemoryBlocks {
+        blockSize := containerBlock.Size
+        for _, initContainer := range pod.Spec.InitContainers {
+            // we do not want to continue updates once we reach the current container
+            if initContainer.Name == container.Name {
+                break
+            }
+
+            if blockSize == 0 {
+                break
+            }
+
+            initContainerBlocks := s.GetMemoryBlocks(podUID, initContainer.Name)
+            if len(initContainerBlocks) == 0 {
+                continue
+            }
+
+            for i := range initContainerBlocks {
+                initContainerBlock := &initContainerBlocks[i]
+                if initContainerBlock.Size == 0 {
+                    continue
+                }
+
+                if initContainerBlock.Type != containerBlock.Type {
+                    continue
+                }
+
+                if !isNUMAAffinitiesEqual(initContainerBlock.NUMAAffinity, containerBlock.NUMAAffinity) {
+                    continue
+                }
+
+                if initContainerBlock.Size > blockSize {
+                    initContainerBlock.Size -= blockSize
+                    blockSize = 0
+                } else {
+                    blockSize -= initContainerBlock.Size
+                    initContainerBlock.Size = 0
+                }
+            }
+
+            s.SetMemoryBlocks(podUID, initContainer.Name, initContainerBlocks)
+        }
+    }
+}
+
+func isInitContainer(pod *v1.Pod, container *v1.Container) bool {
+    for _, initContainer := range pod.Spec.InitContainers {
+        if initContainer.Name == container.Name {
+            return true
+        }
+    }
+
+    return false
+}
+
+func isNUMAAffinitiesEqual(numaAffinity1, numaAffinity2 []int) bool {
+    bitMask1, err := bitmask.NewBitMask(numaAffinity1...)
+    if err != nil {
+        klog.ErrorS(err, "failed to create bit mask", "numaAffinity", numaAffinity1)
+        return false
+    }
+
+    bitMask2, err := bitmask.NewBitMask(numaAffinity2...)
+    if err != nil {
+        klog.ErrorS(err, "failed to create bit mask", "numaAffinity", numaAffinity2)
+        return false
+    }
+
+    return bitMask1.IsEqual(bitMask2)
+}
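To illustrate the re-use accounting the commit message describes (init containers of 1Gi and 2Gi followed by a 4Gi app container re-use 2Gi), here is a minimal standalone Go sketch. It mirrors the idea behind the patch's reusableMemory bookkeeping but uses plain string keys instead of the kubelet's state and bitmask types; the `reusable` type and the `recordInit`/`allocate` helpers are illustrative names only, not part of the patch or of any kubelet API.

package main

import "fmt"

// reusable maps a NUMA-affinity key to a resource name to the number of bytes
// that init containers already hold and that an app container may re-use.
type reusable map[string]map[string]uint64

// recordInit remembers the largest init-container allocation seen so far for
// the given NUMA affinity and resource (init containers run sequentially, so
// only the maximum stays reusable).
func (r reusable) recordInit(numaKey, resource string, size uint64) {
	if r[numaKey] == nil {
		r[numaKey] = map[string]uint64{}
	}
	if size > r[numaKey][resource] {
		r[numaKey][resource] = size
	}
}

// allocate returns how much memory still has to be newly reserved for a
// container request after re-using the init-container memory, and reduces the
// remaining reusable amount accordingly.
func (r reusable) allocate(numaKey, resource string, request uint64) uint64 {
	avail := r[numaKey][resource]
	if avail == 0 {
		return request
	}
	if avail >= request {
		r[numaKey][resource] = avail - request
		return 0
	}
	r[numaKey][resource] = 0
	return request - avail
}

func main() {
	const gi = uint64(1 << 30)
	r := reusable{}

	// Two init containers on NUMA node 0 requested 1Gi and 2Gi.
	r.recordInit("0", "memory", 1*gi)
	r.recordInit("0", "memory", 2*gi)

	// The app container requests 4Gi: 2Gi is re-used, only 2Gi is newly reserved.
	fmt.Printf("newly reserved: %dGi\n", r.allocate("0", "memory", 4*gi)/gi)
}

Running the sketch prints "newly reserved: 2Gi", matching the example in the commit message.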