	Merge pull request #21891 from kubernetes/revert-21732-terminated_pods
Revert "kubelet: fix duplicated status updates at pod cleanup"
@@ -297,8 +297,6 @@ func (f *FakeDockerClient) StopContainer(id string, timeout uint) error {
		return err
	}
	f.Stopped = append(f.Stopped, id)
	// Container status should be Updated before container moved to ExitedContainerList
	f.updateContainerStatus(id, statusExitedPrefix)
	var newList []docker.APIContainers
	for _, container := range f.ContainerList {
		if container.ID == id {
@@ -325,6 +323,7 @@ func (f *FakeDockerClient) StopContainer(id string, timeout uint) error {
		container.State.Running = false
	}
	f.ContainerMap[id] = container
	f.updateContainerStatus(id, statusExitedPrefix)
	f.normalSleep(200, 50, 50)
	return nil
}

@@ -1932,6 +1932,31 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []*api.Pod, runningPods []*kubeco
	return nil
}

// Delete any pods that are no longer running and are marked for deletion.
func (kl *Kubelet) cleanupTerminatedPods(pods []*api.Pod, runningPods []*kubecontainer.Pod) error {
	var terminating []*api.Pod
	for _, pod := range pods {
		if pod.DeletionTimestamp != nil {
			found := false
			for _, runningPod := range runningPods {
				if runningPod.ID == pod.UID {
					found = true
					break
				}
			}
			if found {
				glog.V(5).Infof("Keeping terminated pod %q, still running", format.Pod(pod))
				continue
			}
			terminating = append(terminating, pod)
		}
	}
	if !kl.statusManager.TerminatePods(terminating) {
		return errors.New("not all pods were successfully terminated")
	}
	return nil
}

// pastActiveDeadline returns true if the pod has been active for more than
// ActiveDeadlineSeconds.
func (kl *Kubelet) pastActiveDeadline(pod *api.Pod) bool {
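For reference, here is a minimal standalone sketch of the filtering rule the restored cleanupTerminatedPods applies: a pod is a termination candidate only when its DeletionTimestamp is set and it no longer appears among the running pods. The apiPod and runningPod types and the terminatedCandidates helper below are simplified stand-ins for illustration, not kubelet code; the only liberty taken is a UID-keyed set in place of the nested loop.

package main

import "fmt"

// Simplified stand-ins for api.Pod and kubecontainer.Pod; the real kubelet
// types carry far more state.
type apiPod struct {
	UID               string
	Name              string
	DeletionTimestamp *string
}

type runningPod struct {
	ID string
}

// terminatedCandidates mirrors the filtering in cleanupTerminatedPods above:
// keep only pods that are marked for deletion and no longer have a running
// counterpart. A UID-keyed set stands in for the nested loop.
func terminatedCandidates(pods []*apiPod, running []*runningPod) []*apiPod {
	runningSet := make(map[string]bool, len(running))
	for _, rp := range running {
		runningSet[rp.ID] = true
	}
	var terminating []*apiPod
	for _, p := range pods {
		if p.DeletionTimestamp != nil && !runningSet[p.UID] {
			terminating = append(terminating, p)
		}
	}
	return terminating
}

func main() {
	ts := "2016-02-24T00:00:00Z"
	pods := []*apiPod{
		{UID: "a", Name: "deleted-and-stopped", DeletionTimestamp: &ts},
		{UID: "b", Name: "deleted-but-running", DeletionTimestamp: &ts},
		{UID: "c", Name: "still-wanted"},
	}
	running := []*runningPod{{ID: "b"}}
	for _, p := range terminatedCandidates(pods, running) {
		fmt.Println(p.Name) // prints only "deleted-and-stopped"
	}
}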
@@ -2123,6 +2148,10 @@ func (kl *Kubelet) HandlePodCleanups() error {
	// Remove any orphaned mirror pods.
	kl.podManager.DeleteOrphanedMirrorPods()

	if err := kl.cleanupTerminatedPods(allPods, runningPods); err != nil {
		glog.Errorf("Failed to cleanup terminated pods: %v", err)
	}

	// Clear out any old bandwidth rules
	if err = kl.cleanupBandwidthLimits(allPods); err != nil {
		return err
@@ -2383,13 +2412,6 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler

func (kl *Kubelet) dispatchWork(pod *api.Pod, syncType kubetypes.SyncPodType, mirrorPod *api.Pod, start time.Time) {
	if kl.podIsTerminated(pod) {
		if pod.DeletionTimestamp != nil {
			// If the pod is in a terminated state, there is no pod worker to
			// handle the work item. Check if the DeletionTimestamp has been
			// set, and force a status update to trigger a pod deletion request
			// to the apiserver.
			kl.statusManager.TerminatePod(pod)
		}
		return
	}
	// Run the sync in an async worker.

@@ -83,9 +83,10 @@ type Manager interface {
	// triggers a status update.
	SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool)

	// TerminatePod resets the container status for the provided pod to terminated and triggers
	// a status update.
	TerminatePod(pod *api.Pod)
	// TerminatePods resets the container status for the provided pods to terminated and triggers
	// a status update. This function may not enqueue all the provided pods, in which case it will
	// return false
	TerminatePods(pods []*api.Pod) bool

	// RemoveOrphanedStatuses scans the status cache and removes any entries for pods not included in
	// the provided podUIDs.
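The hunk header (-83,9 +83,10) shows the revert swapping the single-pod TerminatePod method back out for the batch TerminatePods, whose boolean return tells the caller whether every pod was actually enqueued. A minimal sketch of that contract, using a hypothetical fakeStatusManager and a simplified pod type rather than the real api.Pod and Manager interface:

package main

import "fmt"

// pod is a simplified stand-in for *api.Pod.
type pod struct{ Name string }

// fakeStatusManager models only the TerminatePods contract from the interface
// above: it pretends its status channel can hold `capacity` updates and
// returns false when it has to drop one, mirroring the "may not enqueue all
// the provided pods" comment.
type fakeStatusManager struct {
	capacity int
	queued   []*pod
}

func (f *fakeStatusManager) TerminatePods(pods []*pod) bool {
	allSent := true
	for _, p := range pods {
		if len(f.queued) >= f.capacity {
			// Dropped; a real caller would rely on a later periodic sync.
			allSent = false
			continue
		}
		f.queued = append(f.queued, p)
	}
	return allSent
}

func main() {
	m := &fakeStatusManager{capacity: 1}
	ok := m.TerminatePods([]*pod{{Name: "a"}, {Name: "b"}})
	fmt.Println(ok) // false: the second pod could not be enqueued
}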
@@ -148,7 +149,7 @@ func (m *manager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
	if err != nil {
		return
	}
	m.updateStatusInternal(pod, status, false)
	m.updateStatusInternal(pod, status)
}

func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool) {
@@ -211,32 +212,31 @@ func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontai
		status.Conditions = append(status.Conditions, readyCondition)
	}

	m.updateStatusInternal(pod, status, false)
	m.updateStatusInternal(pod, status)
}

func (m *manager) TerminatePod(pod *api.Pod) {
func (m *manager) TerminatePods(pods []*api.Pod) bool {
	allSent := true
	m.podStatusesLock.Lock()
	defer m.podStatusesLock.Unlock()
	oldStatus := &pod.Status
	if cachedStatus, ok := m.podStatuses[pod.UID]; ok {
		oldStatus = &cachedStatus.status
	}
	status, err := copyStatus(oldStatus)
	if err != nil {
		return
	}
	for i := range status.ContainerStatuses {
		status.ContainerStatuses[i].State = api.ContainerState{
			Terminated: &api.ContainerStateTerminated{},
	for _, pod := range pods {
		for i := range pod.Status.ContainerStatuses {
			pod.Status.ContainerStatuses[i].State = api.ContainerState{
				Terminated: &api.ContainerStateTerminated{},
			}
		}
		if sent := m.updateStatusInternal(pod, pod.Status); !sent {
			glog.V(4).Infof("Termination notice for %q was dropped because the status channel is full", format.Pod(pod))
			allSent = false
		}
	}
	m.updateStatusInternal(pod, pod.Status, true)
	return allSent
}

// updateStatusInternal updates the internal status cache, and queues an update to the api server if
// necessary. Returns whether an update was triggered.
// This method IS NOT THREAD SAFE and must be called from a locked function.
func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus, forceUpdate bool) bool {
func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus) bool {
	var oldStatus api.PodStatus
	cachedStatus, isCached := m.podStatuses[pod.UID]
	if isCached {
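One design difference worth noting in the hunk above: the removed TerminatePod works on a copy of the cached status (via copyStatus) before marking containers terminated, while the restored TerminatePods writes the Terminated state directly into pod.Status. A minimal sketch of what that difference means for the caller, with simplified hypothetical types rather than the real api types:

package main

import "fmt"

type containerState struct{ Terminated bool }

type podStatus struct{ ContainerStatuses []containerState }

type pod struct {
	Name   string
	Status podStatus
}

// markTerminatedInPlace mutates the caller's pod, as the restored
// TerminatePods does with pod.Status.
func markTerminatedInPlace(p *pod) {
	for i := range p.Status.ContainerStatuses {
		p.Status.ContainerStatuses[i].Terminated = true
	}
}

// markTerminatedOnCopy works on a copy of the status, as the removed
// TerminatePod did via copyStatus, leaving the caller's pod untouched.
func markTerminatedOnCopy(p *pod) podStatus {
	statusCopy := podStatus{ContainerStatuses: append([]containerState(nil), p.Status.ContainerStatuses...)}
	for i := range statusCopy.ContainerStatuses {
		statusCopy.ContainerStatuses[i].Terminated = true
	}
	return statusCopy
}

func main() {
	p := &pod{Name: "demo", Status: podStatus{ContainerStatuses: make([]containerState, 1)}}
	_ = markTerminatedOnCopy(p)
	fmt.Println(p.Status.ContainerStatuses[0].Terminated) // false: original untouched
	markTerminatedInPlace(p)
	fmt.Println(p.Status.ContainerStatuses[0].Terminated) // true: caller's pod was mutated
}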
@@ -270,7 +270,7 @@ func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus, force
	normalizeStatus(&status)
	// The intent here is to prevent concurrent updates to a pod's status from
	// clobbering each other so the phase of a pod progresses monotonically.
	if isCached && isStatusEqual(&cachedStatus.status, &status) && !forceUpdate {
	if isCached && isStatusEqual(&cachedStatus.status, &status) && pod.DeletionTimestamp == nil {
		glog.V(3).Infof("Ignoring same status for pod %q, status: %+v", format.Pod(pod), status)
		return false // No new status.
	}
@@ -289,8 +289,6 @@ func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus, force
	default:
		// Let the periodic syncBatch handle the update if the channel is full.
		// We can't block, since we hold the mutex lock.
		glog.V(4).Infof("Skipping the status update for pod %q for now because the channel is full; status: %+v",
			format.Pod(pod), status)
		return false
	}
}
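The default branch above is the usual non-blocking send: because updateStatusInternal runs while podStatusesLock is held, it must not block on a full status channel, so it drops the update and lets the periodic syncBatch pick it up later. A standalone sketch of the pattern, with hypothetical names rather than the kubelet's actual types:

package main

import (
	"fmt"
	"sync"
)

type syncRequest struct{ podUID string }

type statusManager struct {
	mu sync.Mutex
	ch chan syncRequest
}

// enqueue holds the mutex while it decides what to do with the update, so it
// must not block on the channel; the select/default drops the update and
// reports false so the caller (or a later periodic resync) knows it was lost.
func (m *statusManager) enqueue(req syncRequest) bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	select {
	case m.ch <- req:
		return true
	default:
		fmt.Printf("dropping status update for %s: channel full\n", req.podUID)
		return false
	}
}

func main() {
	m := &statusManager{ch: make(chan syncRequest, 1)}
	fmt.Println(m.enqueue(syncRequest{podUID: "a"})) // true
	fmt.Println(m.enqueue(syncRequest{podUID: "b"})) // false: the buffer of one is already full
}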
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user