	Merge pull request #41870 from intelsdi-x/test-out-of-oir
Automatic merge from submit-queue (batch tested with PRs 31783, 41988, 42535, 42572, 41870)

Pods pending due to insufficient OIR should get scheduled once sufficient OIR becomes available.

This appears to be a regression since v1.5.0 in scheduler behavior for opaque integer resources, reported in https://github.com/kubernetes/kubernetes/issues/41861.

- [x] Add failing e2e test to trigger the regression.
- [x] Restore previous behavior (pods pending due to insufficient OIR get scheduled once sufficient OIR becomes available).
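The restored behavior is, at bottom, a fit check against the node's remaining opaque integer resource (OIR). The toy Go sketch below mirrors the scenario the new e2e test exercises (a node advertising 5 units, two pods each requesting 3); `fits` is a hypothetical helper for illustration, not the scheduler's actual predicate.

```go
package main

import "fmt"

// fits reports whether a request for a single opaque integer resource can be
// satisfied by what remains allocatable on the node.
func fits(request, allocatable, used int64) bool {
	return request <= allocatable-used
}

func main() {
	const allocatable = int64(5) // the e2e test patches 5 units onto the node
	used := int64(3)             // oir-1 is running and holds 3 units

	fmt.Println(fits(3, allocatable, used)) // false: oir-2 stays Pending
	used = 0                                // oir-1 is deleted, freeing its units
	fmt.Println(fits(3, allocatable, used)) // true: oir-2 should now be scheduled
}
```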
@@ -529,6 +529,14 @@ func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) {
    if node.Status.Allocatable == nil {
        node.Status.Allocatable = make(v1.ResourceList)
    }
    // Remove opaque integer resources from allocatable that are no longer
    // present in capacity.
    for k := range node.Status.Allocatable {
        _, found := node.Status.Capacity[k]
        if !found && v1.IsOpaqueIntResourceName(k) {
            delete(node.Status.Allocatable, k)
        }
    }
    allocatableReservation := kl.containerManager.GetNodeAllocatableReservation()
    for k, v := range node.Status.Capacity {
        value := *(v.Copy())

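The kubelet hunk above prunes opaque integer resources from allocatable once they disappear from capacity. A minimal sketch of that rule, with plain maps standing in for v1.ResourceList and a hypothetical `isOpaque` helper in place of v1.IsOpaqueIntResourceName; the resource key shown assumes the pod.alpha.kubernetes.io/opaque-int-resource- prefix used for OIRs.

```go
package main

import (
	"fmt"
	"strings"
)

// isOpaque is a stand-in for v1.IsOpaqueIntResourceName.
func isOpaque(name string) bool {
	return strings.HasPrefix(name, "pod.alpha.kubernetes.io/opaque-int-resource-")
}

func main() {
	capacity := map[string]int64{"cpu": 4000}
	allocatable := map[string]int64{
		"cpu": 3800,
		"pod.alpha.kubernetes.io/opaque-int-resource-foo": 5, // no longer in capacity
	}

	// Drop opaque integer resources from allocatable once they disappear from
	// capacity, so the node stops advertising a resource it no longer has.
	for k := range allocatable {
		if _, found := capacity[k]; !found && isOpaque(k) {
			delete(allocatable, k)
		}
	}

	fmt.Println(allocatable) // map[cpu:3800]
}
```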
@@ -468,6 +468,30 @@ func (c *VolumeZoneChecker) predicate(pod *v1.Pod, meta interface{}, nodeInfo *s
    return true, nil, nil
}

// Returns a *schedulercache.Resource that covers the largest width in each
// resource dimension. Because init-containers run sequentially, we collect the
// max in each dimension iteratively. In contrast, we sum the resource vectors
// for regular containers since they run simultaneously.
//
// Example:
//
// Pod:
//   InitContainers
//     IC1:
//       CPU: 2
//       Memory: 1G
//     IC2:
//       CPU: 2
//       Memory: 3G
//   Containers
//     C1:
//       CPU: 2
//       Memory: 1G
//     C2:
//       CPU: 1
//       Memory: 1G
//
// Result: CPU: 3, Memory: 3G
func GetResourceRequest(pod *v1.Pod) *schedulercache.Resource {
    result := schedulercache.Resource{}
    for _, container := range pod.Spec.Containers {
@@ -505,10 +529,8 @@ func GetResourceRequest(pod *v1.Pod) *schedulercache.Resource {
            default:
                if v1.IsOpaqueIntResourceName(rName) {
                    value := rQuantity.Value()
                    // Ensure the opaque resource map is initialized in the result.
                    result.AddOpaque(rName, int64(0))
                    if value > result.OpaqueIntResources[rName] {
                        result.OpaqueIntResources[rName] = value
                        result.SetOpaque(rName, value)
                    }
                }
            }

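The new doc comment spells out how requests are combined: sum the resource vectors of regular containers, then take the per-dimension max against each init container. The sketch below reduces that rule to plain integers and reproduces the documented example, arriving at CPU: 3, Memory: 3G; `res` and `maxRes` are illustrative stand-ins, not scheduler types.

```go
package main

import "fmt"

// res is a two-dimensional resource vector (CPU cores, memory in GB).
type res struct{ cpu, memGB int64 }

// maxRes takes the per-dimension maximum of two resource vectors.
func maxRes(a, b res) res {
	if b.cpu > a.cpu {
		a.cpu = b.cpu
	}
	if b.memGB > a.memGB {
		a.memGB = b.memGB
	}
	return a
}

func main() {
	initContainers := []res{{2, 1}, {2, 3}} // IC1, IC2 from the example above
	containers := []res{{2, 1}, {1, 1}}     // C1, C2

	var result res
	for _, c := range containers { // regular containers run together: sum
		result.cpu += c.cpu
		result.memGB += c.memGB
	}
	for _, ic := range initContainers { // init containers run one at a time: max
		result = maxRes(result, ic)
	}

	fmt.Printf("CPU: %d, Memory: %dG\n", result.cpu, result.memGB) // CPU: 3, Memory: 3G
}
```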
@@ -83,11 +83,15 @@ func (r *Resource) ResourceList() v1.ResourceList {
}

func (r *Resource) AddOpaque(name v1.ResourceName, quantity int64) {
    r.SetOpaque(name, r.OpaqueIntResources[name]+quantity)
}

func (r *Resource) SetOpaque(name v1.ResourceName, quantity int64) {
    // Lazily allocate opaque integer resource map.
    if r.OpaqueIntResources == nil {
        r.OpaqueIntResources = map[v1.ResourceName]int64{}
    }
    r.OpaqueIntResources[name] += quantity
    r.OpaqueIntResources[name] = quantity
}

// NewNodeInfo returns a ready to use empty NodeInfo object.
@@ -333,7 +337,7 @@ func (n *NodeInfo) SetNode(node *v1.Node) error {
            n.allowedPodNumber = int(rQuant.Value())
        default:
            if v1.IsOpaqueIntResourceName(rName) {
                n.allocatableResource.AddOpaque(rName, rQuant.Value())
                n.allocatableResource.SetOpaque(rName, rQuant.Value())
            }
        }
    }

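The schedulercache hunks above separate accumulation (`AddOpaque`) from assignment (`SetOpaque`) and switch `NodeInfo.SetNode` to the latter. A minimal sketch of the difference, using a simplified stand-in `Resource` type; it only shows the two update semantics side by side and is not the cache code itself.

```go
package main

import "fmt"

// Resource is a simplified stand-in for schedulercache.Resource.
type Resource struct {
	OpaqueIntResources map[string]int64
}

// SetOpaque overwrites the recorded quantity for an opaque resource.
func (r *Resource) SetOpaque(name string, quantity int64) {
	if r.OpaqueIntResources == nil {
		r.OpaqueIntResources = map[string]int64{}
	}
	r.OpaqueIntResources[name] = quantity
}

// AddOpaque accumulates on top of the recorded quantity.
func (r *Resource) AddOpaque(name string, quantity int64) {
	r.SetOpaque(name, r.OpaqueIntResources[name]+quantity)
}

func main() {
	var a, s Resource

	// Applying the same node status twice, e.g. on a re-sync:
	a.AddOpaque("foo", 5)
	a.AddOpaque("foo", 5)
	s.SetOpaque("foo", 5)
	s.SetOpaque("foo", 5)

	fmt.Println(a.OpaqueIntResources["foo"]) // 10: accumulates across updates
	fmt.Println(s.OpaqueIntResources["foo"]) // 5: tracks the current node status
}
```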
@@ -38,7 +38,7 @@ import (
    . "github.com/onsi/gomega"
)

var _ = framework.KubeDescribe("Opaque resources [Feature:OpaqueResources]", func() {
var _ = framework.KubeDescribe("Opaque resources", func() {
    f := framework.NewDefaultFramework("opaque-resource")
    opaqueResName := v1.OpaqueIntResourceName("foo")
    var node *v1.Node
@@ -59,11 +59,19 @@ var _ = framework.KubeDescribe("Opaque resources [Feature:OpaqueResources]", fun
            }
        }

        removeOpaqueResource(f, node.Name, opaqueResName)
        addOpaqueResource(f, node.Name, opaqueResName)
    })

    // TODO: The suite times out if removeOpaqueResource is called as part of
    //       an AfterEach closure. For now, it is the last statement in each
    //       It block.
    // AfterEach(func() {
    //     removeOpaqueResource(f, node.Name, opaqueResName)
    // })

    It("should not break pods that do not consume opaque integer resources.", func() {
        defer removeOpaqueResource(f, node.Name, opaqueResName)

        By("Creating a vanilla pod")
        requests := v1.ResourceList{v1.ResourceCPU: resource.MustParse("0.1")}
        limits := v1.ResourceList{v1.ResourceCPU: resource.MustParse("0.2")}
@@ -74,19 +82,17 @@ var _ = framework.KubeDescribe("Opaque resources [Feature:OpaqueResources]", fun
            _, err := f.ClientSet.Core().Pods(f.Namespace.Name).Create(pod)
            return err
        }
        predicate := func(e *v1.Event) bool {
            return e.Type == v1.EventTypeNormal &&
                e.Reason == "Scheduled" &&
        // Here we don't check for the bound node name since it can land on
        // any one (this pod doesn't require any of the opaque resource.)
                strings.Contains(e.Message, fmt.Sprintf("Successfully assigned %v", pod.Name))
        }
        predicate := scheduleSuccess(pod.Name, "")
        success, err := observeEventAfterAction(f, predicate, action)
        Expect(err).NotTo(HaveOccurred())
        Expect(success).To(Equal(true))
    })

    It("should schedule pods that do consume opaque integer resources.", func() {
        defer removeOpaqueResource(f, node.Name, opaqueResName)

        By("Creating a pod that requires less of the opaque resource than is allocatable on a node.")
        requests := v1.ResourceList{
            v1.ResourceCPU: resource.MustParse("0.1"),
@@ -103,17 +109,15 @@ var _ = framework.KubeDescribe("Opaque resources [Feature:OpaqueResources]", fun
            _, err := f.ClientSet.Core().Pods(f.Namespace.Name).Create(pod)
            return err
        }
        predicate := func(e *v1.Event) bool {
            return e.Type == v1.EventTypeNormal &&
                e.Reason == "Scheduled" &&
                strings.Contains(e.Message, fmt.Sprintf("Successfully assigned %v to %v", pod.Name, node.Name))
        }
        predicate := scheduleSuccess(pod.Name, node.Name)
        success, err := observeEventAfterAction(f, predicate, action)
        Expect(err).NotTo(HaveOccurred())
        Expect(success).To(Equal(true))
    })

    It("should not schedule pods that exceed the available amount of opaque integer resource.", func() {
        defer removeOpaqueResource(f, node.Name, opaqueResName)

        By("Creating a pod that requires more of the opaque resource than is allocatable on any node")
        requests := v1.ResourceList{opaqueResName: resource.MustParse("6")}
        limits := v1.ResourceList{}
@@ -123,17 +127,15 @@ var _ = framework.KubeDescribe("Opaque resources [Feature:OpaqueResources]", fun
            _, err := f.ClientSet.Core().Pods(f.Namespace.Name).Create(newTestPod(f, "over-max-oir", requests, limits))
            return err
        }
        predicate := func(e *v1.Event) bool {
            return e.Type == "Warning" &&
                e.Reason == "FailedScheduling" &&
                strings.Contains(e.Message, "failed to fit in any node")
        }
        predicate := scheduleFailure("over-max-oir")
        success, err := observeEventAfterAction(f, predicate, action)
        Expect(err).NotTo(HaveOccurred())
        Expect(success).To(Equal(true))
    })

    It("should account opaque integer resources in pods with multiple containers.", func() {
        defer removeOpaqueResource(f, node.Name, opaqueResName)

        By("Creating a pod with two containers that together require less of the opaque resource than is allocatable on a node")
        requests := v1.ResourceList{opaqueResName: resource.MustParse("1")}
        limits := v1.ResourceList{}
@@ -170,11 +172,7 @@ var _ = framework.KubeDescribe("Opaque resources [Feature:OpaqueResources]", fun
            _, err := f.ClientSet.Core().Pods(f.Namespace.Name).Create(pod)
            return err
        }
        predicate := func(e *v1.Event) bool {
            return e.Type == v1.EventTypeNormal &&
                e.Reason == "Scheduled" &&
                strings.Contains(e.Message, fmt.Sprintf("Successfully assigned %v to %v", pod.Name, node.Name))
        }
        predicate := scheduleSuccess(pod.Name, node.Name)
        success, err := observeEventAfterAction(f, predicate, action)
        Expect(err).NotTo(HaveOccurred())
        Expect(success).To(Equal(true))
@@ -214,11 +212,53 @@ var _ = framework.KubeDescribe("Opaque resources [Feature:OpaqueResources]", fun
            _, err = f.ClientSet.Core().Pods(f.Namespace.Name).Create(pod)
            return err
        }
        predicate = func(e *v1.Event) bool {
            return e.Type == "Warning" &&
                e.Reason == "FailedScheduling" &&
                strings.Contains(e.Message, "failed to fit in any node")
        predicate = scheduleFailure(pod.Name)
        success, err = observeEventAfterAction(f, predicate, action)
        Expect(err).NotTo(HaveOccurred())
        Expect(success).To(Equal(true))
    })

    It("should schedule pods that initially do not fit after enough opaque integer resources are freed.", func() {
        defer removeOpaqueResource(f, node.Name, opaqueResName)

        By("Creating a pod that requires less of the opaque resource than is allocatable on a node.")
        requests := v1.ResourceList{
            v1.ResourceCPU: resource.MustParse("0.1"),
            opaqueResName:  resource.MustParse("3"),
        }
        limits := v1.ResourceList{
            v1.ResourceCPU: resource.MustParse("0.2"),
            opaqueResName:  resource.MustParse("3"),
        }
        pod1 := newTestPod(f, "oir-1", requests, limits)
        pod2 := newTestPod(f, "oir-2", requests, limits)

        By("Observing an event that indicates one pod was scheduled")
        action := func() error {
            _, err := f.ClientSet.Core().Pods(f.Namespace.Name).Create(pod1)
            return err
        }
        predicate := scheduleSuccess(pod1.Name, node.Name)
        success, err := observeEventAfterAction(f, predicate, action)
        Expect(err).NotTo(HaveOccurred())
        Expect(success).To(Equal(true))

        By("Observing an event that indicates a subsequent pod was not scheduled")
        action = func() error {
            _, err := f.ClientSet.Core().Pods(f.Namespace.Name).Create(pod2)
            return err
        }
        predicate = scheduleFailure(pod2.Name)
        success, err = observeEventAfterAction(f, predicate, action)
        Expect(err).NotTo(HaveOccurred())
        Expect(success).To(Equal(true))

        By("Observing an event that indicates the second pod was scheduled after deleting the first pod")
        action = func() error {
            err := f.ClientSet.Core().Pods(f.Namespace.Name).Delete(pod1.Name, nil)
            return err
        }
        predicate = scheduleSuccess(pod2.Name, node.Name)
        success, err = observeEventAfterAction(f, predicate, action)
        Expect(err).NotTo(HaveOccurred())
        Expect(success).To(Equal(true))
@@ -228,12 +268,14 @@ var _ = framework.KubeDescribe("Opaque resources [Feature:OpaqueResources]", fun
// Adds the opaque resource to a node.
func addOpaqueResource(f *framework.Framework, nodeName string, opaqueResName v1.ResourceName) {
    action := func() error {
        By(fmt.Sprintf("Adding OIR to node [%s]", nodeName))
        patch := []byte(fmt.Sprintf(`[{"op": "add", "path": "/status/capacity/%s", "value": "5"}]`, escapeForJSONPatch(opaqueResName)))
        return f.ClientSet.Core().RESTClient().Patch(types.JSONPatchType).Resource("nodes").Name(nodeName).SubResource("status").Body(patch).Do().Error()
    }
    predicate := func(n *v1.Node) bool {
        capacity, foundCap := n.Status.Capacity[opaqueResName]
        allocatable, foundAlloc := n.Status.Allocatable[opaqueResName]
        By(fmt.Sprintf("Node [%s] has OIR capacity: [%t] (%s), has OIR allocatable: [%t] (%s)", n.Name, foundCap, capacity.String(), foundAlloc, allocatable.String()))
        return foundCap && capacity.MilliValue() == int64(5000) &&
            foundAlloc && allocatable.MilliValue() == int64(5000)
    }
@@ -245,14 +287,16 @@ func addOpaqueResource(f *framework.Framework, nodeName string, opaqueResName v1
// Removes the opaque resource from a node.
func removeOpaqueResource(f *framework.Framework, nodeName string, opaqueResName v1.ResourceName) {
    action := func() error {
        By(fmt.Sprintf("Removing OIR from node [%s]", nodeName))
        patch := []byte(fmt.Sprintf(`[{"op": "remove", "path": "/status/capacity/%s"}]`, escapeForJSONPatch(opaqueResName)))
        f.ClientSet.Core().RESTClient().Patch(types.JSONPatchType).Resource("nodes").Name(nodeName).SubResource("status").Body(patch).Do()
        return nil // Ignore error -- the opaque resource may not exist.
    }
    predicate := func(n *v1.Node) bool {
        _, foundCap := n.Status.Capacity[opaqueResName]
        _, foundAlloc := n.Status.Allocatable[opaqueResName]
        return !foundCap && !foundAlloc
        capacity, foundCap := n.Status.Capacity[opaqueResName]
        allocatable, foundAlloc := n.Status.Allocatable[opaqueResName]
        By(fmt.Sprintf("Node [%s] has OIR capacity: [%t] (%s), has OIR allocatable: [%t] (%s)", n.Name, foundCap, capacity.String(), foundAlloc, allocatable.String()))
        return (!foundCap || capacity.IsZero()) && (!foundAlloc || allocatable.IsZero())
    }
    success, err := observeNodeUpdateAfterAction(f, nodeName, predicate, action)
    Expect(err).NotTo(HaveOccurred())
@@ -345,7 +389,7 @@ func observeEventAfterAction(f *framework.Framework, eventPredicate func(*v1.Eve
        cache.ResourceEventHandlerFuncs{
            AddFunc: func(obj interface{}) {
                e, ok := obj.(*v1.Event)
                By(fmt.Sprintf("Considering event: \nType = [%s], Reason = [%s], Message = [%s]", e.Type, e.Reason, e.Message))
                By(fmt.Sprintf("Considering event: \nType = [%s], Name = [%s], Reason = [%s], Message = [%s]", e.Type, e.Name, e.Reason, e.Message))
                Expect(ok).To(Equal(true))
                if ok && eventPredicate(e) {
                    observedMatchingEvent = true
@@ -373,3 +417,20 @@ func observeEventAfterAction(f *framework.Framework, eventPredicate func(*v1.Eve
    })
    return err == nil, err
}

func scheduleSuccess(podName, nodeName string) func(*v1.Event) bool {
    return func(e *v1.Event) bool {
        return e.Type == v1.EventTypeNormal &&
            e.Reason == "Scheduled" &&
            strings.HasPrefix(e.Name, podName) &&
            strings.Contains(e.Message, fmt.Sprintf("Successfully assigned %v to %v", podName, nodeName))
    }
}

func scheduleFailure(podName string) func(*v1.Event) bool {
    return func(e *v1.Event) bool {
        return strings.HasPrefix(e.Name, podName) &&
            e.Type == "Warning" &&
            e.Reason == "FailedScheduling"
    }
}