Merge pull request #133203 from pacoxu/revert-133167-preemption-conor-case

Revert "fix: handle corner cases in the async preemption"
Kubernetes Prow Robot authored 2025-07-24 22:18:26 -07:00, committed by GitHub
2 changed files with 33 additions and 126 deletions


@@ -190,8 +190,10 @@ func NewEvaluator(pluginName string, fh framework.Handle, i Interface, enableAsy
         if err := util.DeletePod(ctx, ev.Handler.ClientSet(), victim); err != nil {
             if !apierrors.IsNotFound(err) {
                 logger.Error(err, "Tried to preempted pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(preemptor))
+                return err
             }
-            return err
+            logger.V(2).Info("Victim Pod is already deleted", "preemptor", klog.KObj(preemptor), "victim", klog.KObj(victim), "node", c.Name())
+            return nil
         }
         logger.V(2).Info("Preemptor Pod preempted victim Pod", "preemptor", klog.KObj(preemptor), "victim", klog.KObj(victim), "node", c.Name())
     }
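Note: the restored PreemptPod behavior above treats a NotFound response from the delete call as success rather than propagating it to the caller. A minimal client-go sketch of that pattern, assuming a plain kubernetes.Interface instead of the scheduler's util.DeletePod helper and framework handle:

package preemptionsketch

import (
	"context"

	v1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// deletePodIgnoringNotFound deletes a pod and treats "already gone" as
// success, mirroring the behavior restored for PreemptPod above.
// This is an illustrative sketch, not the scheduler's actual helper.
func deletePodIgnoringNotFound(ctx context.Context, cs kubernetes.Interface, pod *v1.Pod) error {
	err := cs.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
	if err != nil && !apierrors.IsNotFound(err) {
		// A real failure: surface it so the caller can abort this candidate.
		return err
	}
	// Deleted now, or already deleted by someone else: either way the victim is gone.
	return nil
}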
@@ -433,18 +435,7 @@ func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1.
     logger := klog.FromContext(ctx)
     errCh := parallelize.NewErrorChannel()
     fh.Parallelizer().Until(ctx, len(c.Victims().Pods), func(index int) {
-        victimPod := c.Victims().Pods[index]
-        if victimPod.DeletionTimestamp != nil {
-            // If the victim Pod is already being deleted, we don't have to make another deletion api call.
-            logger.V(2).Info("Victim Pod is already deleted, skipping the API call for it", "preemptor", klog.KObj(pod), "node", c.Name(), "victim", klog.KObj(victimPod))
-            return
-        }
-        if err := ev.PreemptPod(ctx, c, pod, victimPod, pluginName); err != nil {
-            if apierrors.IsNotFound(err) {
-                logger.V(2).Info("Victim Pod is already deleted", "preemptor", klog.KObj(pod), "victim", klog.KObj(victimPod), "node", c.Name())
-                return
-            }
+        if err := ev.PreemptPod(ctx, c, pod, c.Victims().Pods[index], pluginName); err != nil {
             errCh.SendErrorWithCancel(err, cancel)
         }
     }, ev.PluginName)
@@ -480,24 +471,9 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName
     // Intentionally create a new context, not using a ctx from the scheduling cycle, to create ctx,
     // because this process could continue even after this scheduling cycle finishes.
     ctx, cancel := context.WithCancel(context.Background())
-    logger := klog.FromContext(ctx)
-    victimPods := make([]*v1.Pod, 0, len(c.Victims().Pods))
-    for _, victim := range c.Victims().Pods {
-        if victim.DeletionTimestamp != nil {
-            // If the victim Pod is already being deleted, we don't have to make another deletion api call.
-            logger.V(2).Info("Victim Pod is already deleted, skipping the API call for it", "preemptor", klog.KObj(pod), "node", c.Name(), "victim", klog.KObj(victim))
-            continue
-        }
-        victimPods = append(victimPods, victim)
-    }
-    if len(victimPods) == 0 {
-        cancel()
-        return
-    }
     errCh := parallelize.NewErrorChannel()
     preemptPod := func(index int) {
-        victim := victimPods[index]
+        victim := c.Victims().Pods[index]
         if err := ev.PreemptPod(ctx, c, pod, victim, pluginName); err != nil {
             errCh.SendErrorWithCancel(err, cancel)
         }
@@ -507,26 +483,21 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName
     ev.preempting.Insert(pod.UID)
     ev.mu.Unlock()
+    logger := klog.FromContext(ctx)
     go func() {
         startTime := time.Now()
         result := metrics.GoroutineResultSuccess
-        // Whether all victim pods are already deleted before making API call.
-        allPodsAlreadyDeleted := true
         defer metrics.PreemptionGoroutinesDuration.WithLabelValues(result).Observe(metrics.SinceInSeconds(startTime))
         defer metrics.PreemptionGoroutinesExecutionTotal.WithLabelValues(result).Inc()
         defer func() {
-            // When API call isn't successful, the Pod may get stuck in the unschedulable pod pool in the worst case.
-            // So, we should move the Pod to the activeQ.
-            if result == metrics.GoroutineResultError ||
-                // When all pods are already deleted (which is very rare, but could happen in theory),
-                // it's safe to activate the preemptor Pod because it might miss Pod/delete event that requeues the pod.
-                allPodsAlreadyDeleted {
+            if result == metrics.GoroutineResultError {
+                // When API call isn't successful, the Pod may get stuck in the unschedulable pod pool in the worst case.
+                // So, we should move the Pod to the activeQ.
                 ev.Handler.Activate(logger, map[string]*v1.Pod{pod.Name: pod})
             }
         }()
         defer cancel()
-        logger.V(2).Info("Start the preemption asynchronously", "preemptor", klog.KObj(pod), "node", c.Name(), "numVictims", len(c.Victims().Pods), "numVictimsToDelete", len(victimPods))
+        logger.V(2).Info("Start the preemption asynchronously", "preemptor", klog.KObj(pod), "node", c.Name(), "numVictims", len(c.Victims().Pods))
         // Lower priority pods nominated to run on this node, may no longer fit on
         // this node. So, we should remove their nomination. Removing their
@@ -539,39 +510,33 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName
             // We do not return as this error is not critical.
         }
-        if len(victimPods) > 1 {
-            // We can evict all victims in parallel, but the last one.
-            // We have to remove the pod from the preempting map before the last one is evicted
-            // because, otherwise, the pod removal might be notified to the scheduling queue before
-            // we remove this pod from the preempting map,
-            // and the pod could end up stucking at the unschedulable pod pool
-            // by all the pod removal events being ignored.
-            ev.Handler.Parallelizer().Until(ctx, len(victimPods)-1, preemptPod, ev.PluginName)
-            err := errCh.ReceiveError()
-            switch {
-            case apierrors.IsNotFound(err):
-                logger.V(2).Info("Victim Pod is already deleted", "preemptor", klog.KObj(pod), "node", c.Name(), "err", err)
-            case err != nil:
-                utilruntime.HandleErrorWithContext(ctx, err, "Error occurred during async preemption")
-                result = metrics.GoroutineResultError
-            default:
-                allPodsAlreadyDeleted = false
-            }
+        if len(c.Victims().Pods) == 0 {
+            ev.mu.Lock()
+            delete(ev.preempting, pod.UID)
+            ev.mu.Unlock()
+            return
+        }
+        // We can evict all victims in parallel, but the last one.
+        // We have to remove the pod from the preempting map before the last one is evicted
+        // because, otherwise, the pod removal might be notified to the scheduling queue before
+        // we remove this pod from the preempting map,
+        // and the pod could end up stucking at the unschedulable pod pool
+        // by all the pod removal events being ignored.
+        ev.Handler.Parallelizer().Until(ctx, len(c.Victims().Pods)-1, preemptPod, ev.PluginName)
+        if err := errCh.ReceiveError(); err != nil {
+            utilruntime.HandleErrorWithContext(ctx, err, "Error occurred during async preemption")
+            result = metrics.GoroutineResultError
+        }
         ev.mu.Lock()
         delete(ev.preempting, pod.UID)
         ev.mu.Unlock()
-        err := ev.PreemptPod(ctx, c, pod, victimPods[len(victimPods)-1], pluginName)
-        switch {
-        case apierrors.IsNotFound(err):
-            logger.V(2).Info("Victim Pod is already deleted", "preemptor", klog.KObj(pod), "node", c.Name(), "err", err)
-        case err != nil:
+        if err := ev.PreemptPod(ctx, c, pod, c.Victims().Pods[len(c.Victims().Pods)-1], pluginName); err != nil {
             utilruntime.HandleErrorWithContext(ctx, err, "Error occurred during async preemption")
             result = metrics.GoroutineResultError
-        default:
-            allPodsAlreadyDeleted = false
         }
         logger.V(2).Info("Async Preemption finished completely", "preemptor", klog.KObj(pod), "node", c.Name(), "result", result)


@@ -30,8 +30,6 @@ import (
v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
@@ -418,11 +416,6 @@ func TestPrepareCandidate(t *testing.T) {
         Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}).
         Obj()
-    notFoundVictim1 = st.MakePod().Name("not-found-victim").UID("victim1").
-        Node(node1Name).SchedulerName(defaultSchedulerName).Priority(midPriority).
-        Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}).
-        Obj()
     failVictim = st.MakePod().Name("fail-victim").UID("victim1").
         Node(node1Name).SchedulerName(defaultSchedulerName).Priority(midPriority).
         Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}).
@@ -451,12 +444,6 @@ func TestPrepareCandidate(t *testing.T) {
         Obj()
     )
-    victimWithDeletionTimestamp := victim1.DeepCopy()
-    victimWithDeletionTimestamp.Name = "victim1-with-deletion-timestamp"
-    victimWithDeletionTimestamp.UID = "victim1-with-deletion-timestamp"
-    victimWithDeletionTimestamp.DeletionTimestamp = &metav1.Time{Time: time.Now().Add(-100 * time.Second)}
-    victimWithDeletionTimestamp.Finalizers = []string{"test"}
     tests := []struct {
         name string
         nodeNames []string
@@ -485,8 +472,9 @@ func TestPrepareCandidate(t *testing.T) {
             testPods: []*v1.Pod{
                 victim1,
             },
-            nodeNames: []string{node1Name},
-            expectedStatus: nil,
+            nodeNames: []string{node1Name},
+            expectedStatus: nil,
+            expectedPreemptingMap: sets.New(types.UID("preemptor")),
         },
         {
             name: "one victim without condition",
@@ -508,42 +496,6 @@ func TestPrepareCandidate(t *testing.T) {
             expectedStatus: nil,
             expectedPreemptingMap: sets.New(types.UID("preemptor")),
         },
-        {
-            name: "one victim, but victim is already being deleted",
-            candidate: &fakeCandidate{
-                name: node1Name,
-                victims: &extenderv1.Victims{
-                    Pods: []*v1.Pod{
-                        victimWithDeletionTimestamp,
-                    },
-                },
-            },
-            preemptor: preemptor,
-            testPods: []*v1.Pod{
-                victimWithDeletionTimestamp,
-            },
-            nodeNames: []string{node1Name},
-            expectedStatus: nil,
-        },
-        {
-            name: "one victim, but victim is already deleted",
-            candidate: &fakeCandidate{
-                name: node1Name,
-                victims: &extenderv1.Victims{
-                    Pods: []*v1.Pod{
-                        notFoundVictim1,
-                    },
-                },
-            },
-            preemptor: preemptor,
-            testPods: []*v1.Pod{},
-            nodeNames: []string{node1Name},
-            expectedStatus: nil,
-            expectedActivatedPods: map[string]*v1.Pod{preemptor.Name: preemptor},
-            expectedPreemptingMap: sets.New(types.UID("preemptor")),
-        },
         {
             name: "one victim with same condition",
@@ -703,11 +655,6 @@ func TestPrepareCandidate(t *testing.T) {
                 deletionFailure = true
                 return true, nil, fmt.Errorf("delete pod failed")
             }
-            // fake clientset does not return an error for not-found pods, so we simulate it here.
-            if name == "not-found-victim" {
-                // Simulate a not-found error.
-                return true, nil, apierrors.NewNotFound(v1.Resource("pods"), name)
-            }
             deletedPods.Insert(name)
             return true, nil, nil
@@ -720,10 +667,6 @@ func TestPrepareCandidate(t *testing.T) {
                 patchFailure = true
                 return true, nil, fmt.Errorf("patch pod status failed")
             }
-            // fake clientset does not return an error for not-found pods, so we simulate it here.
-            if action.(clienttesting.PatchAction).GetName() == "not-found-victim" {
-                return true, nil, apierrors.NewNotFound(v1.Resource("pods"), "not-found-victim")
-            }
             return true, nil, nil
         })
@@ -854,8 +797,7 @@ func TestPrepareCandidate(t *testing.T) {
                 return true, nil
             }); err != nil {
-                t.Error(lastErrMsg)
-                t.Error(err)
+                t.Fatal(lastErrMsg)
             }
         })
     }
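Note: the removed test scaffolding above simulated NotFound errors by prepending reactors to the fake clientset. For reference, a self-contained sketch of that simulation technique using client-go's testing package; the pod name mirrors the test's "not-found-victim", everything else here is illustrative:

package main

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/kubernetes/fake"
	clienttesting "k8s.io/client-go/testing"
)

func main() {
	cs := fake.NewSimpleClientset()

	// Force a NotFound error for one specific victim, the way the removed
	// reactor did; every other delete falls through to the default
	// object-tracker behavior.
	cs.PrependReactor("delete", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
		name := action.(clienttesting.DeleteAction).GetName()
		if name == "not-found-victim" {
			return true, nil, apierrors.NewNotFound(v1.Resource("pods"), name)
		}
		return false, nil, nil
	})

	err := cs.CoreV1().Pods(metav1.NamespaceDefault).Delete(context.Background(), "not-found-victim", metav1.DeleteOptions{})
	fmt.Println(apierrors.IsNotFound(err)) // true
}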