	Ensure deletion of pods in queues and cache
When the client misses a delete event from the watcher, it uses the last state of the pod in the informer cache to produce a delete event. At that point, it is not clear whether the pod was in the queues or in the cache, so we should issue a deletion in both. The pod could be assumed, so deletion of assumed pods from the cache should work.

Change-Id: I11ce9785de603924fc121fe2fa6ed5cb1e16922f
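For illustration only (this sketch is not part of the commit): a minimal example of how a client-go DeleteFunc handler can unwrap a cache.DeletedFinalStateUnknown tombstone and attempt the deletion against both the scheduling queue and the scheduler cache, since the pod carried by the tombstone may be stale. The podStore interface and deletePodHandler helper below are hypothetical stand-ins, not the scheduler's real types.

package example

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/cache"
)

// podStore is a hypothetical stand-in for either the scheduling queue or the
// scheduler cache; both need to support deleting a pod.
type podStore interface {
	DeletePod(pod *v1.Pod) error
}

// deletePodHandler returns a DeleteFunc suitable for registration via
// cache.ResourceEventHandlerFuncs{DeleteFunc: ...}.
func deletePodHandler(queue, schedCache podStore) func(obj interface{}) {
	return func(obj interface{}) {
		var pod *v1.Pod
		switch t := obj.(type) {
		case *v1.Pod:
			pod = t
		case cache.DeletedFinalStateUnknown:
			// The watcher missed the delete event; the tombstone carries the
			// last state recorded in the informer cache, which may be stale.
			p, ok := t.Obj.(*v1.Pod)
			if !ok {
				fmt.Printf("cannot convert %T to *v1.Pod\n", t.Obj)
				return
			}
			pod = p
		default:
			fmt.Printf("unexpected object type %T\n", obj)
			return
		}
		// Since the stale state cannot tell us whether the pod was still
		// queued or already bound, attempt the deletion in both places.
		if err := queue.DeletePod(pod); err != nil {
			fmt.Printf("deleting pod from queue: %v\n", err)
		}
		if err := schedCache.DeletePod(pod); err != nil {
			fmt.Printf("deleting pod from cache: %v\n", err)
		}
	}
}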
@@ -270,8 +270,10 @@ func addAllEventHandlers(
 				case *v1.Pod:
 					return assignedPod(t)
 				case cache.DeletedFinalStateUnknown:
-					if pod, ok := t.Obj.(*v1.Pod); ok {
-						return assignedPod(pod)
+					if _, ok := t.Obj.(*v1.Pod); ok {
+						// The carried object may be stale, so we don't use it to check if
+						// it's assigned or not. Attempting to cleanup anyways.
+						return true
 					}
 					utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
 					return false
@@ -296,7 +298,9 @@ func addAllEventHandlers(
 					return !assignedPod(t) && responsibleForPod(t, sched.Profiles)
 				case cache.DeletedFinalStateUnknown:
 					if pod, ok := t.Obj.(*v1.Pod); ok {
-						return !assignedPod(pod) && responsibleForPod(pod, sched.Profiles)
+						// The carried object may be stale, so we don't use it to check if
+						// it's assigned or not.
+						return responsibleForPod(pod, sched.Profiles)
 					}
 					utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
 					return false
17	pkg/scheduler/internal/cache/cache.go	vendored

@@ -537,22 +537,19 @@ func (cache *schedulerCache) RemovePod(pod *v1.Pod) error {
 
 	currState, ok := cache.podStates[key]
 	switch {
-	// An assumed pod won't have Delete/Remove event. It needs to have Add event
-	// before Remove event, in which case the state would change from Assumed to Added.
-	case ok && !cache.assumedPods.Has(key):
+	case ok:
 		if currState.pod.Spec.NodeName != pod.Spec.NodeName {
 			klog.Errorf("Pod %v was assumed to be on %v but got added to %v", key, pod.Spec.NodeName, currState.pod.Spec.NodeName)
-			klog.Fatalf("Schedulercache is corrupted and can badly affect scheduling decisions")
+			if pod.Spec.NodeName != "" {
+				// An empty NodeName is possible when the scheduler misses a Delete
+				// event and it gets the last known state from the informer cache.
+				klog.Fatalf("Schedulercache is corrupted and can badly affect scheduling decisions")
+			}
 		}
-		err := cache.removePod(currState.pod)
-		if err != nil {
-			return err
-		}
-		delete(cache.podStates, key)
+		return cache.expirePod(key, currState)
 	default:
 		return fmt.Errorf("pod %v is not found in scheduler cache, so cannot be removed from it", key)
 	}
-	return nil
 }
 
 func (cache *schedulerCache) IsAssumedPod(pod *v1.Pod) (bool, error) {
81	pkg/scheduler/internal/cache/cache_test.go	vendored

@@ -807,57 +807,62 @@ func TestEphemeralStorageResource(t *testing.T) {
 
 // TestRemovePod tests after added pod is removed, its information should also be subtracted.
 func TestRemovePod(t *testing.T) {
-	basePod := makeBasePod(t, "node-1", "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}})
-	tests := []struct {
-		nodes     []*v1.Node
-		pod       *v1.Pod
-		wNodeInfo *framework.NodeInfo
-	}{{
-		nodes: []*v1.Node{
-			{
-				ObjectMeta: metav1.ObjectMeta{Name: "node-1"},
-			},
-			{
-				ObjectMeta: metav1.ObjectMeta{Name: "node-2"},
-			},
-		},
-		pod: basePod,
-		wNodeInfo: newNodeInfo(
-			&framework.Resource{
-				MilliCPU: 100,
-				Memory:   500,
-			},
-			&framework.Resource{
-				MilliCPU: 100,
-				Memory:   500,
-			},
-			[]*v1.Pod{basePod},
-			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
-			make(map[string]*framework.ImageStateSummary),
-		),
-	}}
+	pod := makeBasePod(t, "node-1", "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}})
+	nodes := []*v1.Node{
+		{
+			ObjectMeta: metav1.ObjectMeta{Name: "node-1"},
+		},
+		{
+			ObjectMeta: metav1.ObjectMeta{Name: "node-2"},
+		},
+	}
+	wNodeInfo := newNodeInfo(
+		&framework.Resource{
+			MilliCPU: 100,
+			Memory:   500,
+		},
+		&framework.Resource{
+			MilliCPU: 100,
+			Memory:   500,
+		},
+		[]*v1.Pod{pod},
+		newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
+		make(map[string]*framework.ImageStateSummary),
+	)
+	tests := map[string]struct {
+		assume bool
+	}{
+		"bound":   {},
+		"assumed": {assume: true},
+	}
 
-	for i, tt := range tests {
-		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
-			nodeName := tt.pod.Spec.NodeName
+	for name, tt := range tests {
+		t.Run(name, func(t *testing.T) {
+			nodeName := pod.Spec.NodeName
 			cache := newSchedulerCache(time.Second, time.Second, nil)
-			// Add pod succeeds even before adding the nodes.
-			if err := cache.AddPod(tt.pod); err != nil {
-				t.Fatalf("AddPod failed: %v", err)
+			// Add/Assume pod succeeds even before adding the nodes.
+			if tt.assume {
+				if err := cache.AddPod(pod); err != nil {
+					t.Fatalf("AddPod failed: %v", err)
+				}
+			} else {
+				if err := cache.AssumePod(pod); err != nil {
+					t.Fatalf("AssumePod failed: %v", err)
+				}
 			}
 			n := cache.nodes[nodeName]
-			if err := deepEqualWithoutGeneration(n, tt.wNodeInfo); err != nil {
+			if err := deepEqualWithoutGeneration(n, wNodeInfo); err != nil {
 				t.Error(err)
 			}
-			for _, n := range tt.nodes {
+			for _, n := range nodes {
 				cache.AddNode(n)
 			}
 
-			if err := cache.RemovePod(tt.pod); err != nil {
+			if err := cache.RemovePod(pod); err != nil {
 				t.Fatalf("RemovePod failed: %v", err)
 			}
 
-			if _, err := cache.GetPod(tt.pod); err == nil {
+			if _, err := cache.GetPod(pod); err == nil {
 				t.Errorf("pod was not deleted")
 			}
 