diff --git a/pkg/scheduler/framework/preemption/preemption.go b/pkg/scheduler/framework/preemption/preemption.go
index 29288ca411a..2476c83e333 100644
--- a/pkg/scheduler/framework/preemption/preemption.go
+++ b/pkg/scheduler/framework/preemption/preemption.go
@@ -190,8 +190,10 @@ func NewEvaluator(pluginName string, fh framework.Handle, i Interface, enableAsy
 		if err := util.DeletePod(ctx, ev.Handler.ClientSet(), victim); err != nil {
 			if !apierrors.IsNotFound(err) {
 				logger.Error(err, "Tried to preempted pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(preemptor))
+				return err
 			}
-			return err
+			logger.V(2).Info("Victim Pod is already deleted", "preemptor", klog.KObj(preemptor), "victim", klog.KObj(victim), "node", c.Name())
+			return nil
 		}
 		logger.V(2).Info("Preemptor Pod preempted victim Pod", "preemptor", klog.KObj(preemptor), "victim", klog.KObj(victim), "node", c.Name())
 	}
@@ -433,18 +435,7 @@ func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1.
 	logger := klog.FromContext(ctx)
 	errCh := parallelize.NewErrorChannel()
 	fh.Parallelizer().Until(ctx, len(c.Victims().Pods), func(index int) {
-		victimPod := c.Victims().Pods[index]
-		if victimPod.DeletionTimestamp != nil {
-			// If the victim Pod is already being deleted, we don't have to make another deletion api call.
-			logger.V(2).Info("Victim Pod is already deleted, skipping the API call for it", "preemptor", klog.KObj(pod), "node", c.Name(), "victim", klog.KObj(victimPod))
-			return
-		}
-
-		if err := ev.PreemptPod(ctx, c, pod, victimPod, pluginName); err != nil {
-			if apierrors.IsNotFound(err) {
-				logger.V(2).Info("Victim Pod is already deleted", "preemptor", klog.KObj(pod), "victim", klog.KObj(victimPod), "node", c.Name())
-				return
-			}
+		if err := ev.PreemptPod(ctx, c, pod, c.Victims().Pods[index], pluginName); err != nil {
 			errCh.SendErrorWithCancel(err, cancel)
 		}
 	}, ev.PluginName)
@@ -480,24 +471,9 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName
 	// Intentionally create a new context, not using a ctx from the scheduling cycle, to create ctx,
 	// because this process could continue even after this scheduling cycle finishes.
 	ctx, cancel := context.WithCancel(context.Background())
-	logger := klog.FromContext(ctx)
-	victimPods := make([]*v1.Pod, 0, len(c.Victims().Pods))
-	for _, victim := range c.Victims().Pods {
-		if victim.DeletionTimestamp != nil {
-			// If the victim Pod is already being deleted, we don't have to make another deletion api call.
-			logger.V(2).Info("Victim Pod is already deleted, skipping the API call for it", "preemptor", klog.KObj(pod), "node", c.Name(), "victim", klog.KObj(victim))
-			continue
-		}
-		victimPods = append(victimPods, victim)
-	}
-	if len(victimPods) == 0 {
-		cancel()
-		return
-	}
-
 	errCh := parallelize.NewErrorChannel()
 	preemptPod := func(index int) {
-		victim := victimPods[index]
+		victim := c.Victims().Pods[index]
 		if err := ev.PreemptPod(ctx, c, pod, victim, pluginName); err != nil {
 			errCh.SendErrorWithCancel(err, cancel)
 		}
@@ -507,26 +483,21 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName
 	ev.preempting.Insert(pod.UID)
 	ev.mu.Unlock()
 
+	logger := klog.FromContext(ctx)
 	go func() {
 		startTime := time.Now()
 		result := metrics.GoroutineResultSuccess
-
-		// Whether all victim pods are already deleted before making API call.
-		allPodsAlreadyDeleted := true
 		defer metrics.PreemptionGoroutinesDuration.WithLabelValues(result).Observe(metrics.SinceInSeconds(startTime))
 		defer metrics.PreemptionGoroutinesExecutionTotal.WithLabelValues(result).Inc()
 		defer func() {
-			// When API call isn't successful, the Pod may get stuck in the unschedulable pod pool in the worst case.
-			// So, we should move the Pod to the activeQ.
-			if result == metrics.GoroutineResultError ||
-				// When all pods are already deleted (which is very rare, but could happen in theory),
-				// it's safe to activate the preemptor Pod because it might miss Pod/delete event that requeues the pod.
-				allPodsAlreadyDeleted {
+			if result == metrics.GoroutineResultError {
+				// When API call isn't successful, the Pod may get stuck in the unschedulable pod pool in the worst case.
+				// So, we should move the Pod to the activeQ.
 				ev.Handler.Activate(logger, map[string]*v1.Pod{pod.Name: pod})
 			}
 		}()
 		defer cancel()
-		logger.V(2).Info("Start the preemption asynchronously", "preemptor", klog.KObj(pod), "node", c.Name(), "numVictims", len(c.Victims().Pods), "numVictimsToDelete", len(victimPods))
+		logger.V(2).Info("Start the preemption asynchronously", "preemptor", klog.KObj(pod), "node", c.Name(), "numVictims", len(c.Victims().Pods))
 
 		// Lower priority pods nominated to run on this node, may no longer fit on
 		// this node. So, we should remove their nomination. Removing their
@@ -539,39 +510,33 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName
 			// We do not return as this error is not critical.
 		}
 
-		if len(victimPods) > 1 {
-			// We can evict all victims in parallel, but the last one.
-			// We have to remove the pod from the preempting map before the last one is evicted
-			// because, otherwise, the pod removal might be notified to the scheduling queue before
-			// we remove this pod from the preempting map,
-			// and the pod could end up stucking at the unschedulable pod pool
-			// by all the pod removal events being ignored.
-			ev.Handler.Parallelizer().Until(ctx, len(victimPods)-1, preemptPod, ev.PluginName)
-			err := errCh.ReceiveError()
-			switch {
-			case apierrors.IsNotFound(err):
-				logger.V(2).Info("Victim Pod is already deleted", "preemptor", klog.KObj(pod), "node", c.Name(), "err", err)
-			case err != nil:
-				utilruntime.HandleErrorWithContext(ctx, err, "Error occurred during async preemption")
-				result = metrics.GoroutineResultError
-			default:
-				allPodsAlreadyDeleted = false
-			}
+		if len(c.Victims().Pods) == 0 {
+			ev.mu.Lock()
+			delete(ev.preempting, pod.UID)
+			ev.mu.Unlock()
+
+			return
+		}
+
+		// We can evict all victims in parallel, but the last one.
+		// We have to remove the pod from the preempting map before the last one is evicted
+		// because, otherwise, the pod removal might be notified to the scheduling queue before
+		// we remove this pod from the preempting map,
+		// and the pod could end up stucking at the unschedulable pod pool
+		// by all the pod removal events being ignored.
+		ev.Handler.Parallelizer().Until(ctx, len(c.Victims().Pods)-1, preemptPod, ev.PluginName)
+		if err := errCh.ReceiveError(); err != nil {
+			utilruntime.HandleErrorWithContext(ctx, err, "Error occurred during async preemption")
+			result = metrics.GoroutineResultError
 		}
 
 		ev.mu.Lock()
 		delete(ev.preempting, pod.UID)
 		ev.mu.Unlock()
 
-		err := ev.PreemptPod(ctx, c, pod, victimPods[len(victimPods)-1], pluginName)
-		switch {
-		case apierrors.IsNotFound(err):
-			logger.V(2).Info("Victim Pod is already deleted", "preemptor", klog.KObj(pod), "node", c.Name(), "err", err)
-		case err != nil:
+		if err := ev.PreemptPod(ctx, c, pod, c.Victims().Pods[len(c.Victims().Pods)-1], pluginName); err != nil {
 			utilruntime.HandleErrorWithContext(ctx, err, "Error occurred during async preemption")
 			result = metrics.GoroutineResultError
-		default:
-			allPodsAlreadyDeleted = false
 		}
 
 		logger.V(2).Info("Async Preemption finished completely", "preemptor", klog.KObj(pod), "node", c.Name(), "result", result)
diff --git a/pkg/scheduler/framework/preemption/preemption_test.go b/pkg/scheduler/framework/preemption/preemption_test.go
index e6da91cdcd3..3c6bfc980a1 100644
--- a/pkg/scheduler/framework/preemption/preemption_test.go
+++ b/pkg/scheduler/framework/preemption/preemption_test.go
@@ -30,8 +30,6 @@ import (
 
 	v1 "k8s.io/api/core/v1"
 	policy "k8s.io/api/policy/v1"
-	apierrors "k8s.io/apimachinery/pkg/api/errors"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/sets"
@@ -418,11 +416,6 @@ func TestPrepareCandidate(t *testing.T) {
 			Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}).
 			Obj()
 
-		notFoundVictim1 = st.MakePod().Name("not-found-victim").UID("victim1").
-			Node(node1Name).SchedulerName(defaultSchedulerName).Priority(midPriority).
-			Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}).
-			Obj()
-
 		failVictim = st.MakePod().Name("fail-victim").UID("victim1").
 			Node(node1Name).SchedulerName(defaultSchedulerName).Priority(midPriority).
 			Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}).
@@ -451,12 +444,6 @@ func TestPrepareCandidate(t *testing.T) {
 			Obj()
 	)
 
-	victimWithDeletionTimestamp := victim1.DeepCopy()
-	victimWithDeletionTimestamp.Name = "victim1-with-deletion-timestamp"
-	victimWithDeletionTimestamp.UID = "victim1-with-deletion-timestamp"
-	victimWithDeletionTimestamp.DeletionTimestamp = &metav1.Time{Time: time.Now().Add(-100 * time.Second)}
-	victimWithDeletionTimestamp.Finalizers = []string{"test"}
-
 	tests := []struct {
 		name      string
 		nodeNames []string
@@ -485,8 +472,9 @@ func TestPrepareCandidate(t *testing.T) {
 			testPods: []*v1.Pod{
 				victim1,
 			},
-			nodeNames:      []string{node1Name},
-			expectedStatus: nil,
+			nodeNames:             []string{node1Name},
+			expectedStatus:        nil,
+			expectedPreemptingMap: sets.New(types.UID("preemptor")),
 		},
 		{
 			name: "one victim without condition",
@@ -508,42 +496,6 @@ func TestPrepareCandidate(t *testing.T) {
 			expectedStatus:        nil,
 			expectedPreemptingMap: sets.New(types.UID("preemptor")),
 		},
-		{
-			name: "one victim, but victim is already being deleted",
-
-			candidate: &fakeCandidate{
-				name: node1Name,
-				victims: &extenderv1.Victims{
-					Pods: []*v1.Pod{
-						victimWithDeletionTimestamp,
-					},
-				},
-			},
-			preemptor: preemptor,
-			testPods: []*v1.Pod{
-				victimWithDeletionTimestamp,
-			},
-			nodeNames:      []string{node1Name},
-			expectedStatus: nil,
-		},
-		{
-			name: "one victim, but victim is already deleted",
-
-			candidate: &fakeCandidate{
-				name: node1Name,
-				victims: &extenderv1.Victims{
-					Pods: []*v1.Pod{
-						notFoundVictim1,
-					},
-				},
-			},
-			preemptor:             preemptor,
-			testPods:              []*v1.Pod{},
-			nodeNames:             []string{node1Name},
-			expectedStatus:        nil,
-			expectedActivatedPods: map[string]*v1.Pod{preemptor.Name: preemptor},
-			expectedPreemptingMap: sets.New(types.UID("preemptor")),
-		},
 		{
 			name: "one victim with same condition",
 
@@ -703,11 +655,6 @@ func TestPrepareCandidate(t *testing.T) {
 					deletionFailure = true
 					return true, nil, fmt.Errorf("delete pod failed")
 				}
-				// fake clientset does not return an error for not-found pods, so we simulate it here.
-				if name == "not-found-victim" {
-					// Simulate a not-found error.
-					return true, nil, apierrors.NewNotFound(v1.Resource("pods"), name)
-				}
 
 				deletedPods.Insert(name)
 				return true, nil, nil
@@ -720,10 +667,6 @@ func TestPrepareCandidate(t *testing.T) {
 					patchFailure = true
 					return true, nil, fmt.Errorf("patch pod status failed")
 				}
-				// fake clientset does not return an error for not-found pods, so we simulate it here.
-				if action.(clienttesting.PatchAction).GetName() == "not-found-victim" {
-					return true, nil, apierrors.NewNotFound(v1.Resource("pods"), "not-found-victim")
-				}
 
 				return true, nil, nil
 			})
@@ -854,8 +797,7 @@ func TestPrepareCandidate(t *testing.T) {
 
 				return true, nil
 			}); err != nil {
-				t.Error(lastErrMsg)
-				t.Error(err)
+				t.Fatal(lastErrMsg)
 			}
 		})
 	}
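For readers skimming the diff, here is a minimal, self-contained sketch (not part of the patch) of the error-handling contract the first hunk gives `PreemptPod`: a NotFound from the victim deletion is treated as success, so `prepareCandidate` and `prepareCandidateAsync` can drop their own `DeletionTimestamp` / NotFound special cases, and the async goroutine only re-activates the preemptor on genuine API failures. `deleteVictim` and `deletePod` are hypothetical stand-ins for `ev.PreemptPod` / `util.DeletePod`, and the example assumes the k8s.io/api and k8s.io/apimachinery modules are on the module path.

```go
package main

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
)

// deleteVictim mirrors the patched pattern: only non-NotFound errors from the
// delete call are surfaced to the caller; NotFound means the victim is already
// gone, which counts as success.
func deleteVictim(ctx context.Context, deletePod func(context.Context, *v1.Pod) error, victim *v1.Pod) error {
	if err := deletePod(ctx, victim); err != nil {
		if !apierrors.IsNotFound(err) {
			// Real failure: the caller records a goroutine error and re-activates the preemptor.
			return err
		}
		// Victim already deleted: nothing left to do for this pod.
		return nil
	}
	return nil
}

func main() {
	// Fake delete call that reports the pod as already gone.
	alreadyGone := func(_ context.Context, p *v1.Pod) error {
		return apierrors.NewNotFound(v1.Resource("pods"), p.Name)
	}
	err := deleteVictim(context.Background(), alreadyGone, &v1.Pod{})
	fmt.Println(err) // <nil>: NotFound is swallowed, so callers no longer special-case it
}
```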