/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubelet

import (
	"context"
	"fmt"
	"strings"
	"sync"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/record"
	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
	"k8s.io/klog/v2"
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
	"k8s.io/kubernetes/pkg/kubelet/events"
	"k8s.io/kubernetes/pkg/kubelet/eviction"
	"k8s.io/kubernetes/pkg/kubelet/metrics"
	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
	"k8s.io/kubernetes/pkg/kubelet/util/queue"
	"k8s.io/utils/clock"
)

// OnCompleteFunc is a function that is invoked when an operation completes.
// If err is non-nil, the operation did not complete successfully.
type OnCompleteFunc func(err error)

// PodStatusFunc is a function that is invoked to override the pod status when a pod is killed.
type PodStatusFunc func(podStatus *v1.PodStatus)

// KillPodOptions are options when performing a pod update whose update type is kill.
type KillPodOptions struct {
	// CompletedCh is closed when the kill request completes (syncTerminatingPod has completed
	// without error) or if the pod does not exist, or if the pod has already terminated. This
	// could take an arbitrary amount of time to be closed, but is never left open once
	// CouldHaveRunningContainers() returns false.
	CompletedCh chan<- struct{}
	// Evict is true if this is a pod triggered eviction - once a pod is evicted some resources are
	// more aggressively reaped than during normal pod operation (stopped containers).
	Evict bool
	// PodStatusFunc is invoked (if set) and overrides the status of the pod at the time the pod is killed.
	// The provided status is populated from the latest state.
	PodStatusFunc PodStatusFunc
	// PodTerminationGracePeriodSecondsOverride is optional override to use if a pod is being killed as part of kill operation.
	PodTerminationGracePeriodSecondsOverride *int64
}

// UpdatePodOptions is an options struct to pass to a UpdatePod operation.
type UpdatePodOptions struct {
	// The type of update (create, update, sync, kill).
	UpdateType kubetypes.SyncPodType
	// StartTime is an optional timestamp for when this update was created. If set,
	// when this update is fully realized by the pod worker it will be recorded in
	// the PodWorkerDuration metric.
	StartTime time.Time
	// Pod to update. Required.
	Pod *v1.Pod
	// MirrorPod is the mirror pod if Pod is a static pod. Optional when UpdateType
	// is kill or terminated.
	MirrorPod *v1.Pod
	// RunningPod is a runtime pod that is no longer present in config. Required
	// if Pod is nil, ignored if Pod is set.
	RunningPod *kubecontainer.Pod
	// KillPodOptions is used to override the default termination behavior of the
	// pod or to update the pod status after an operation is completed. Since a
	// pod can be killed for multiple reasons, PodStatusFunc is invoked in order
	// and later kills have an opportunity to override the status (i.e. a preemption
	// may be later turned into an eviction).
	KillPodOptions *KillPodOptions
}
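
// Editor's note: the sketch below is illustrative and not part of the upstream
// file. It shows how a caller such as the eviction manager might populate
// UpdatePodOptions to ask the pod workers to kill a pod with a shortened grace
// period. The ten second override, the completion channel, and the status
// mutation are assumptions for the example only.
func exampleEvictionUpdate(pod *v1.Pod, done chan struct{}) UpdatePodOptions {
	gracePeriodOverride := int64(10)
	return UpdatePodOptions{
		UpdateType: kubetypes.SyncPodKill,
		Pod:        pod,
		KillPodOptions: &KillPodOptions{
			Evict:       true,
			CompletedCh: done,
			// callers may override the final status, e.g. to record why the pod was killed
			PodStatusFunc: func(podStatus *v1.PodStatus) {
				podStatus.Phase = v1.PodFailed
				podStatus.Reason = "Evicted"
			},
			PodTerminationGracePeriodSecondsOverride: &gracePeriodOverride,
		},
	}
}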

// PodWorkerState classifies the status of a pod as seen by the pod worker - setup (sync),
// teardown of containers (terminating), or cleanup (terminated).
type PodWorkerState int

const (
	// SyncPod is when the pod is expected to be started and running.
	SyncPod PodWorkerState = iota
	// TerminatingPod is when the pod is no longer being set up, but some
	// containers may be running and are being torn down.
	TerminatingPod
	// TerminatedPod indicates the pod is stopped, can have no more running
	// containers, and any foreground cleanup can be executed.
	TerminatedPod
)

func (state PodWorkerState) String() string {
	switch state {
	case SyncPod:
		return "sync"
	case TerminatingPod:
		return "terminating"
	case TerminatedPod:
		return "terminated"
	default:
		panic(fmt.Sprintf("the state %d is not defined", state))
	}
}

// PodWorkerSync is the summarization of a single pod worker for sync. Values
// besides state are used to provide metric counts for operators.
type PodWorkerSync struct {
	// State of the pod.
	State PodWorkerState
	// Orphan is true if the pod is no longer in the desired set passed to SyncKnownPods.
	Orphan bool
	// HasConfig is true if we have a historical pod spec for this pod.
	HasConfig bool
	// Static is true if we have config and the pod came from a static source.
	Static bool
}

// podWork is the internal representation of a single change being applied to a pod by a pod worker.
type podWork struct {
	// WorkType is the type of sync to perform - sync (create), terminating (stop
	// containers), terminated (clean up and write status).
	WorkType PodWorkerState

	// Options contains the data to sync.
	Options UpdatePodOptions
}

// PodWorkers is an abstract interface for testability.
type PodWorkers interface {
	// UpdatePod notifies the pod worker of a change to a pod, which will then
	// be processed in FIFO order by a goroutine per pod UID. The state of the
	// pod will be passed to the syncPod method until either the pod is marked
	// as deleted, it reaches a terminal phase (Succeeded/Failed), or the pod
	// is evicted by the kubelet. Once that occurs the syncTerminatingPod method
	// will be called until it exits successfully, and after that all further
	// UpdatePod() calls will be ignored for that pod until it has been forgotten
	// due to significant time passing. A pod that is terminated will never be
	// restarted.
	UpdatePod(options UpdatePodOptions)
	// SyncKnownPods removes workers for pods that are not in the desiredPods set
	// and have been terminated for a significant period of time. Once this method
	// has been called once, the workers are assumed to be fully initialized and
	// subsequent calls to ShouldPodContentBeRemoved on unknown pods will return
	// true. It returns a map describing the state of each known pod worker. It
	// is the responsibility of the caller to re-add any desired pods that are not
	// returned as knownPods.
	SyncKnownPods(desiredPods []*v1.Pod) (knownPods map[types.UID]PodWorkerSync)

	// IsPodKnownTerminated returns true once SyncTerminatingPod completes
	// successfully - the provided pod UID is known by the pod
	// worker to be terminated. If the pod has been force deleted and the pod worker
	// has completed termination this method will return false, so this method should
	// only be used to filter out pods from the desired set such as in admission.
	//
	// Intended for use by the kubelet config loops, but not subsystems, which should
	// use ShouldPod*().
	IsPodKnownTerminated(uid types.UID) bool
	// CouldHaveRunningContainers returns true before the pod workers have synced,
	// once the pod workers see the pod (syncPod could be called), and returns false
	// after the pod has been terminated (running containers guaranteed stopped).
	//
	// Intended for use by the kubelet config loops, but not subsystems, which should
	// use ShouldPod*().
	CouldHaveRunningContainers(uid types.UID) bool

	// ShouldPodBeFinished returns true once SyncTerminatedPod completes
	// successfully - the provided pod UID is known to the pod worker to
	// be terminated and have resources reclaimed. It returns false before the
	// pod workers have synced (syncPod could be called). Once the pod workers
	// have synced it returns false if the pod has a sync status until
	// SyncTerminatedPod completes successfully. If the pod workers have synced,
	// but the pod does not have a status it returns true.
	//
	// Intended for use by subsystem sync loops to avoid performing background setup
	// after termination has been requested for a pod. Callers must ensure that the
	// syncPod method is non-blocking when their data is absent.
	ShouldPodBeFinished(uid types.UID) bool
	// IsPodTerminationRequested returns true when pod termination has been requested
	// until the termination completes and the pod is removed from config. This should
	// not be used in cleanup loops because it will return false if the pod has already
	// been cleaned up - use ShouldPodContainersBeTerminating instead. Also, this method
	// may return true while containers are still being initialized by the pod worker.
	//
	// Intended for use by the kubelet sync* methods, but not subsystems, which should
	// use ShouldPod*().
	IsPodTerminationRequested(uid types.UID) bool

	// ShouldPodContainersBeTerminating returns false before pod workers have synced,
	// and true once a pod has started terminating. This check is similar to
	// ShouldPodRuntimeBeRemoved but is also true after pod termination is requested.
	//
	// Intended for use by subsystem sync loops to avoid performing background setup
	// after termination has been requested for a pod. Callers must ensure that the
	// syncPod method is non-blocking when their data is absent.
	ShouldPodContainersBeTerminating(uid types.UID) bool
	// ShouldPodRuntimeBeRemoved returns true if runtime managers within the Kubelet
	// should aggressively cleanup pod resources that are not containers or on disk
	// content, like attached volumes. This is true when a pod is not yet observed
	// by a worker after the first sync (meaning it can't be running yet) or after
	// all running containers are stopped.
	// TODO: Once pod logs are separated from running containers, this method should
	// be used to gate whether containers are kept.
	//
	// Intended for use by subsystem sync loops to know when to start tearing down
	// resources that are used by running containers. Callers should ensure that
	// runtime content they own is not required for post-termination - for instance
	// containers are required in docker to preserve pod logs until after the pod
	// is deleted.
	ShouldPodRuntimeBeRemoved(uid types.UID) bool
	// ShouldPodContentBeRemoved returns true if resource managers within the Kubelet
	// should aggressively cleanup all content related to the pod. This is true
	// during pod eviction (when we wish to remove that content to free resources)
	// as well as after the request to delete a pod has resulted in containers being
	// stopped (which is a more graceful action). Note that a deleting pod can still
	// be evicted.
	//
	// Intended for use by subsystem sync loops to know when to start tearing down
	// resources that are used by non-deleted pods. Content is generally preserved
	// until deletion+removal_from_etcd or eviction, although garbage collection
	// can free content when this method returns false.
	ShouldPodContentBeRemoved(uid types.UID) bool
	// IsPodForMirrorPodTerminatingByFullName returns true if a static pod with the
	// provided pod name is currently terminating and has yet to complete. It is
	// intended to be used only during orphan mirror pod cleanup to prevent us from
	// deleting a terminating static pod from the apiserver before the pod is shut
	// down.
	IsPodForMirrorPodTerminatingByFullName(podFullname string) bool
}
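
// Editor's note: the function below is an illustrative sketch, not part of the
// upstream file. It shows the intended way a subsystem cleanup loop consults
// the pod workers before reclaiming per-pod resources; cleanupPodContent is a
// hypothetical stand-in for a subsystem-specific teardown callback.
func exampleSubsystemCleanup(workers PodWorkers, uids []types.UID, cleanupPodContent func(types.UID)) {
	for _, uid := range uids {
		// never start teardown while containers could still be running
		if workers.CouldHaveRunningContainers(uid) {
			continue
		}
		// only remove on-disk content for evicted or deleted-and-terminated pods
		if workers.ShouldPodContentBeRemoved(uid) {
			cleanupPodContent(uid)
		}
	}
}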

// podSyncer describes the core lifecycle operations of the pod state machine. A pod is first
// synced until it naturally reaches termination (true is returned) or an external agent decides
// the pod should be terminated. Once a pod should be terminating, SyncTerminatingPod is invoked
// until it returns no error. Then the SyncTerminatedPod method is invoked until it exits without
// error, and the pod is considered terminal. Implementations of this interface must be threadsafe
// for simultaneous invocation of these methods for multiple pods.
type podSyncer interface {
	// SyncPod configures the pod and starts and restarts all containers. If it returns true, the
	// pod has reached a terminal state and the presence of the error indicates succeeded or failed.
	// If an error is returned, the sync was not successful and should be rerun in the future. This
	// is a long running method and should exit early with context.Canceled if the context is canceled.
	SyncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod *v1.Pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error)
	// SyncTerminatingPod attempts to ensure the pod's containers are no longer running and to collect
	// any final status. This method is repeatedly invoked with diminishing grace periods until it exits
	// without error. Once this method exits with no error other components are allowed to tear down
	// supporting resources like volumes and devices. If the context is canceled, the method should
	// return context.Canceled unless it has successfully finished, which may occur when a shorter
	// grace period is detected.
	SyncTerminatingPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error
	// SyncTerminatingRuntimePod is invoked when running containers are found that correspond to
	// a pod that is no longer known to the kubelet to terminate those containers. It should not
	// exit without error unless all containers are known to be stopped.
	SyncTerminatingRuntimePod(ctx context.Context, runningPod *kubecontainer.Pod) error
	// SyncTerminatedPod is invoked after all running containers are stopped and is responsible
	// for releasing resources that should be executed right away rather than in the background.
	// Once it exits without error the pod is considered finished on the node.
	SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error
}

type syncPodFnType func(ctx context.Context, updateType kubetypes.SyncPodType, pod *v1.Pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error)
type syncTerminatingPodFnType func(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error
type syncTerminatingRuntimePodFnType func(ctx context.Context, runningPod *kubecontainer.Pod) error
type syncTerminatedPodFnType func(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error

// podSyncerFuncs implements podSyncer and accepts functions for each method.
type podSyncerFuncs struct {
	syncPod                   syncPodFnType
	syncTerminatingPod        syncTerminatingPodFnType
	syncTerminatingRuntimePod syncTerminatingRuntimePodFnType
	syncTerminatedPod         syncTerminatedPodFnType
}

func newPodSyncerFuncs(s podSyncer) podSyncerFuncs {
	return podSyncerFuncs{
		syncPod:                   s.SyncPod,
		syncTerminatingPod:        s.SyncTerminatingPod,
		syncTerminatingRuntimePod: s.SyncTerminatingRuntimePod,
		syncTerminatedPod:         s.SyncTerminatedPod,
	}
}

var _ podSyncer = podSyncerFuncs{}

func (f podSyncerFuncs) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod *v1.Pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) {
	return f.syncPod(ctx, updateType, pod, mirrorPod, podStatus)
}
func (f podSyncerFuncs) SyncTerminatingPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error {
	return f.syncTerminatingPod(ctx, pod, podStatus, gracePeriod, podStatusFn)
}
func (f podSyncerFuncs) SyncTerminatingRuntimePod(ctx context.Context, runningPod *kubecontainer.Pod) error {
	return f.syncTerminatingRuntimePod(ctx, runningPod)
}
func (f podSyncerFuncs) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
	return f.syncTerminatedPod(ctx, pod, podStatus)
}
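
// Editor's note: the sketch below is illustrative and not part of the upstream
// file. It shows how a test in this package could satisfy podSyncer through
// podSyncerFuncs while stubbing out only the phases it cares about; the no-op
// bodies are assumptions for the example, not the real kubelet syncer behavior.
func exampleStubSyncer() podSyncer {
	return podSyncerFuncs{
		syncPod: func(ctx context.Context, updateType kubetypes.SyncPodType, pod *v1.Pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) {
			// pretend the pod reached a terminal state immediately
			return true, nil
		},
		syncTerminatingPod: func(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error {
			return nil
		},
		syncTerminatingRuntimePod: func(ctx context.Context, runningPod *kubecontainer.Pod) error {
			return nil
		},
		syncTerminatedPod: func(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
			return nil
		},
	}
}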

const (
	// jitter factor for resyncInterval
	workerResyncIntervalJitterFactor = 0.5

	// jitter factor for backOffPeriod and backOffOnTransientErrorPeriod
	workerBackOffPeriodJitterFactor = 0.5

	// backoff period when transient error occurred.
	backOffOnTransientErrorPeriod = time.Second
)

// podSyncStatus tracks per-pod transitions through the three phases of pod
// worker sync (setup, terminating, terminated).
type podSyncStatus struct {
	// ctx is the context that is associated with the current pod sync.
	// TODO: remove this from the struct by having the context initialized
	// in startPodSync, the cancelFn used by UpdatePod, and cancellation of
	// a parent context for tearing down workers (if needed) on shutdown
	ctx context.Context
	// cancelFn if set is expected to cancel the current podSyncer operation.
	cancelFn context.CancelFunc

	// fullname of the pod
	fullname string

	// working is true if an update is pending or being worked by a pod worker
	// goroutine.
	working bool
	// pendingUpdate is the updated state the pod worker should observe. It is
	// cleared and moved to activeUpdate when a pod worker reads it. A new update
	// may always replace a pending update as the pod worker does not guarantee
	// that all intermediate states are synced to a worker, only the most recent.
	// This state will not be visible to downstream components until a pod worker
	// has begun processing it.
	pendingUpdate *UpdatePodOptions
	// activeUpdate is the most recent version of the pod's state that will be
	// passed to a sync*Pod function. A pod becomes visible to downstream components
	// once a worker decides to start a pod (startedAt is set). The pod and mirror
	// pod fields are accumulated if they are missing on a particular call (the last
	// known version), and the value of KillPodOptions is accumulated as pods cannot
	// have their grace period shortened. This is the source of truth for the pod spec
	// the kubelet is reconciling towards for all components that act on running pods.
	activeUpdate *UpdatePodOptions

	// syncedAt is the time at which the pod worker first observed this pod.
	syncedAt time.Time
	// startedAt is the time at which the pod worker allowed the pod to start.
	startedAt time.Time
	// terminatingAt is set once the pod is requested to be killed - note that
	// this can be set before the pod worker starts terminating the pod, see
	// terminating.
	terminatingAt time.Time
	// terminatedAt is set once the pod worker has completed a successful
	// syncTerminatingPod call and means all running containers are stopped.
	terminatedAt time.Time
	// gracePeriod is the requested gracePeriod once terminatingAt is nonzero.
	gracePeriod int64
	// notifyPostTerminating will be closed once the pod transitions to
	// terminated. After the pod is in terminated state, nothing should be
	// added to this list.
	notifyPostTerminating []chan<- struct{}
	// statusPostTerminating is a list of the status changes associated
	// with kill pod requests. After the pod is in terminated state, nothing
	// should be added to this list. The worker will execute the last function
	// in this list on each termination attempt.
	statusPostTerminating []PodStatusFunc

	// startedTerminating is true once the pod worker has observed the request to
	// stop a pod (exited syncPod and observed a podWork with WorkType
	// TerminatingPod). Once this is set, it is safe for other components
	// of the kubelet to assume that no other containers may be started.
	startedTerminating bool
	// deleted is true if the pod has been marked for deletion on the apiserver
	// or has no configuration represented (was deleted before).
	deleted bool
	// evicted is true if the kill indicated this was an eviction (an evicted
	// pod can be more aggressively cleaned up).
	evicted bool
	// finished is true once the pod worker completes for a pod
	// (syncTerminatedPod exited with no errors) until SyncKnownPods is invoked
	// to remove the pod. A terminal pod (Succeeded/Failed) will have
	// termination status until the pod is deleted.
	finished bool
	// restartRequested is true if the pod worker was informed the pod is
	// expected to exist (update type of create, update, or sync) after
	// it has been killed. When known pods are synced, any pod that is
	// terminated and has restartRequested will have its history cleared.
	restartRequested bool
	// observedRuntime is true if the pod has been observed to be present in the
	// runtime. A pod that has been observed at runtime must go through either
	// SyncTerminatingRuntimePod or SyncTerminatingPod. Otherwise, we can avoid
	// invoking the terminating methods if the pod is deleted or orphaned before
	// it has been started.
	observedRuntime bool
}

func (s *podSyncStatus) IsWorking() bool              { return s.working }
func (s *podSyncStatus) IsTerminationRequested() bool { return !s.terminatingAt.IsZero() }
func (s *podSyncStatus) IsTerminationStarted() bool   { return s.startedTerminating }
func (s *podSyncStatus) IsTerminated() bool           { return !s.terminatedAt.IsZero() }
func (s *podSyncStatus) IsFinished() bool             { return s.finished }
func (s *podSyncStatus) IsEvicted() bool              { return s.evicted }
func (s *podSyncStatus) IsDeleted() bool              { return s.deleted }
func (s *podSyncStatus) IsStarted() bool              { return !s.startedAt.IsZero() }

// WorkType returns the pod's current state in the pod lifecycle state machine.
func (s *podSyncStatus) WorkType() PodWorkerState {
	if s.IsTerminated() {
		return TerminatedPod
	}
	if s.IsTerminationRequested() {
		return TerminatingPod
	}
	return SyncPod
}

// mergeLastUpdate records the most recent state from a new update. Pod and MirrorPod are
// updated to the latest values observed. KillPodOptions is accumulated. If RunningPod is set, Pod is synthetic and
// will *not* be used as the last pod state unless no previous pod state exists (because
// the pod worker may be responsible for terminating a pod from a previous run of the
// kubelet where no config state is visible). The contents of activeUpdate are used as the
// source of truth for components downstream of the pod workers.
func (s *podSyncStatus) mergeLastUpdate(other UpdatePodOptions) {
	opts := s.activeUpdate
	if opts == nil {
		opts = &UpdatePodOptions{}
		s.activeUpdate = opts
	}

	// UpdatePodOptions states (and UpdatePod enforces) that either Pod or RunningPod
	// is set, and we wish to preserve the most recent Pod we have observed, so only
	// overwrite our Pod when we have no Pod or when RunningPod is nil.
	if opts.Pod == nil || other.RunningPod == nil {
		opts.Pod = other.Pod
	}
	// running pods will not persist but will be remembered for replay
	opts.RunningPod = other.RunningPod
	// if mirrorPod was not provided, remember the last one for replay
	if other.MirrorPod != nil {
		opts.MirrorPod = other.MirrorPod
	}
	// accumulate kill pod options
	if other.KillPodOptions != nil {
		opts.KillPodOptions = &KillPodOptions{}
		if other.KillPodOptions.Evict {
			opts.KillPodOptions.Evict = true
		}
		if override := other.KillPodOptions.PodTerminationGracePeriodSecondsOverride; override != nil {
			value := *override
			opts.KillPodOptions.PodTerminationGracePeriodSecondsOverride = &value
		}
	}
	// StartTime is not copied - that is purely for tracking latency of config propagation
	// from kubelet to pod worker.
}
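
// Editor's note: the function below is an illustrative sketch, not part of the
// upstream file. It demonstrates the merge semantics above: the latest pod
// object wins, and a later kill carrying Evict and a grace period override is
// reflected in activeUpdate. The five second override is an arbitrary value.
func exampleMergeLastUpdate(pod *v1.Pod) *UpdatePodOptions {
	status := &podSyncStatus{}
	status.mergeLastUpdate(UpdatePodOptions{UpdateType: kubetypes.SyncPodUpdate, Pod: pod})

	override := int64(5)
	status.mergeLastUpdate(UpdatePodOptions{
		UpdateType: kubetypes.SyncPodKill,
		Pod:        pod,
		KillPodOptions: &KillPodOptions{
			Evict: true,
			PodTerminationGracePeriodSecondsOverride: &override,
		},
	})
	// status.activeUpdate now carries the latest pod, Evict=true, and a copy of the 5s override.
	return status.activeUpdate
}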

// podWorkers keeps track of operations on pods and ensures each pod is
// reconciled with the container runtime and other subsystems. The worker
// also tracks which pods are in flight for starting, which pods are
// shutting down but still have running containers, and which pods have
// terminated recently and are guaranteed to have no running containers.
//
// podWorkers is the source of truth for what pods should be active on a
// node at any time, and is kept up to date with the desired state of the
// node (tracked by the kubelet pod config loops and the state in the
// kubelet's podManager) via the UpdatePod method. Components that act
// upon running pods should look to the pod worker for state instead of the
// kubelet podManager. The pod worker is periodically reconciled with the
// state of the podManager via SyncKnownPods() and is responsible for
// ensuring the completion of all observed pods no longer present in
// the podManager (no longer part of the node's desired config).
//
// A pod passed to a pod worker is either being synced (expected to be
// running), terminating (has running containers but no new containers are
// expected to start), terminated (has no running containers but may still
// have resources being consumed), or cleaned up (no resources remaining).
// Once a pod is set to be "torn down" it cannot be started again for that
// UID (corresponding to a delete or eviction) until:
//
//  1. The pod worker is finalized (syncTerminatingPod and
//     syncTerminatedPod exit without error sequentially)
//  2. The SyncKnownPods method is invoked by kubelet housekeeping and the pod
//     is not part of the known config.
//
// Pod workers provide a consistent source of information to other kubelet
// loops about the status of the pod and whether containers can be
// running. The ShouldPodContentBeRemoved() method tracks whether a pod's
// contents should still exist, which includes non-existent pods after
// SyncKnownPods() has been called once (as per the contract, all existing
// pods should be provided via UpdatePod before SyncKnownPods is invoked).
// Generally other sync loops are expected to separate "setup" and
// "teardown" responsibilities and the information methods here assist in
// each by centralizing that state. A simple visualization of the time
// intervals involved might look like:
//
// ---|                                         = kubelet config has synced at least once
// -------|                                  |- = pod exists in apiserver config
// --------|                  |---------------- = CouldHaveRunningContainers() is true
//
//	^- pod is observed by pod worker  .
//	.                                 .
//
// ----------|       |------------------------- = syncPod is running
//
//	. ^- pod worker loop sees change and invokes syncPod
//	. .                               .
//
// --------------|                     |------- = ShouldPodContainersBeTerminating() returns true
// --------------|                     |------- = IsPodTerminationRequested() returns true (pod is known)
//
//	. .   ^- Kubelet evicts pod       .
//	. .                               .
//
// -------------------|       |---------------- = syncTerminatingPod runs then exits without error
//
//	        . .        ^ pod worker loop exits syncPod, sees pod is terminating,
//					 . .          invokes syncTerminatingPod
//	        . .                               .
//
// ---|    |------------------|              .  = ShouldPodRuntimeBeRemoved() returns true (post-sync)
//
//	.                ^ syncTerminatingPod has exited successfully
//	.                               .
//
// ----------------------------|       |------- = syncTerminatedPod runs then exits without error
//
//	.                         ^ other loops can tear down
//	.                               .
//
// ------------------------------------|  |---- = status manager is waiting for SyncTerminatedPod() to finish
//
//	.                         ^     .
//
// ----------|                               |- = status manager can be writing pod status
//
//	^ status manager deletes pod because no longer exists in config
//
// Other components in the Kubelet can request a termination of the pod
// via the UpdatePod method or the killPodNow wrapper - this will ensure
// the components of the pod are stopped until the kubelet is restarted
// or permanently (if the phase of the pod is set to a terminal phase
// in the pod status change).
type podWorkers struct {
	// Protects all per worker fields.
	podLock sync.Mutex
	// podsSynced is true once the pod worker has been synced at least once,
	// which means that all working pods have been started via UpdatePod().
	podsSynced bool

	// Tracks all running per-pod goroutines - per-pod goroutine will be
	// processing updates received through its corresponding channel. Sending
	// a message on this channel will signal the corresponding goroutine to
	// consume podSyncStatuses[uid].pendingUpdate if set.
	podUpdates map[types.UID]chan struct{}
	// Tracks by UID the termination status of a pod - syncing, terminating,
	// terminated, and evicted.
	podSyncStatuses map[types.UID]*podSyncStatus

	// Tracks all uids for started static pods by full name
	startedStaticPodsByFullname map[string]types.UID
	// Tracks all uids for static pods that are waiting to start by full name
	waitingToStartStaticPodsByFullname map[string][]types.UID

	workQueue queue.WorkQueue

	// This function is run to sync the desired state of pod.
	// NOTE: This function has to be thread-safe - it can be called for
	// different pods at the same time.
	podSyncer podSyncer

	// workerChannelFn is exposed for testing to allow unit tests to impose delays
	// in channel communication. The function is invoked once each time a new worker
	// goroutine starts.
	workerChannelFn func(uid types.UID, in chan struct{}) (out <-chan struct{})

	// The EventRecorder to use
	recorder record.EventRecorder

	// backOffPeriod is the duration to back off when there is a sync error.
	backOffPeriod time.Duration

	// resyncInterval is the duration to wait until the next sync.
	resyncInterval time.Duration

	// podCache stores kubecontainer.PodStatus for all pods.
	podCache kubecontainer.Cache

	// clock is used for testing timing
	clock clock.PassiveClock
}

func newPodWorkers(
	podSyncer podSyncer,
	recorder record.EventRecorder,
	workQueue queue.WorkQueue,
	resyncInterval, backOffPeriod time.Duration,
	podCache kubecontainer.Cache,
) PodWorkers {
	return &podWorkers{
		podSyncStatuses:                    map[types.UID]*podSyncStatus{},
		podUpdates:                         map[types.UID]chan struct{}{},
		startedStaticPodsByFullname:        map[string]types.UID{},
		waitingToStartStaticPodsByFullname: map[string][]types.UID{},
		podSyncer:                          podSyncer,
		recorder:                           recorder,
		workQueue:                          workQueue,
		resyncInterval:                     resyncInterval,
		backOffPeriod:                      backOffPeriod,
		podCache:                           podCache,
		clock:                              clock.RealClock{},
	}
}
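
// Editor's note: the sketch below is illustrative and not part of the upstream
// file. It wires the workers together the way a test in this package might,
// reusing the exampleStubSyncer above; the fake recorder buffer size, the
// intervals, and the use of queue.NewBasicWorkQueue, kubecontainer.NewCache,
// and record.NewFakeRecorder are assumptions for the example.
func exampleNewPodWorkers() PodWorkers {
	return newPodWorkers(
		exampleStubSyncer(),
		record.NewFakeRecorder(100),
		queue.NewBasicWorkQueue(clock.RealClock{}),
		time.Second,    // resyncInterval
		10*time.Second, // backOffPeriod
		kubecontainer.NewCache(),
	)
}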

func (p *podWorkers) IsPodKnownTerminated(uid types.UID) bool {
	p.podLock.Lock()
	defer p.podLock.Unlock()
	if status, ok := p.podSyncStatuses[uid]; ok {
		return status.IsTerminated()
	}
	// if the pod is not known, we return false (pod worker is not aware of it)
	return false
}

func (p *podWorkers) CouldHaveRunningContainers(uid types.UID) bool {
	p.podLock.Lock()
	defer p.podLock.Unlock()
	if status, ok := p.podSyncStatuses[uid]; ok {
		return !status.IsTerminated()
	}
	// once all pods are synced, any pod without sync status is known to not be running.
	return !p.podsSynced
}

func (p *podWorkers) ShouldPodBeFinished(uid types.UID) bool {
	p.podLock.Lock()
	defer p.podLock.Unlock()
	if status, ok := p.podSyncStatuses[uid]; ok {
		return status.IsFinished()
	}
	// once all pods are synced, any pod without sync status is assumed to
	// have SyncTerminatedPod finished.
	return p.podsSynced
}

func (p *podWorkers) IsPodTerminationRequested(uid types.UID) bool {
	p.podLock.Lock()
	defer p.podLock.Unlock()
	if status, ok := p.podSyncStatuses[uid]; ok {
		// the pod may still be setting up at this point.
		return status.IsTerminationRequested()
	}
	// an unknown pod is considered not to be terminating (use ShouldPodContainersBeTerminating in
	// cleanup loops to avoid failing to cleanup pods that have already been removed from config)
	return false
}

func (p *podWorkers) ShouldPodContainersBeTerminating(uid types.UID) bool {
	p.podLock.Lock()
	defer p.podLock.Unlock()
	if status, ok := p.podSyncStatuses[uid]; ok {
		// we wait until the pod worker goroutine observes the termination, which means syncPod will not
		// be executed again, which means no new containers can be started
		return status.IsTerminationStarted()
	}
	// once we've synced, if the pod isn't known to the workers we should be tearing them
	// down
	return p.podsSynced
}

func (p *podWorkers) ShouldPodRuntimeBeRemoved(uid types.UID) bool {
	p.podLock.Lock()
	defer p.podLock.Unlock()
	if status, ok := p.podSyncStatuses[uid]; ok {
		return status.IsTerminated()
	}
	// a pod that hasn't been sent to the pod worker yet should have no runtime components once we have
	// synced all content.
	return p.podsSynced
}

func (p *podWorkers) ShouldPodContentBeRemoved(uid types.UID) bool {
	p.podLock.Lock()
	defer p.podLock.Unlock()
	if status, ok := p.podSyncStatuses[uid]; ok {
		return status.IsEvicted() || (status.IsDeleted() && status.IsTerminated())
	}
	// a pod that hasn't been sent to the pod worker yet should have no content on disk once we have
	// synced all content.
	return p.podsSynced
}

func (p *podWorkers) IsPodForMirrorPodTerminatingByFullName(podFullName string) bool {
	p.podLock.Lock()
	defer p.podLock.Unlock()
	uid, started := p.startedStaticPodsByFullname[podFullName]
	if !started {
		return false
	}
	status, exists := p.podSyncStatuses[uid]
	if !exists {
		return false
	}
	if !status.IsTerminationRequested() || status.IsTerminated() {
		return false
	}

	return true
}

func isPodStatusCacheTerminal(status *kubecontainer.PodStatus) bool {
	for _, container := range status.ContainerStatuses {
		if container.State == kubecontainer.ContainerStateRunning {
			return false
		}
	}
	for _, sb := range status.SandboxStatuses {
		if sb.State == runtimeapi.PodSandboxState_SANDBOX_READY {
			return false
		}
	}
	return true
}

// UpdatePod carries a configuration change or termination state to a pod. A pod is either runnable,
// terminating, or terminated, and will transition to terminating if: deleted on the apiserver,
// discovered to have a terminal phase (Succeeded or Failed), or evicted by the kubelet.
func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
	// Handle when the pod is an orphan (no config) and we only have runtime status by running only
	// the terminating part of the lifecycle. A running pod contains only a minimal set of information
	// about the pod
	var isRuntimePod bool
	var uid types.UID
	var name, ns string
	if runningPod := options.RunningPod; runningPod != nil {
		if options.Pod == nil {
			// the synthetic pod created here is used only as a placeholder and not tracked
			if options.UpdateType != kubetypes.SyncPodKill {
				klog.InfoS("Pod update is ignored, runtime pods can only be killed", "pod", klog.KRef(runningPod.Namespace, runningPod.Name), "podUID", runningPod.ID, "updateType", options.UpdateType)
				return
			}
			uid, ns, name = runningPod.ID, runningPod.Namespace, runningPod.Name
			isRuntimePod = true
		} else {
			options.RunningPod = nil
			uid, ns, name = options.Pod.UID, options.Pod.Namespace, options.Pod.Name
			klog.InfoS("Pod update included RunningPod which is only valid when Pod is not specified", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
		}
	} else {
		uid, ns, name = options.Pod.UID, options.Pod.Namespace, options.Pod.Name
	}

	p.podLock.Lock()
	defer p.podLock.Unlock()

	// decide what to do with this pod - we are either setting it up, tearing it down, or ignoring it
	var firstTime bool
	now := p.clock.Now()
	status, ok := p.podSyncStatuses[uid]
	if !ok {
		klog.V(4).InfoS("Pod is being synced for the first time", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
		firstTime = true
		status = &podSyncStatus{
			syncedAt: now,
			fullname: kubecontainer.BuildPodFullName(name, ns),
		}
		// if this pod is being synced for the first time, we need to make sure it is an active pod
		if options.Pod != nil && (options.Pod.Status.Phase == v1.PodFailed || options.Pod.Status.Phase == v1.PodSucceeded) {
			// Check to see if the pod is not running and the pod is terminal; if this succeeds then record in the podWorker that it is terminated.
			// This is needed because after a kubelet restart, we need to ensure terminal pods will NOT be considered active in Pod Admission. See http://issues.k8s.io/105523
			// However, `filterOutInactivePods`, considers pods that are actively terminating as active. As a result, `IsPodKnownTerminated()` needs to return true and thus `terminatedAt` needs to be set.
			if statusCache, err := p.podCache.Get(uid); err == nil {
				if isPodStatusCacheTerminal(statusCache) {
					// At this point we know:
					// (1) The pod is terminal based on the config source.
					// (2) The pod is terminal based on the runtime cache.
					// This implies that this pod had already completed `SyncTerminatingPod` sometime in the past. The pod is likely being synced for the first time due to a kubelet restart.
					// These pods need to complete SyncTerminatedPod to ensure that all resources are cleaned and that the status manager makes the final status updates for the pod.
					// As a result, set finished: false, to ensure a Terminated event will be sent and `SyncTerminatedPod` will run.
					status = &podSyncStatus{
						terminatedAt:       now,
						terminatingAt:      now,
						syncedAt:           now,
						startedTerminating: true,
						finished:           false,
						fullname:           kubecontainer.BuildPodFullName(name, ns),
					}
				}
			}
		}
		p.podSyncStatuses[uid] = status
	}

	// RunningPods represent an unknown pod execution and don't contain pod spec information
	// sufficient to perform any action other than termination. If we received a RunningPod
	// after a real pod has already been provided, use the most recent spec instead. Also,
	// once we observe a runtime pod we must drive it to completion, even if we weren't the
	// ones who started it.
	pod := options.Pod
	if isRuntimePod {
		status.observedRuntime = true
		switch {
		case status.pendingUpdate != nil && status.pendingUpdate.Pod != nil:
			pod = status.pendingUpdate.Pod
			options.Pod = pod
			options.RunningPod = nil
		case status.activeUpdate != nil && status.activeUpdate.Pod != nil:
			pod = status.activeUpdate.Pod
			options.Pod = pod
			options.RunningPod = nil
		default:
			// we will continue to use RunningPod.ToAPIPod() as pod here, but
			// options.Pod will be nil and other methods must handle that appropriately.
			pod = options.RunningPod.ToAPIPod()
		}
	}

	// When we see a create update on an already terminating pod, that implies two pods with the same UID were created in
	// close temporal proximity (usually static pod but it's possible for an apiserver to extremely rarely do something
	// similar) - flag the sync status to indicate that after the pod terminates it should be reset to "not running" to
	// allow a subsequent add/update to start the pod worker again. This does not apply to the first time we see a pod,
	// such as when the kubelet restarts and we see already terminated pods for the first time.
	if !firstTime && status.IsTerminationRequested() {
		if options.UpdateType == kubetypes.SyncPodCreate {
			status.restartRequested = true
			klog.V(4).InfoS("Pod is terminating but has been requested to restart with same UID, will be reconciled later", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
			return
		}
	}

	// once a pod is terminated by UID, it cannot reenter the pod worker (until the UID is purged by housekeeping)
	if status.IsFinished() {
		klog.V(4).InfoS("Pod is finished processing, no further updates", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
		return
	}

	// check for a transition to terminating
	var becameTerminating bool
	if !status.IsTerminationRequested() {
		switch {
		case isRuntimePod:
			klog.V(4).InfoS("Pod is orphaned and must be torn down", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
			status.deleted = true
			status.terminatingAt = now
			becameTerminating = true
		case pod.DeletionTimestamp != nil:
			klog.V(4).InfoS("Pod is marked for graceful deletion, begin teardown", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
			status.deleted = true
			status.terminatingAt = now
			becameTerminating = true
		case pod.Status.Phase == v1.PodFailed, pod.Status.Phase == v1.PodSucceeded:
			klog.V(4).InfoS("Pod is in a terminal phase (success/failed), begin teardown", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
			status.terminatingAt = now
			becameTerminating = true
		case options.UpdateType == kubetypes.SyncPodKill:
			if options.KillPodOptions != nil && options.KillPodOptions.Evict {
				klog.V(4).InfoS("Pod is being evicted by the kubelet, begin teardown", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
				status.evicted = true
			} else {
				klog.V(4).InfoS("Pod is being removed by the kubelet, begin teardown", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
			}
			status.terminatingAt = now
			becameTerminating = true
		}
	}

	// once a pod is terminating, all updates are kills and the grace period can only decrease
	var wasGracePeriodShortened bool
	switch {
	case status.IsTerminated():
		// A terminated pod may still be waiting for cleanup - if we receive a runtime pod kill request
		// due to housekeeping seeing an older cached version of the runtime pod simply ignore it until
		// after the pod worker completes.
		if isRuntimePod {
			klog.V(3).InfoS("Pod is waiting for termination, ignoring runtime-only kill until after pod worker is fully terminated", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
			return
		}

		if options.KillPodOptions != nil {
			if ch := options.KillPodOptions.CompletedCh; ch != nil {
				close(ch)
			}
		}
		options.KillPodOptions = nil

	case status.IsTerminationRequested():
		if options.KillPodOptions == nil {
			options.KillPodOptions = &KillPodOptions{}
		}

		if ch := options.KillPodOptions.CompletedCh; ch != nil {
			status.notifyPostTerminating = append(status.notifyPostTerminating, ch)
		}
		if fn := options.KillPodOptions.PodStatusFunc; fn != nil {
			status.statusPostTerminating = append(status.statusPostTerminating, fn)
		}

		gracePeriod, gracePeriodShortened := calculateEffectiveGracePeriod(status, pod, options.KillPodOptions)

		wasGracePeriodShortened = gracePeriodShortened
		status.gracePeriod = gracePeriod
		// always set the grace period for syncTerminatingPod so we don't have to recalculate,
		// will never be zero.
		options.KillPodOptions.PodTerminationGracePeriodSecondsOverride = &gracePeriod

	default:
		// KillPodOptions is not valid for sync actions outside of the terminating phase
		if options.KillPodOptions != nil {
			if ch := options.KillPodOptions.CompletedCh; ch != nil {
				close(ch)
			}
			options.KillPodOptions = nil
		}
	}

	// start the pod worker goroutine if it doesn't exist
	podUpdates, exists := p.podUpdates[uid]
	if !exists {
		// buffer the channel to avoid blocking this method
		podUpdates = make(chan struct{}, 1)
		p.podUpdates[uid] = podUpdates

		// ensure that static pods start in the order they are received by UpdatePod
		if kubetypes.IsStaticPod(pod) {
			p.waitingToStartStaticPodsByFullname[status.fullname] =
				append(p.waitingToStartStaticPodsByFullname[status.fullname], uid)
		}

		// allow testing of delays in the pod update channel
		var outCh <-chan struct{}
		if p.workerChannelFn != nil {
			outCh = p.workerChannelFn(uid, podUpdates)
		} else {
			outCh = podUpdates
		}

		// spawn a pod worker
		go func() {
			// TODO: this should be a wait.Until with backoff to handle panics, and
			// accept a context for shutdown
			defer runtime.HandleCrash()
			defer klog.V(3).InfoS("Pod worker has stopped", "podUID", uid)
			p.podWorkerLoop(uid, outCh)
		}()
	}

	// measure the maximum latency between a call to UpdatePod and when the pod worker reacts to it
	// by preserving the oldest StartTime
	if status.pendingUpdate != nil && !status.pendingUpdate.StartTime.IsZero() && status.pendingUpdate.StartTime.Before(options.StartTime) {
		options.StartTime = status.pendingUpdate.StartTime
	}

	// notify the pod worker there is a pending update
	status.pendingUpdate = &options
	status.working = true
	klog.V(4).InfoS("Notifying pod of pending update", "pod", klog.KRef(ns, name), "podUID", uid, "workType", status.WorkType())
	select {
	case podUpdates <- struct{}{}:
	default:
	}

	if (becameTerminating || wasGracePeriodShortened) && status.cancelFn != nil {
		klog.V(3).InfoS("Cancelling current pod sync", "pod", klog.KRef(ns, name), "podUID", uid, "workType", status.WorkType())
		status.cancelFn()
		return
	}
}

// calculateEffectiveGracePeriod sets the initial grace period for a newly terminating pod or allows a
// shorter grace period to be provided, returning the desired value.
func calculateEffectiveGracePeriod(status *podSyncStatus, pod *v1.Pod, options *KillPodOptions) (int64, bool) {
	// enforce the restriction that a grace period can only decrease and track whatever our value is,
	// then ensure a calculated value is passed down to lower levels
	gracePeriod := status.gracePeriod
	// this value is bedrock truth - the apiserver owns telling us this value calculated by apiserver
 | 
						|
	if override := pod.DeletionGracePeriodSeconds; override != nil {
 | 
						|
		if gracePeriod == 0 || *override < gracePeriod {
 | 
						|
			gracePeriod = *override
 | 
						|
		}
 | 
						|
	}
 | 
						|
	// we allow other parts of the kubelet (namely eviction) to request this pod be terminated faster
 | 
						|
	if options != nil {
 | 
						|
		if override := options.PodTerminationGracePeriodSecondsOverride; override != nil {
 | 
						|
			if gracePeriod == 0 || *override < gracePeriod {
 | 
						|
				gracePeriod = *override
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
	// make a best effort to default this value to the pod's desired intent, in the event
 | 
						|
	// the kubelet provided no requested value (graceful termination?)
 | 
						|
	if gracePeriod == 0 && pod.Spec.TerminationGracePeriodSeconds != nil {
 | 
						|
		gracePeriod = *pod.Spec.TerminationGracePeriodSeconds
 | 
						|
	}
 | 
						|
	// no matter what, we always supply a grace period of 1
 | 
						|
	if gracePeriod < 1 {
 | 
						|
		gracePeriod = 1
 | 
						|
	}
 | 
						|
	return gracePeriod, status.gracePeriod != 0 && status.gracePeriod != gracePeriod
 | 
						|
}
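
// Illustrative sketch (not part of the original file): how the sources above combine,
// assuming pod is a *v1.Pod. The effective grace period is the smallest non-zero request,
// never less than one second.
//
//	status := &podSyncStatus{} // no grace period recorded yet
//	specSeconds, deletionSeconds, evictionSeconds := int64(30), int64(15), int64(5)
//	pod.Spec.TerminationGracePeriodSeconds = &specSeconds
//	pod.DeletionGracePeriodSeconds = &deletionSeconds
//	opts := &KillPodOptions{PodTerminationGracePeriodSecondsOverride: &evictionSeconds}
//	gracePeriod, shortened := calculateEffectiveGracePeriod(status, pod, opts)
//	// gracePeriod == 5; shortened is false because no previous grace period was tracked.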

// allowPodStart tries to start the pod and returns true if allowed, otherwise
// it requeues the pod and returns false. If the pod will never be able to start
// because data is missing, or the pod was terminated before start, canEverStart
// is false. This method can only be called while holding the pod lock.
func (p *podWorkers) allowPodStart(pod *v1.Pod) (canStart bool, canEverStart bool) {
	if !kubetypes.IsStaticPod(pod) {
		// TODO: Do we want to allow non-static pods with the same full name?
		// Note that it may disable the force deletion of pods.
		return true, true
	}
	status, ok := p.podSyncStatuses[pod.UID]
	if !ok {
		klog.ErrorS(nil, "Pod sync status does not exist, the worker should not be running", "pod", klog.KObj(pod), "podUID", pod.UID)
		return false, false
	}
	if status.IsTerminationRequested() {
		return false, false
	}
	if !p.allowStaticPodStart(status.fullname, pod.UID) {
		p.workQueue.Enqueue(pod.UID, wait.Jitter(p.backOffPeriod, workerBackOffPeriodJitterFactor))
		return false, true
	}
	return true, true
}

// allowStaticPodStart tries to start the static pod and returns true if
// 1. there are no other started static pods with the same fullname
// 2. the uid matches that of the first valid static pod waiting to start
func (p *podWorkers) allowStaticPodStart(fullname string, uid types.UID) bool {
	startedUID, started := p.startedStaticPodsByFullname[fullname]
	if started {
		return startedUID == uid
	}

	waitingPods := p.waitingToStartStaticPodsByFullname[fullname]
	// TODO: This is O(N) with respect to the number of updates to static pods
	// with overlapping full names, and ideally would be O(1).
	for i, waitingUID := range waitingPods {
		// has pod already terminated or been deleted?
		status, ok := p.podSyncStatuses[waitingUID]
		if !ok || status.IsTerminationRequested() || status.IsTerminated() {
			continue
		}
		// another pod is next in line
		if waitingUID != uid {
			p.waitingToStartStaticPodsByFullname[fullname] = waitingPods[i:]
			return false
		}
		// we are up next, remove ourselves
		waitingPods = waitingPods[i+1:]
		break
	}
	if len(waitingPods) != 0 {
		p.waitingToStartStaticPodsByFullname[fullname] = waitingPods
	} else {
		delete(p.waitingToStartStaticPodsByFullname, fullname)
	}
	p.startedStaticPodsByFullname[fullname] = uid
	return true
}
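
// Illustrative sketch (not part of the original file): ordering of static pods that
// share a fullname. Suppose two revisions of the same manifest resolve to fullname
// "nginx-node1" with UIDs "a" and "b" (hypothetical values), observed in that order,
// and both have non-terminated sync statuses:
//
//	p.waitingToStartStaticPodsByFullname["nginx-node1"] = []types.UID{"a", "b"}
//	p.allowStaticPodStart("nginx-node1", "b") // false - "a" is next in line
//	p.allowStaticPodStart("nginx-node1", "a") // true  - "a" is recorded as started
//	p.allowStaticPodStart("nginx-node1", "b") // false while "a" is still the started pod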

// cleanupUnstartedPod is invoked if a pod that has never been started receives a termination
// signal before it can be started. This method must be called holding the pod lock.
func (p *podWorkers) cleanupUnstartedPod(pod *v1.Pod, status *podSyncStatus) {
	p.cleanupPodUpdates(pod.UID)

	if status.terminatingAt.IsZero() {
		klog.V(4).InfoS("Pod worker is complete but did not have terminatingAt set, likely programmer error", "pod", klog.KObj(pod), "podUID", pod.UID)
	}
	if !status.terminatedAt.IsZero() {
		klog.V(4).InfoS("Pod worker is complete and had terminatedAt set, likely programmer error", "pod", klog.KObj(pod), "podUID", pod.UID)
	}
	status.finished = true
	status.working = false
	status.terminatedAt = p.clock.Now()

	if p.startedStaticPodsByFullname[status.fullname] == pod.UID {
		delete(p.startedStaticPodsByFullname, status.fullname)
	}
}

// startPodSync is invoked by each pod worker goroutine when a message arrives on the pod update channel.
// This method consumes a pending update, initializes a context, decides whether the pod is already started
// or can be started, and updates the cached pod state so that downstream components can observe what the
// pod worker goroutine is currently attempting to do. If ok is false, there is no available event. If any
// of the boolean values is false, ensure the appropriate cleanup happens before returning.
//
// This method should ensure that either status.pendingUpdate is cleared and merged into status.activeUpdate,
// or when a pod cannot be started status.pendingUpdate remains the same. Pods that have not been started
// should never have an activeUpdate because that is exposed to downstream components on started pods.
func (p *podWorkers) startPodSync(podUID types.UID) (ctx context.Context, update podWork, canStart, canEverStart, ok bool) {
	p.podLock.Lock()
	defer p.podLock.Unlock()

	// verify we are known to the pod worker still
	status, ok := p.podSyncStatuses[podUID]
	if !ok {
		// pod status has disappeared, the worker should exit
		klog.V(4).InfoS("Pod worker no longer has status, worker should exit", "podUID", podUID)
		return nil, update, false, false, false
	}
	if !status.working {
		// working is used by unit tests to observe whether a worker is currently acting on this pod
		klog.V(4).InfoS("Pod should be marked as working by the pod worker, programmer error", "podUID", podUID)
	}
	if status.pendingUpdate == nil {
		// no update available, this means we were queued without work being added or there is a
		// race condition, both of which are unexpected
		status.working = false
		klog.V(4).InfoS("Pod worker received no pending work, programmer error?", "podUID", podUID)
		return nil, update, false, false, false
	}

	// consume the pending update
	update.WorkType = status.WorkType()
	update.Options = *status.pendingUpdate
	status.pendingUpdate = nil
	select {
	case <-p.podUpdates[podUID]:
		// ensure the pod update channel is empty (it is only ever written to under lock)
	default:
	}

	// initialize a context for the worker if one does not exist
	if status.ctx == nil || status.ctx.Err() == context.Canceled {
		status.ctx, status.cancelFn = context.WithCancel(context.Background())
	}
	ctx = status.ctx

	// if we are already started, make our state visible to downstream components
	if status.IsStarted() {
		status.mergeLastUpdate(update.Options)
		return ctx, update, true, true, true
	}

	// if we are already terminating and we only have a running pod, allow the worker
	// to "start" since we are immediately moving to terminating
	if update.Options.RunningPod != nil && update.WorkType == TerminatingPod {
		status.mergeLastUpdate(update.Options)
		return ctx, update, true, true, true
	}

	// If we receive an update where Pod is nil (running pod is set) but haven't
	// started yet, we can only terminate the pod, not start it. We should not be
	// asked to start such a pod, but guard here just in case an accident occurs.
	if update.Options.Pod == nil {
		status.mergeLastUpdate(update.Options)
		klog.V(4).InfoS("Running pod cannot start ever, programmer error", "pod", klog.KObj(update.Options.Pod), "podUID", podUID, "updateType", update.WorkType)
		return ctx, update, false, false, true
	}

	// verify we can start
	canStart, canEverStart = p.allowPodStart(update.Options.Pod)
	switch {
	case !canEverStart:
		p.cleanupUnstartedPod(update.Options.Pod, status)
		status.working = false
		if start := update.Options.StartTime; !start.IsZero() {
			metrics.PodWorkerDuration.WithLabelValues("terminated").Observe(metrics.SinceInSeconds(start))
		}
		klog.V(4).InfoS("Pod cannot start ever", "pod", klog.KObj(update.Options.Pod), "podUID", podUID, "updateType", update.WorkType)
		return ctx, update, canStart, canEverStart, true
	case !canStart:
		// this is the only path where we don't start the pod, so we need to put the change back in pendingUpdate
		status.pendingUpdate = &update.Options
		status.working = false
		klog.V(4).InfoS("Pod cannot start yet", "pod", klog.KObj(update.Options.Pod), "podUID", podUID)
		return ctx, update, canStart, canEverStart, true
	}

	// mark the pod as started
	status.startedAt = p.clock.Now()
	status.mergeLastUpdate(update.Options)

	// If we are admitting the pod and it is new, record the count of containers
	// TODO: We should probably move this into syncPod and add an execution count
	// to the syncPod arguments, and this should be recorded on the first sync.
	// Leaving it here complicates a particularly important loop.
	metrics.ContainersPerPodCount.Observe(float64(len(update.Options.Pod.Spec.Containers)))

	return ctx, update, true, true, true
}
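
// Illustrative sketch (not part of the original file): how the values returned by
// startPodSync are intended to be consumed, mirroring the loop in podWorkerLoop below.
//
//	ctx, update, canStart, canEverStart, ok := p.startPodSync(podUID)
//	switch {
//	case !ok:           // no pending work was queued; wait for the next wakeup
//	case !canEverStart: // the pod was terminated before it could start; the worker exits
//	case !canStart:     // the pod must wait (e.g. static pod ordering); the update stays pending
//	default:            // proceed with the sync using ctx and update
//	}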

func podUIDAndRefForUpdate(update UpdatePodOptions) (types.UID, klog.ObjectRef) {
	if update.RunningPod != nil {
		return update.RunningPod.ID, klog.KObj(update.RunningPod.ToAPIPod())
	}
	return update.Pod.UID, klog.KObj(update.Pod)
}

// podWorkerLoop manages sequential state updates to a pod in a goroutine, exiting once the final
// state is reached. The loop is responsible for driving the pod through four main phases:
//
// 1. Wait to start, guaranteeing no two pods with the same UID or same fullname are running at the same time
// 2. Sync, orchestrating pod setup by reconciling the desired pod spec with the runtime state of the pod
// 3. Terminating, ensuring all running containers in the pod are stopped
// 4. Terminated, cleaning up any resources that must be released before the pod can be deleted
//
// The podWorkerLoop is driven by updates delivered to UpdatePod and by SyncKnownPods. If a particular
// sync method fails, p.workQueue is updated with backoff but it is the responsibility of the kubelet
// to trigger new UpdatePod calls. SyncKnownPods will only retry pods that are no longer known to the
// caller. When a pod transitions working->terminating or terminating->terminated, the next update is
// queued immediately and no kubelet action is required.
func (p *podWorkers) podWorkerLoop(podUID types.UID, podUpdates <-chan struct{}) {
	var lastSyncTime time.Time
	for range podUpdates {
		ctx, update, canStart, canEverStart, ok := p.startPodSync(podUID)
		// If we had no update waiting, it means someone initialized the channel without filling out pendingUpdate.
		if !ok {
			continue
		}
		// If the pod was terminated prior to the pod being allowed to start, we exit the loop.
		if !canEverStart {
			return
		}
		// If the pod is not yet ready to start, continue and wait for more updates.
		if !canStart {
			continue
		}

		podUID, podRef := podUIDAndRefForUpdate(update.Options)

		klog.V(4).InfoS("Processing pod event", "pod", podRef, "podUID", podUID, "updateType", update.WorkType)
		var isTerminal bool
		err := func() error {
			// The worker is responsible for ensuring the sync method sees the appropriate
			// status updates on resyncs (the result of the last sync), transitions to
			// terminating (no wait), or on terminated (whatever the most recent state is).
			// Only syncing and terminating can generate pod status changes, while terminated
			// pods ensure the most recent status makes it to the api server.
			var status *kubecontainer.PodStatus
			var err error
			switch {
			case update.Options.RunningPod != nil:
				// when we receive a running pod, we don't need status at all because we are
				// guaranteed to be terminating and we skip updates to the pod
			default:
				// wait until we see the next refresh from the PLEG via the cache (max 2s)
				// TODO: this adds ~1s of latency on all transitions from sync to terminating
				//  to terminated, and on all termination retries (including evictions). We should
				//  improve latency by making the pleg continuous and by allowing pod status
				//  changes to be refreshed when key events happen (killPod, sync->terminating).
				//  Improving this latency also reduces the possibility that a terminated
				//  container's status is garbage collected before we have a chance to update the
				//  API server (thus losing the exit code).
				status, err = p.podCache.GetNewerThan(update.Options.Pod.UID, lastSyncTime)

				if err != nil {
					// This is the legacy event thrown by the manage pod loop; all other events
					// are now dispatched from syncPodFn
					p.recorder.Eventf(update.Options.Pod, v1.EventTypeWarning, events.FailedSync, "error determining status: %v", err)
					return err
				}
			}

			// Take the appropriate action (illegal phases are prevented by UpdatePod)
			switch {
			case update.WorkType == TerminatedPod:
				err = p.podSyncer.SyncTerminatedPod(ctx, update.Options.Pod, status)

			case update.WorkType == TerminatingPod:
				var gracePeriod *int64
				if opt := update.Options.KillPodOptions; opt != nil {
					gracePeriod = opt.PodTerminationGracePeriodSecondsOverride
				}
				podStatusFn := p.acknowledgeTerminating(podUID)

				// if we only have a running pod, terminate it directly
				if update.Options.RunningPod != nil {
					err = p.podSyncer.SyncTerminatingRuntimePod(ctx, update.Options.RunningPod)
				} else {
					err = p.podSyncer.SyncTerminatingPod(ctx, update.Options.Pod, status, gracePeriod, podStatusFn)
				}

			default:
				isTerminal, err = p.podSyncer.SyncPod(ctx, update.Options.UpdateType, update.Options.Pod, update.Options.MirrorPod, status)
			}

			lastSyncTime = p.clock.Now()
			return err
		}()

		var phaseTransition bool
		switch {
		case err == context.Canceled:
			// when the context is cancelled we expect an update to already be queued
			klog.V(2).InfoS("Sync exited with context cancellation error", "pod", podRef, "podUID", podUID, "updateType", update.WorkType)

		case err != nil:
			// we will queue a retry
			klog.ErrorS(err, "Error syncing pod, skipping", "pod", podRef, "podUID", podUID)

		case update.WorkType == TerminatedPod:
			// we can shut down the worker
			p.completeTerminated(podUID)
			if start := update.Options.StartTime; !start.IsZero() {
				metrics.PodWorkerDuration.WithLabelValues("terminated").Observe(metrics.SinceInSeconds(start))
			}
			klog.V(4).InfoS("Processing pod event done", "pod", podRef, "podUID", podUID, "updateType", update.WorkType)
			return

		case update.WorkType == TerminatingPod:
			// pods that don't exist in config don't need to be terminated, other loops will clean them up
			if update.Options.RunningPod != nil {
				p.completeTerminatingRuntimePod(podUID)
				if start := update.Options.StartTime; !start.IsZero() {
					metrics.PodWorkerDuration.WithLabelValues(update.Options.UpdateType.String()).Observe(metrics.SinceInSeconds(start))
				}
				klog.V(4).InfoS("Processing pod event done", "pod", podRef, "podUID", podUID, "updateType", update.WorkType)
				return
			}
			// otherwise we move to the terminated phase
			p.completeTerminating(podUID)
			phaseTransition = true

		case isTerminal:
			// if syncPod indicated we are now terminal, set the appropriate pod status to move to terminating
			klog.V(4).InfoS("Pod is terminal", "pod", podRef, "podUID", podUID, "updateType", update.WorkType)
			p.completeSync(podUID)
			phaseTransition = true
		}

		// queue a retry if necessary, then put the next event in the channel if any
		p.completeWork(podUID, phaseTransition, err)
		if start := update.Options.StartTime; !start.IsZero() {
			metrics.PodWorkerDuration.WithLabelValues(update.Options.UpdateType.String()).Observe(metrics.SinceInSeconds(start))
		}
		klog.V(4).InfoS("Processing pod event done", "pod", podRef, "podUID", podUID, "updateType", update.WorkType)
	}
}

// acknowledgeTerminating sets the terminating flag on the pod status once the pod worker sees
// the termination state so that other components know no new containers will be started in this
// pod. It then returns the status function, if any, that applies to this pod.
func (p *podWorkers) acknowledgeTerminating(podUID types.UID) PodStatusFunc {
	p.podLock.Lock()
	defer p.podLock.Unlock()

	status, ok := p.podSyncStatuses[podUID]
	if !ok {
		return nil
	}

	if !status.terminatingAt.IsZero() && !status.startedTerminating {
		klog.V(4).InfoS("Pod worker has observed request to terminate", "podUID", podUID)
		status.startedTerminating = true
	}

	if l := len(status.statusPostTerminating); l > 0 {
		return status.statusPostTerminating[l-1]
	}
	return nil
}

// completeSync is invoked when syncPod completes successfully and indicates the pod is now terminal and should
// be terminated. This happens when the natural pod lifecycle completes - any pod which is not RestartAlways
// exits. Unnatural completions, such as evictions, API driven deletion or phase transition, are handled by
// UpdatePod.
func (p *podWorkers) completeSync(podUID types.UID) {
	p.podLock.Lock()
	defer p.podLock.Unlock()

	klog.V(4).InfoS("Pod indicated lifecycle completed naturally and should now terminate", "podUID", podUID)

	status, ok := p.podSyncStatuses[podUID]
	if !ok {
		klog.V(4).InfoS("Pod had no status in completeSync, programmer error?", "podUID", podUID)
		return
	}

	// update the status of the pod
	if status.terminatingAt.IsZero() {
		status.terminatingAt = p.clock.Now()
	} else {
		klog.V(4).InfoS("Pod worker attempted to set terminatingAt twice, likely programmer error", "podUID", podUID)
	}
	status.startedTerminating = true

	// the pod has now transitioned to terminating and we want to run syncTerminatingPod
	// as soon as possible, so if no update is already waiting queue a synthetic update
	p.requeueLastPodUpdate(podUID, status)
}

// completeTerminating is invoked when syncTerminatingPod completes successfully, which means
// no container is running, no container will be started in the future, and we are ready for
// cleanup. This updates the termination state which prevents future syncs and will ensure
// other kubelet loops know this pod is not running any containers.
func (p *podWorkers) completeTerminating(podUID types.UID) {
	p.podLock.Lock()
	defer p.podLock.Unlock()

	klog.V(4).InfoS("Pod terminated all containers successfully", "podUID", podUID)

	status, ok := p.podSyncStatuses[podUID]
	if !ok {
		return
	}

	// update the status of the pod
	if status.terminatingAt.IsZero() {
		klog.V(4).InfoS("Pod worker was terminated but did not have terminatingAt set, likely programmer error", "podUID", podUID)
	}
	status.terminatedAt = p.clock.Now()
	for _, ch := range status.notifyPostTerminating {
		close(ch)
	}
	status.notifyPostTerminating = nil
	status.statusPostTerminating = nil

	// the pod has now transitioned to terminated and we want to run syncTerminatedPod
	// as soon as possible, so if no update is already waiting queue a synthetic update
	p.requeueLastPodUpdate(podUID, status)
}

// completeTerminatingRuntimePod is invoked when syncTerminatingPod completes successfully,
// which means an orphaned pod (no config) is terminated and we can exit. Since orphaned
// pods have no API representation, we want to exit the loop at this point and ensure no
// status is present afterwards - the running pod is truly terminated when this is invoked.
func (p *podWorkers) completeTerminatingRuntimePod(podUID types.UID) {
	p.podLock.Lock()
	defer p.podLock.Unlock()

	klog.V(4).InfoS("Pod terminated all orphaned containers successfully and worker can now stop", "podUID", podUID)

	p.cleanupPodUpdates(podUID)

	status, ok := p.podSyncStatuses[podUID]
	if !ok {
		return
	}
	if status.terminatingAt.IsZero() {
		klog.V(4).InfoS("Pod worker was terminated but did not have terminatingAt set, likely programmer error", "podUID", podUID)
	}
	status.terminatedAt = p.clock.Now()
	status.finished = true
	status.working = false

	if p.startedStaticPodsByFullname[status.fullname] == podUID {
		delete(p.startedStaticPodsByFullname, status.fullname)
	}

	// A runtime pod is transient and not part of the desired state - once it has reached
	// terminated we can abandon tracking it.
	delete(p.podSyncStatuses, podUID)
}

// completeTerminated is invoked after syncTerminatedPod completes successfully and means we
// can stop the pod worker. The pod is finalized at this point.
func (p *podWorkers) completeTerminated(podUID types.UID) {
	p.podLock.Lock()
	defer p.podLock.Unlock()

	klog.V(4).InfoS("Pod is complete and the worker can now stop", "podUID", podUID)

	p.cleanupPodUpdates(podUID)

	status, ok := p.podSyncStatuses[podUID]
	if !ok {
		return
	}
	if status.terminatingAt.IsZero() {
		klog.V(4).InfoS("Pod worker is complete but did not have terminatingAt set, likely programmer error", "podUID", podUID)
	}
	if status.terminatedAt.IsZero() {
		klog.V(4).InfoS("Pod worker is complete but did not have terminatedAt set, likely programmer error", "podUID", podUID)
	}
	status.finished = true
	status.working = false

	if p.startedStaticPodsByFullname[status.fullname] == podUID {
		delete(p.startedStaticPodsByFullname, status.fullname)
	}
}

// completeWork requeues on error or the next sync interval and then immediately executes any pending
// work.
func (p *podWorkers) completeWork(podUID types.UID, phaseTransition bool, syncErr error) {
	// Requeue the last update if the last sync returned error.
	switch {
	case phaseTransition:
		p.workQueue.Enqueue(podUID, 0)
	case syncErr == nil:
		// No error; requeue at the regular resync interval.
		p.workQueue.Enqueue(podUID, wait.Jitter(p.resyncInterval, workerResyncIntervalJitterFactor))
	case strings.Contains(syncErr.Error(), NetworkNotReadyErrorMsg):
		// Network is not ready; back off for short period of time and retry as network might be ready soon.
		p.workQueue.Enqueue(podUID, wait.Jitter(backOffOnTransientErrorPeriod, workerBackOffPeriodJitterFactor))
	default:
		// Error occurred during the sync; back off and then retry.
		p.workQueue.Enqueue(podUID, wait.Jitter(p.backOffPeriod, workerBackOffPeriodJitterFactor))
	}

	// if there is a pending update for this worker, requeue immediately, otherwise
	// clear working status
	p.podLock.Lock()
	defer p.podLock.Unlock()
	if status, ok := p.podSyncStatuses[podUID]; ok {
		if status.pendingUpdate != nil {
			select {
			case p.podUpdates[podUID] <- struct{}{}:
				klog.V(4).InfoS("Requeueing pod due to pending update", "podUID", podUID)
			default:
				klog.V(4).InfoS("Pending update already queued", "podUID", podUID)
			}
		} else {
			status.working = false
		}
	}
}
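
// Illustrative sketch (not part of the original file): how a sync outcome maps to a
// requeue delay in completeWork above. The exact delays are jittered and depend on the
// configured resyncInterval and backOffPeriod; netErr and otherErr are hypothetical errors.
//
//	p.completeWork(uid, true, nil)       // phase transition: requeued immediately (delay 0)
//	p.completeWork(uid, false, nil)      // success: requeued at the regular resync interval
//	p.completeWork(uid, false, netErr)   // error containing NetworkNotReadyErrorMsg: short transient backoff
//	p.completeWork(uid, false, otherErr) // any other error: standard worker backoff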

// SyncKnownPods will purge any fully terminated pods that are not in the desiredPods
// list, which means SyncKnownPods must be called in a threadsafe manner from calls
// to UpdatePod for new pods. Because the podworker is dependent on UpdatePod being
// invoked to drive a pod's state machine, if a pod is missing in the desired list the
// pod worker must be responsible for delivering that update. The method returns a map
// of known workers that have not been fully removed, with each entry indicating whether
// the pod is syncing, terminating, or terminated.
func (p *podWorkers) SyncKnownPods(desiredPods []*v1.Pod) map[types.UID]PodWorkerSync {
	workers := make(map[types.UID]PodWorkerSync)
	known := make(map[types.UID]struct{})
	for _, pod := range desiredPods {
		known[pod.UID] = struct{}{}
	}

	p.podLock.Lock()
	defer p.podLock.Unlock()

	p.podsSynced = true
	for uid, status := range p.podSyncStatuses {
		// We retain the worker history of any pod that is still desired according to
		// its UID. However, there are two scenarios during a sync that result in us
		// needing to purge the history:
		//
		// 1. The pod is no longer desired (the local version is orphaned)
		// 2. The pod received a kill update and then a subsequent create, which means
		//    the UID was reused in the source config (vanishingly rare for API servers,
		//    common for static pods that have specified a fixed UID)
		//
		// In the former case we wish to bound the amount of information we store for
		// deleted pods. In the latter case we wish to minimize the amount of time before
		// we restart the static pod. If we succeed at removing the worker, then we
		// omit it from the returned map of known workers, and the caller of SyncKnownPods
		// is expected to send a new UpdatePod({UpdateType: Create}).
		_, knownPod := known[uid]
		orphan := !knownPod
		if status.restartRequested || orphan {
			if p.removeTerminatedWorker(uid, status, orphan) {
				// no worker running, we won't return it
				continue
			}
		}

		sync := PodWorkerSync{
			State:  status.WorkType(),
			Orphan: orphan,
		}
		switch {
		case status.activeUpdate != nil:
			if status.activeUpdate.Pod != nil {
				sync.HasConfig = true
				sync.Static = kubetypes.IsStaticPod(status.activeUpdate.Pod)
			}
		case status.pendingUpdate != nil:
			if status.pendingUpdate.Pod != nil {
				sync.HasConfig = true
				sync.Static = kubetypes.IsStaticPod(status.pendingUpdate.Pod)
			}
		}
		workers[uid] = sync
	}
	return workers
}
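
// Illustrative sketch (not part of the original file): how a housekeeping loop might
// consume the result of SyncKnownPods. The variables kl and allPods are hypothetical.
//
//	workingPods := kl.podWorkers.SyncKnownPods(allPods)
//	for uid, sync := range workingPods {
//		if sync.Orphan && !sync.HasConfig {
//			// only the runtime knows about this pod; the pod worker is already
//			// driving it to termination and no UpdatePod call is required
//			continue
//		}
//		_ = uid // reconcile the remaining workers against the desired pod set
//	}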

// removeTerminatedWorker cleans up and removes the worker status for a worker
// that has reached a terminal state of "finished" - has successfully exited
// syncTerminatedPod. This "forgets" a pod by UID and allows another pod to be
// recreated with the same UID. The kubelet preserves state about recently
// terminated pods to prevent accidentally restarting a terminal pod, which is
// proportional to the number of pods described in the pod config. The method
// returns true if the worker was completely removed.
func (p *podWorkers) removeTerminatedWorker(uid types.UID, status *podSyncStatus, orphaned bool) bool {
	if !status.finished {
		// If the pod worker has not reached terminal state and the pod is still known, we wait.
		if !orphaned {
			klog.V(4).InfoS("Pod worker has been requested for removal but is still not fully terminated", "podUID", uid)
			return false
		}

		// all orphaned pods are considered deleted
		status.deleted = true

		// When a pod is no longer in the desired set, the pod is considered orphaned and
		// the pod worker becomes responsible for driving the pod to completion (there is no
		// guarantee another component will notify us of updates).
		switch {
		case !status.IsStarted() && !status.observedRuntime:
			// The pod has not been started, which means we can safely clean up the pod - the
			// pod worker will shutdown as a result of this change without executing a sync.
			klog.V(4).InfoS("Pod is orphaned and has not been started", "podUID", uid)
		case !status.IsTerminationRequested():
			// The pod has been started but termination has not been requested - set the appropriate
			// timestamp and notify the pod worker. Because the pod has been synced at least once,
			// the value of status.activeUpdate will be the fallback for the next sync.
			status.terminatingAt = p.clock.Now()
			if status.activeUpdate != nil && status.activeUpdate.Pod != nil {
				status.gracePeriod, _ = calculateEffectiveGracePeriod(status, status.activeUpdate.Pod, nil)
			} else {
				status.gracePeriod = 1
			}
			p.requeueLastPodUpdate(uid, status)
			klog.V(4).InfoS("Pod is orphaned and still running, began terminating", "podUID", uid)
			return false
		default:
			// The pod is already moving towards termination, notify the pod worker. Because the pod
			// has been synced at least once, the value of status.activeUpdate will be the fallback for
			// the next sync.
			p.requeueLastPodUpdate(uid, status)
			klog.V(4).InfoS("Pod is orphaned and still terminating, notified the pod worker", "podUID", uid)
			return false
		}
	}

	if status.restartRequested {
		klog.V(4).InfoS("Pod has been terminated but another pod with the same UID was created, remove history to allow restart", "podUID", uid)
	} else {
		klog.V(4).InfoS("Pod has been terminated and is no longer known to the kubelet, remove all history", "podUID", uid)
	}
	delete(p.podSyncStatuses, uid)
	p.cleanupPodUpdates(uid)

	if p.startedStaticPodsByFullname[status.fullname] == uid {
		delete(p.startedStaticPodsByFullname, status.fullname)
	}
	return true
}

// killPodNow returns a KillPodFunc that can be used to kill a pod.
// It is intended to be injected into other modules that need to kill a pod.
func killPodNow(podWorkers PodWorkers, recorder record.EventRecorder) eviction.KillPodFunc {
	return func(pod *v1.Pod, isEvicted bool, gracePeriodOverride *int64, statusFn func(*v1.PodStatus)) error {
		// determine the grace period to use when killing the pod
		gracePeriod := int64(0)
		if gracePeriodOverride != nil {
			gracePeriod = *gracePeriodOverride
		} else if pod.Spec.TerminationGracePeriodSeconds != nil {
			gracePeriod = *pod.Spec.TerminationGracePeriodSeconds
		}

		// we timeout and return an error if we don't get a callback within a reasonable time.
		// the default timeout is relative to the grace period (we settle on 10s to wait for kubelet->runtime traffic to complete in sigkill)
		timeout := gracePeriod + (gracePeriod / 2)
		minTimeout := int64(10)
		if timeout < minTimeout {
			timeout = minTimeout
		}
		timeoutDuration := time.Duration(timeout) * time.Second

		// open a channel we block against until we get a result
		ch := make(chan struct{}, 1)
		podWorkers.UpdatePod(UpdatePodOptions{
			Pod:        pod,
			UpdateType: kubetypes.SyncPodKill,
			KillPodOptions: &KillPodOptions{
				CompletedCh:                              ch,
				Evict:                                    isEvicted,
				PodStatusFunc:                            statusFn,
				PodTerminationGracePeriodSecondsOverride: gracePeriodOverride,
			},
		})

		// wait for either a response, or a timeout
		select {
		case <-ch:
			return nil
		case <-time.After(timeoutDuration):
			recorder.Eventf(pod, v1.EventTypeWarning, events.ExceededGracePeriod, "Container runtime did not kill the pod within specified grace period.")
			return fmt.Errorf("timeout waiting to kill pod")
		}
	}
}
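
// Illustrative sketch (not part of the original file): wiring killPodNow into a
// component that needs to kill pods, such as the eviction manager. The variables
// kl and evictedPod below are hypothetical.
//
//	killFunc := killPodNow(kl.podWorkers, kl.recorder)
//	gracePeriod := int64(0)
//	err := killFunc(evictedPod, true, &gracePeriod, func(status *v1.PodStatus) {
//		status.Phase = v1.PodFailed
//		status.Reason = "Evicted"
//		status.Message = "Pod was evicted to reclaim node resources"
//	})
//	// err is nil once syncTerminatingPod completes, or a timeout error otherwise.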

// cleanupPodUpdates closes the podUpdates channel and removes it from the
// podUpdates map so that the corresponding pod worker can stop. It also
// removes any undelivered work. This method must be called holding the
// pod lock.
func (p *podWorkers) cleanupPodUpdates(uid types.UID) {
	if ch, ok := p.podUpdates[uid]; ok {
		close(ch)
	}
	delete(p.podUpdates, uid)
}

// requeueLastPodUpdate creates a new pending pod update from the most recently
// executed update if no update is already queued, and then notifies the pod
// worker goroutine of the update. This method must be called while holding
// the pod lock.
func (p *podWorkers) requeueLastPodUpdate(podUID types.UID, status *podSyncStatus) {
	// if there is already an update queued, we can use that instead, or if
	// we have no previously executed update, we cannot replay it.
	if status.pendingUpdate != nil || status.activeUpdate == nil {
		return
	}
	copied := *status.activeUpdate
	status.pendingUpdate = &copied

	// notify the pod worker
	status.working = true
	select {
	case p.podUpdates[podUID] <- struct{}{}:
	default:
	}
}