mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 04:08:16 +00:00 
			
		
		
		
	Merge pull request #37604 from ymqytw/fix_issues_with_drain
Automatic merge from submit-queue make drain retry forever and use a new graceful period Implemented the 1st approach according to https://github.com/kubernetes/kubernetes/issues/37460#issuecomment-263437516 1) Make drain retry forever if the error is always Too Many Requests (429) generated by Pod Disruption Budget. 2) Use a new graceful period per #37460 3) Update the message printed out when successfully deleting or evicting a pod. fixes #37460 cc: @davidopp @erictune
This commit is contained in:
		@@ -185,7 +185,7 @@ func NewCmdDrain(f cmdutil.Factory, out, errOut io.Writer) *cobra.Command {
 | 
			
		||||
	cmd.Flags().BoolVar(&options.IgnoreDaemonsets, "ignore-daemonsets", false, "Ignore DaemonSet-managed pods.")
 | 
			
		||||
	cmd.Flags().BoolVar(&options.DeleteLocalData, "delete-local-data", false, "Continue even if there are pods using emptyDir (local data that will be deleted when the node is drained).")
 | 
			
		||||
	cmd.Flags().IntVar(&options.GracePeriodSeconds, "grace-period", -1, "Period of time in seconds given to each pod to terminate gracefully. If negative, the default value specified in the pod will be used.")
 | 
			
		||||
	cmd.Flags().DurationVar(&options.Timeout, "timeout", 0, "The length of time to wait before giving up on a delete, zero means determine a timeout from the size of the object")
 | 
			
		||||
	cmd.Flags().DurationVar(&options.Timeout, "timeout", 0, "The length of time to wait before giving up, zero means infinite")
 | 
			
		||||
	return cmd
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -238,16 +238,6 @@ func (o *DrainOptions) RunDrain() error {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	err := o.deleteOrEvictPodsSimple()
 | 
			
		||||
	// TODO: update IsTooManyRequests() when the TooManyRequests(429) error returned from the API server has a non-empty Reason field
 | 
			
		||||
	for i := 1; i <= maxPatchRetry && apierrors.IsTooManyRequests(err); i++ {
 | 
			
		||||
		if i > triesBeforeBackOff {
 | 
			
		||||
			currBackOffPeriod := time.Duration(math.Exp2(float64(i-triesBeforeBackOff))) * backOffPeriod
 | 
			
		||||
			fmt.Fprintf(o.errOut, "Retry in %v\n", currBackOffPeriod)
 | 
			
		||||
			o.backOff.Sleep(currBackOffPeriod)
 | 
			
		||||
		}
 | 
			
		||||
		fmt.Fprintf(o.errOut, "Retrying\n")
 | 
			
		||||
		err = o.deleteOrEvictPodsSimple()
 | 
			
		||||
	}
 | 
			
		||||
	if err == nil {
 | 
			
		||||
		cmdutil.PrintSuccess(o.mapper, false, o.out, "node", o.nodeInfo.Name, false, "drained")
 | 
			
		||||
	}
 | 
			
		||||
@@ -259,9 +249,7 @@ func (o *DrainOptions) deleteOrEvictPodsSimple() error {
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	if o.Timeout == 0 {
 | 
			
		||||
		o.Timeout = kubectl.Timeout + time.Duration(10*len(pods))*time.Second
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	err = o.deleteOrEvictPods(pods)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		pendingPods, newErr := o.getPodsForDeletion()
 | 
			
		||||
@@ -470,31 +458,99 @@ func (o *DrainOptions) deleteOrEvictPods(pods []api.Pod) error {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	getPodFn := func(namespace, name string) (*api.Pod, error) {
 | 
			
		||||
		return o.client.Core().Pods(namespace).Get(name)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if len(policyGroupVersion) > 0 {
 | 
			
		||||
		return o.evictPods(pods, policyGroupVersion, getPodFn)
 | 
			
		||||
	} else {
 | 
			
		||||
		return o.deletePods(pods, getPodFn)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (o *DrainOptions) evictPods(pods []api.Pod, policyGroupVersion string, getPodFn func(namespace, name string) (*api.Pod, error)) error {
 | 
			
		||||
	doneCh := make(chan bool, len(pods))
 | 
			
		||||
	errCh := make(chan error, 1)
 | 
			
		||||
 | 
			
		||||
	for _, pod := range pods {
 | 
			
		||||
		if len(policyGroupVersion) > 0 {
 | 
			
		||||
			err = o.evictPod(pod, policyGroupVersion)
 | 
			
		||||
		} else {
 | 
			
		||||
			err = o.deletePod(pod)
 | 
			
		||||
		go func(pod api.Pod, doneCh chan bool, errCh chan error) {
 | 
			
		||||
			var err error
 | 
			
		||||
			for {
 | 
			
		||||
				err = o.evictPod(pod, policyGroupVersion)
 | 
			
		||||
				if err == nil {
 | 
			
		||||
					break
 | 
			
		||||
				} else if apierrors.IsTooManyRequests(err) {
 | 
			
		||||
					time.Sleep(5 * time.Second)
 | 
			
		||||
				} else {
 | 
			
		||||
					errCh <- fmt.Errorf("error when evicting pod %q: %v", pod.Name, err)
 | 
			
		||||
					return
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
			podArray := []api.Pod{pod}
 | 
			
		||||
			_, err = o.waitForDelete(podArray, kubectl.Interval, time.Duration(math.MaxInt64), true, getPodFn)
 | 
			
		||||
			if err == nil {
 | 
			
		||||
				doneCh <- true
 | 
			
		||||
			} else {
 | 
			
		||||
				errCh <- fmt.Errorf("error when waiting for pod %q terminating: %v", pod.Name, err)
 | 
			
		||||
			}
 | 
			
		||||
		}(pod, doneCh, errCh)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	doneCount := 0
 | 
			
		||||
	// 0 timeout means infinite, we use MaxInt64 to represent it.
 | 
			
		||||
	var globalTimeout time.Duration
 | 
			
		||||
	if o.Timeout == 0 {
 | 
			
		||||
		globalTimeout = time.Duration(math.MaxInt64)
 | 
			
		||||
	} else {
 | 
			
		||||
		globalTimeout = o.Timeout
 | 
			
		||||
	}
 | 
			
		||||
	for {
 | 
			
		||||
		select {
 | 
			
		||||
		case err := <-errCh:
 | 
			
		||||
			return err
 | 
			
		||||
		case <-doneCh:
 | 
			
		||||
			doneCount++
 | 
			
		||||
			if doneCount == len(pods) {
 | 
			
		||||
				return nil
 | 
			
		||||
			}
 | 
			
		||||
		case <-time.After(globalTimeout):
 | 
			
		||||
			return fmt.Errorf("Drain did not complete within %v", globalTimeout)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (o *DrainOptions) deletePods(pods []api.Pod, getPodFn func(namespace, name string) (*api.Pod, error)) error {
 | 
			
		||||
	// 0 timeout means infinite, we use MaxInt64 to represent it.
 | 
			
		||||
	var globalTimeout time.Duration
 | 
			
		||||
	if o.Timeout == 0 {
 | 
			
		||||
		globalTimeout = time.Duration(math.MaxInt64)
 | 
			
		||||
	} else {
 | 
			
		||||
		globalTimeout = o.Timeout
 | 
			
		||||
	}
 | 
			
		||||
	for _, pod := range pods {
 | 
			
		||||
		err := o.deletePod(pod)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	getPodFn := func(namespace, name string) (*api.Pod, error) {
 | 
			
		||||
		return o.client.Core().Pods(namespace).Get(name)
 | 
			
		||||
	}
 | 
			
		||||
	_, err = o.waitForDelete(pods, kubectl.Interval, o.Timeout, getPodFn)
 | 
			
		||||
	_, err := o.waitForDelete(pods, kubectl.Interval, globalTimeout, false, getPodFn)
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (o *DrainOptions) waitForDelete(pods []api.Pod, interval, timeout time.Duration, getPodFn func(string, string) (*api.Pod, error)) ([]api.Pod, error) {
 | 
			
		||||
func (o *DrainOptions) waitForDelete(pods []api.Pod, interval, timeout time.Duration, usingEviction bool, getPodFn func(string, string) (*api.Pod, error)) ([]api.Pod, error) {
 | 
			
		||||
	var verbStr string
 | 
			
		||||
	if usingEviction {
 | 
			
		||||
		verbStr = "evicted"
 | 
			
		||||
	} else {
 | 
			
		||||
		verbStr = "deleted"
 | 
			
		||||
	}
 | 
			
		||||
	err := wait.PollImmediate(interval, timeout, func() (bool, error) {
 | 
			
		||||
		pendingPods := []api.Pod{}
 | 
			
		||||
		for i, pod := range pods {
 | 
			
		||||
			p, err := getPodFn(pod.Namespace, pod.Name)
 | 
			
		||||
			if apierrors.IsNotFound(err) || (p != nil && p.ObjectMeta.UID != pod.ObjectMeta.UID) {
 | 
			
		||||
				cmdutil.PrintSuccess(o.mapper, false, o.out, "pod", pod.Name, false, "deleted")
 | 
			
		||||
				cmdutil.PrintSuccess(o.mapper, false, o.out, "pod", pod.Name, false, verbStr)
 | 
			
		||||
				continue
 | 
			
		||||
			} else if err != nil {
 | 
			
		||||
				return false, err
 | 
			
		||||
 
 | 
			
		||||
@@ -684,7 +684,7 @@ func TestDeletePods(t *testing.T) {
 | 
			
		||||
		o.mapper, _ = f.Object()
 | 
			
		||||
		o.out = os.Stdout
 | 
			
		||||
		_, pods := createPods(false)
 | 
			
		||||
		pendingPods, err := o.waitForDelete(pods, test.interval, test.timeout, test.getPodFn)
 | 
			
		||||
		pendingPods, err := o.waitForDelete(pods, test.interval, test.timeout, false, test.getPodFn)
 | 
			
		||||
 | 
			
		||||
		if test.expectError {
 | 
			
		||||
			if err == nil {
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user