Mirror of https://github.com/optim-enterprises-bv/kubernetes.git, synced 2025-11-03 19:58:17 +00:00.
Checks whether we have cached runtime state before starting a container
that requests any device plugin resource. If not, re-issue Allocate gRPC
calls. This allows us to handle the edge case where a pod is assigned to a
node before the node has populated its extended resource capacity, for
example across a kubelet restart.
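
Distilled, the mechanism looks like the standalone sketch below. Everything in it (manager, cache, key, runOptions, allocate) is a hypothetical simplification for illustration; the real implementation, shown in the diff that follows, lives in the kubelet's device manager and tracks allocations in podDevices.

package main

import "fmt"

// manager is a hypothetical stand-in for the kubelet's device manager.
// cache maps "podUID/container/resource" keys to allocated device IDs,
// playing the role of podDevices in the real code.
type manager struct {
	cache map[string][]string
}

func key(podUID, container, resource string) string {
	return podUID + "/" + container + "/" + resource
}

// allocate stands in for the Allocate gRPC round-trip to a device plugin.
func (m *manager) allocate(podUID, container string, resources []string) {
	for _, r := range resources {
		m.cache[key(podUID, container, r)] = []string{"dev-0"}
	}
}

// runOptions mirrors the shape of GetDeviceRunContainerOptions: before
// starting a container, check that every requested device plugin resource
// has cached allocation state; if any is missing (e.g. the pod was admitted
// before the restarted kubelet repopulated extended resources), re-allocate.
func (m *manager) runOptions(podUID, container string, resources []string) [][]string {
	needsReAllocate := false
	for _, r := range resources {
		if m.cache[key(podUID, container, r)] == nil {
			needsReAllocate = true
		}
	}
	if needsReAllocate {
		fmt.Printf("needs re-allocate device plugin resources for pod %s\n", podUID)
		m.allocate(podUID, container, resources)
	}
	var out [][]string
	for _, r := range resources {
		out = append(out, m.cache[key(podUID, container, r)])
	}
	return out
}

func main() {
	// An empty cache simulates a kubelet that restarted after the pod
	// was already assigned to the node.
	m := &manager{cache: map[string][]string{}}
	fmt.Println(m.runOptions("pod-1234", "main", []string{"example.com/gpu"}))
}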

The diff touches two areas of the kubelet: the device plugin manager (the ManagerImpl hunks) and node status reconciliation. First, the per-pod allocation loop moves out of Allocate into a new helper, allocatePodResources:

@@ -311,10 +311,7 @@ func (m *ManagerImpl) isVersionCompatibleWithPlugin(versions []string) bool {
 	return false
 }
 
-// Allocate is the call that you can use to allocate a set of devices
-// from the registered device plugins.
-func (m *ManagerImpl) Allocate(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
-	pod := attrs.Pod
+func (m *ManagerImpl) allocatePodResources(pod *v1.Pod) error {
 	devicesToReuse := make(map[string]sets.String)
 	for _, container := range pod.Spec.InitContainers {
 		if err := m.allocateContainerResources(pod, &container, devicesToReuse); err != nil {

@@ -328,6 +325,18 @@ func (m *ManagerImpl) Allocate(node *schedulernodeinfo.NodeInfo, attrs *lifecycl
 		}
 		m.podDevices.removeContainerAllocatedResources(string(pod.UID), container.Name, devicesToReuse)
 	}
+	return nil
+}
+
+// Allocate is the call that you can use to allocate a set of devices
+// from the registered device plugins.
+func (m *ManagerImpl) Allocate(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
+	pod := attrs.Pod
+	err := m.allocatePodResources(pod)
+	if err != nil {
+		klog.Errorf("Failed to allocate device plugin resource for pod %s: %v", string(pod.UID), err)
+		return err
+	}
 
 	m.mutex.Lock()
 	defer m.mutex.Unlock()

@@ -717,6 +726,7 @@ func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Cont
 func (m *ManagerImpl) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) (*DeviceRunContainerOptions, error) {
 	podUID := string(pod.UID)
 	contName := container.Name
+	needsReAllocate := false
 	for k := range container.Resources.Limits {
 		resource := string(k)
 		if !m.isDevicePluginResource(resource) {

@@ -726,6 +736,16 @@ func (m *ManagerImpl) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Co
 		if err != nil {
 			return nil, err
 		}
+		// This is a device plugin resource yet we don't have cached
+		// resource state. This is likely due to a race during node
+		// restart. We re-issue allocate request to cover this race.
+		if m.podDevices.containerDevices(podUID, contName, resource) == nil {
+			needsReAllocate = true
+		}
+	}
+	if needsReAllocate {
+		klog.V(2).Infof("needs re-allocate device plugin resources for pod %s", podUID)
+		m.allocatePodResources(pod)
 	}
 	m.mutex.Lock()
 	defer m.mutex.Unlock()

The last hunk is in a different file, the kubelet's node status code: reconcileExtendedResource now logs each extended resource whose capacity it zeroes out:

@@ -134,6 +134,7 @@ func (kl *Kubelet) reconcileExtendedResource(initialNode, node *v1.Node) bool {
 	requiresUpdate := false
 	for k := range node.Status.Capacity {
 		if v1helper.IsExtendedResourceName(k) {
+			klog.Infof("Zero out resource %s capacity in existing node.", k)
 			node.Status.Capacity[k] = *resource.NewQuantity(int64(0), resource.DecimalSI)
 			node.Status.Allocatable[k] = *resource.NewQuantity(int64(0), resource.DecimalSI)
 			requiresUpdate = true
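
This zeroing is what creates the edge case the first half of the commit handles: after a restart, extended resource capacity stays at zero until device plugins re-register, yet a pod may already be bound to the node. The rough standalone sketch below illustrates the reset; isExtendedResourceName here is a simplified stand-in for v1helper.IsExtendedResourceName (the real function also rejects native kubernetes.io resources and requests.-prefixed quota names).

package main

import (
	"fmt"
	"strings"
)

// isExtendedResourceName is a simplified, hypothetical version of the
// real check: treat any domain-qualified name outside kubernetes.io as
// an extended resource.
func isExtendedResourceName(name string) bool {
	return strings.Contains(name, "/") && !strings.HasPrefix(name, "kubernetes.io/")
}

func main() {
	capacity := map[string]int64{
		"cpu":             8,
		"memory":          16 << 30,
		"example.com/gpu": 4,
	}
	for k := range capacity {
		if isExtendedResourceName(k) {
			// Mirrors the klog.Infof line added in the commit.
			fmt.Printf("Zero out resource %s capacity in existing node.\n", k)
			capacity[k] = 0
		}
	}
	fmt.Println("capacity after restart:", capacity)
}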