mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 04:08:16 +00:00 
			
		
		
		
	Merge pull request #41095 from dashpole/deletion_pod_lifecycle
Automatic merge from submit-queue [Kubelet] Delay deletion of pod from the API server until volumes are deleted Previous PR that was reverted: #40239. To summarize the conclusion of the previous PR after reverting: - The status manager has the most up-to-date status, but the volume manager uses the status from the pod manager, which only is as up-to-date as the API server. - Because of this, the previous change required an additional round trip between the kubelet and API server. - When few pods are being added or deleted, this is only a minor issue. However, when under heavy load, the QPS limit to the API server causes this round trip to take ~60 seconds, which is an unacceptable increase in latency. Take a look at the graphs in #40239 to see the effect of QPS changes on timing. - To remedy this, the volume manager looks at the status from the status manager, which eliminates the round trip. cc: @vishh @derekwaynecarr @sjenning @jingxu97 @kubernetes/sig-storage-misc
This commit is contained in:
		@@ -182,6 +182,7 @@ go_test(
 | 
			
		||||
        "//pkg/kubelet/server/remotecommand:go_default_library",
 | 
			
		||||
        "//pkg/kubelet/server/stats:go_default_library",
 | 
			
		||||
        "//pkg/kubelet/status:go_default_library",
 | 
			
		||||
        "//pkg/kubelet/status/testing:go_default_library",
 | 
			
		||||
        "//pkg/kubelet/types:go_default_library",
 | 
			
		||||
        "//pkg/kubelet/util/queue:go_default_library",
 | 
			
		||||
        "//pkg/kubelet/util/sliceutils:go_default_library",
 | 
			
		||||
 
 | 
			
		||||
@@ -690,7 +690,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
 | 
			
		||||
	}
 | 
			
		||||
	klet.imageManager = imageManager
 | 
			
		||||
 | 
			
		||||
	klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager)
 | 
			
		||||
	klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet)
 | 
			
		||||
 | 
			
		||||
	klet.probeManager = prober.NewManager(
 | 
			
		||||
		klet.statusManager,
 | 
			
		||||
@@ -715,6 +715,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
 | 
			
		||||
		kubeCfg.EnableControllerAttachDetach,
 | 
			
		||||
		nodeName,
 | 
			
		||||
		klet.podManager,
 | 
			
		||||
		klet.statusManager,
 | 
			
		||||
		klet.kubeClient,
 | 
			
		||||
		klet.volumePluginMgr,
 | 
			
		||||
		klet.containerRuntime,
 | 
			
		||||
 
 | 
			
		||||
@@ -728,6 +728,37 @@ func (kl *Kubelet) podIsTerminated(pod *v1.Pod) bool {
 | 
			
		||||
	return false
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Returns true if all required node-level resources that a pod was consuming have been reclaimed by the kubelet.
 | 
			
		||||
// Reclaiming resources is a prerequisite to deleting a pod from the API server.
 | 
			
		||||
func (kl *Kubelet) OkToDeletePod(pod *v1.Pod) bool {
 | 
			
		||||
	if pod.DeletionTimestamp == nil {
 | 
			
		||||
		// We shouldnt delete pods whose DeletionTimestamp is not set
 | 
			
		||||
		return false
 | 
			
		||||
	}
 | 
			
		||||
	if !notRunning(pod.Status.ContainerStatuses) {
 | 
			
		||||
		// We shouldnt delete pods that still have running containers
 | 
			
		||||
		glog.V(3).Infof("Pod %q is terminated, but some containers are still running", format.Pod(pod))
 | 
			
		||||
		return false
 | 
			
		||||
	}
 | 
			
		||||
	if kl.podVolumesExist(pod.UID) && !kl.kubeletConfiguration.KeepTerminatedPodVolumes {
 | 
			
		||||
		// We shouldnt delete pods whose volumes have not been cleaned up if we are not keeping terminated pod volumes
 | 
			
		||||
		glog.V(3).Infof("Pod %q is terminated, but some volumes have not been cleaned up", format.Pod(pod))
 | 
			
		||||
		return false
 | 
			
		||||
	}
 | 
			
		||||
	return true
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// notRunning returns true if every status is terminated or waiting, or the status list
 | 
			
		||||
// is empty.
 | 
			
		||||
func notRunning(statuses []v1.ContainerStatus) bool {
 | 
			
		||||
	for _, status := range statuses {
 | 
			
		||||
		if status.State.Terminated == nil && status.State.Waiting == nil {
 | 
			
		||||
			return false
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return true
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// filterOutTerminatedPods returns the given pods which the status manager
 | 
			
		||||
// does not consider failed or succeeded.
 | 
			
		||||
func (kl *Kubelet) filterOutTerminatedPods(pods []*v1.Pod) []*v1.Pod {
 | 
			
		||||
 
 | 
			
		||||
@@ -60,6 +60,7 @@ import (
 | 
			
		||||
	"k8s.io/kubernetes/pkg/kubelet/secret"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/kubelet/server/stats"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/kubelet/status"
 | 
			
		||||
	statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
 | 
			
		||||
	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/kubelet/util/queue"
 | 
			
		||||
	kubeletvolume "k8s.io/kubernetes/pkg/kubelet/volumemanager"
 | 
			
		||||
@@ -180,7 +181,7 @@ func newTestKubeletWithImageList(
 | 
			
		||||
	}
 | 
			
		||||
	kubelet.secretManager = secretManager
 | 
			
		||||
	kubelet.podManager = kubepod.NewBasicPodManager(fakeMirrorClient, kubelet.secretManager)
 | 
			
		||||
	kubelet.statusManager = status.NewManager(fakeKubeClient, kubelet.podManager)
 | 
			
		||||
	kubelet.statusManager = status.NewManager(fakeKubeClient, kubelet.podManager, &statustest.FakePodDeletionSafetyProvider{})
 | 
			
		||||
	kubelet.containerRefManager = kubecontainer.NewRefManager()
 | 
			
		||||
	diskSpaceManager, err := newDiskSpaceManager(mockCadvisor, DiskSpacePolicy{})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
@@ -262,6 +263,7 @@ func newTestKubeletWithImageList(
 | 
			
		||||
		controllerAttachDetachEnabled,
 | 
			
		||||
		kubelet.nodeName,
 | 
			
		||||
		kubelet.podManager,
 | 
			
		||||
		kubelet.statusManager,
 | 
			
		||||
		fakeKubeClient,
 | 
			
		||||
		kubelet.volumePluginMgr,
 | 
			
		||||
		fakeRuntime,
 | 
			
		||||
 
 | 
			
		||||
@@ -56,6 +56,7 @@ go_test(
 | 
			
		||||
        "//pkg/kubelet/pod:go_default_library",
 | 
			
		||||
        "//pkg/kubelet/prober/results:go_default_library",
 | 
			
		||||
        "//pkg/kubelet/status:go_default_library",
 | 
			
		||||
        "//pkg/kubelet/status/testing:go_default_library",
 | 
			
		||||
        "//pkg/probe:go_default_library",
 | 
			
		||||
        "//pkg/util/exec:go_default_library",
 | 
			
		||||
        "//vendor:github.com/golang/glog",
 | 
			
		||||
 
 | 
			
		||||
@@ -28,6 +28,7 @@ import (
 | 
			
		||||
	kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/kubelet/prober/results"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/kubelet/status"
 | 
			
		||||
	statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/probe"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/util/exec"
 | 
			
		||||
)
 | 
			
		||||
@@ -102,7 +103,7 @@ func newTestManager() *manager {
 | 
			
		||||
	// Add test pod to pod manager, so that status manager can get the pod from pod manager if needed.
 | 
			
		||||
	podManager.AddPod(getTestPod())
 | 
			
		||||
	m := NewManager(
 | 
			
		||||
		status.NewManager(&fake.Clientset{}, podManager),
 | 
			
		||||
		status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}),
 | 
			
		||||
		results.NewManager(),
 | 
			
		||||
		nil, // runner
 | 
			
		||||
		refManager,
 | 
			
		||||
 
 | 
			
		||||
@@ -31,6 +31,7 @@ import (
 | 
			
		||||
	kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/kubelet/prober/results"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/kubelet/status"
 | 
			
		||||
	statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/probe"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/util/exec"
 | 
			
		||||
)
 | 
			
		||||
@@ -117,7 +118,7 @@ func TestDoProbe(t *testing.T) {
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			// Clean up.
 | 
			
		||||
			m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(nil, nil))
 | 
			
		||||
			m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(nil, nil), &statustest.FakePodDeletionSafetyProvider{})
 | 
			
		||||
			resultsManager(m, probeType).Remove(testContainerID)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 
 | 
			
		||||
@@ -44,6 +44,7 @@ import (
 | 
			
		||||
	"k8s.io/kubernetes/pkg/kubelet/secret"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/kubelet/server/stats"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/kubelet/status"
 | 
			
		||||
	statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/kubelet/volumemanager"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/volume"
 | 
			
		||||
	volumetest "k8s.io/kubernetes/pkg/volume/testing"
 | 
			
		||||
@@ -77,7 +78,7 @@ func TestRunOnce(t *testing.T) {
 | 
			
		||||
		cadvisor:            cadvisor,
 | 
			
		||||
		nodeLister:          testNodeLister{},
 | 
			
		||||
		nodeInfo:            testNodeInfo{},
 | 
			
		||||
		statusManager:       status.NewManager(nil, podManager),
 | 
			
		||||
		statusManager:       status.NewManager(nil, podManager, &statustest.FakePodDeletionSafetyProvider{}),
 | 
			
		||||
		containerRefManager: kubecontainer.NewRefManager(),
 | 
			
		||||
		podManager:          podManager,
 | 
			
		||||
		os:                  &containertest.FakeOS{},
 | 
			
		||||
@@ -102,6 +103,7 @@ func TestRunOnce(t *testing.T) {
 | 
			
		||||
		true,
 | 
			
		||||
		kb.nodeName,
 | 
			
		||||
		kb.podManager,
 | 
			
		||||
		kb.statusManager,
 | 
			
		||||
		kb.kubeClient,
 | 
			
		||||
		kb.volumePluginMgr,
 | 
			
		||||
		fakeRuntime,
 | 
			
		||||
 
 | 
			
		||||
@@ -51,6 +51,7 @@ go_test(
 | 
			
		||||
        "//pkg/kubelet/pod:go_default_library",
 | 
			
		||||
        "//pkg/kubelet/pod/testing:go_default_library",
 | 
			
		||||
        "//pkg/kubelet/secret:go_default_library",
 | 
			
		||||
        "//pkg/kubelet/status/testing:go_default_library",
 | 
			
		||||
        "//pkg/kubelet/types:go_default_library",
 | 
			
		||||
        "//vendor:github.com/stretchr/testify/assert",
 | 
			
		||||
        "//vendor:k8s.io/apimachinery/pkg/api/errors",
 | 
			
		||||
@@ -70,6 +71,9 @@ filegroup(
 | 
			
		||||
 | 
			
		||||
filegroup(
 | 
			
		||||
    name = "all-srcs",
 | 
			
		||||
    srcs = [":package-srcs"],
 | 
			
		||||
    srcs = [
 | 
			
		||||
        ":package-srcs",
 | 
			
		||||
        "//pkg/kubelet/status/testing:all-srcs",
 | 
			
		||||
    ],
 | 
			
		||||
    tags = ["automanaged"],
 | 
			
		||||
)
 | 
			
		||||
 
 | 
			
		||||
@@ -67,6 +67,7 @@ type manager struct {
 | 
			
		||||
	// Map from (mirror) pod UID to latest status version successfully sent to the API server.
 | 
			
		||||
	// apiStatusVersions must only be accessed from the sync thread.
 | 
			
		||||
	apiStatusVersions map[types.UID]uint64
 | 
			
		||||
	podDeletionSafety PodDeletionSafetyProvider
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// PodStatusProvider knows how to provide status for a pod.  It's intended to be used by other components
 | 
			
		||||
@@ -77,6 +78,12 @@ type PodStatusProvider interface {
 | 
			
		||||
	GetPodStatus(uid types.UID) (v1.PodStatus, bool)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// An object which provides guarantees that a pod can be saftely deleted.
 | 
			
		||||
type PodDeletionSafetyProvider interface {
 | 
			
		||||
	// A function which returns true if the pod can safely be deleted
 | 
			
		||||
	OkToDeletePod(pod *v1.Pod) bool
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Manager is the Source of truth for kubelet pod status, and should be kept up-to-date with
 | 
			
		||||
// the latest v1.PodStatus. It also syncs updates back to the API server.
 | 
			
		||||
type Manager interface {
 | 
			
		||||
@@ -103,13 +110,14 @@ type Manager interface {
 | 
			
		||||
 | 
			
		||||
const syncPeriod = 10 * time.Second
 | 
			
		||||
 | 
			
		||||
func NewManager(kubeClient clientset.Interface, podManager kubepod.Manager) Manager {
 | 
			
		||||
func NewManager(kubeClient clientset.Interface, podManager kubepod.Manager, podDeletionSafety PodDeletionSafetyProvider) Manager {
 | 
			
		||||
	return &manager{
 | 
			
		||||
		kubeClient:        kubeClient,
 | 
			
		||||
		podManager:        podManager,
 | 
			
		||||
		podStatuses:       make(map[types.UID]versionedPodStatus),
 | 
			
		||||
		podStatusChannel:  make(chan podStatusSyncRequest, 1000), // Buffer up to 1000 statuses
 | 
			
		||||
		apiStatusVersions: make(map[types.UID]uint64),
 | 
			
		||||
		podDeletionSafety: podDeletionSafety,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -381,7 +389,7 @@ func (m *manager) syncBatch() {
 | 
			
		||||
				}
 | 
			
		||||
				syncedUID = mirrorUID
 | 
			
		||||
			}
 | 
			
		||||
			if m.needsUpdate(syncedUID, status) {
 | 
			
		||||
			if m.needsUpdate(syncedUID, status) || m.couldBeDeleted(uid, status.status) {
 | 
			
		||||
				updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status})
 | 
			
		||||
			} else if m.needsReconcile(uid, status.status) {
 | 
			
		||||
				// Delete the apiStatusVersions here to force an update on the pod status
 | 
			
		||||
@@ -434,11 +442,7 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
 | 
			
		||||
				// We don't handle graceful deletion of mirror pods.
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			if pod.DeletionTimestamp == nil {
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			if !notRunning(pod.Status.ContainerStatuses) {
 | 
			
		||||
				glog.V(3).Infof("Pod %q is terminated, but some containers are still running", format.Pod(pod))
 | 
			
		||||
			if !m.podDeletionSafety.OkToDeletePod(pod) {
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			deleteOptions := metav1.NewDeleteOptions(0)
 | 
			
		||||
@@ -463,6 +467,15 @@ func (m *manager) needsUpdate(uid types.UID, status versionedPodStatus) bool {
 | 
			
		||||
	return !ok || latest < status.version
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m *manager) couldBeDeleted(uid types.UID, status v1.PodStatus) bool {
 | 
			
		||||
	// The pod could be a static pod, so we should translate first.
 | 
			
		||||
	pod, ok := m.podManager.GetPodByUID(uid)
 | 
			
		||||
	if !ok {
 | 
			
		||||
		return false
 | 
			
		||||
	}
 | 
			
		||||
	return !kubepod.IsMirrorPod(pod) && m.podDeletionSafety.OkToDeletePod(pod)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// needsReconcile compares the given status with the status in the pod manager (which
 | 
			
		||||
// in fact comes from apiserver), returns whether the status needs to be reconciled with
 | 
			
		||||
// the apiserver. Now when pod status is inconsistent between apiserver and kubelet,
 | 
			
		||||
@@ -563,17 +576,6 @@ func normalizeStatus(pod *v1.Pod, status *v1.PodStatus) *v1.PodStatus {
 | 
			
		||||
	return status
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// notRunning returns true if every status is terminated or waiting, or the status list
 | 
			
		||||
// is empty.
 | 
			
		||||
func notRunning(statuses []v1.ContainerStatus) bool {
 | 
			
		||||
	for _, status := range statuses {
 | 
			
		||||
		if status.State.Terminated == nil && status.State.Waiting == nil {
 | 
			
		||||
			return false
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return true
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func copyStatus(source *v1.PodStatus) (v1.PodStatus, error) {
 | 
			
		||||
	clone, err := api.Scheme.DeepCopy(source)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
 
 | 
			
		||||
@@ -39,6 +39,7 @@ import (
 | 
			
		||||
	kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
 | 
			
		||||
	podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing"
 | 
			
		||||
	kubesecret "k8s.io/kubernetes/pkg/kubelet/secret"
 | 
			
		||||
	statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
 | 
			
		||||
	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
@@ -74,7 +75,7 @@ func (m *manager) testSyncBatch() {
 | 
			
		||||
func newTestManager(kubeClient clientset.Interface) *manager {
 | 
			
		||||
	podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), kubesecret.NewFakeManager())
 | 
			
		||||
	podManager.AddPod(getTestPod())
 | 
			
		||||
	return NewManager(kubeClient, podManager).(*manager)
 | 
			
		||||
	return NewManager(kubeClient, podManager, &statustest.FakePodDeletionSafetyProvider{}).(*manager)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func generateRandomMessage() string {
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										31
									
								
								pkg/kubelet/status/testing/BUILD
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										31
									
								
								pkg/kubelet/status/testing/BUILD
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,31 @@
 | 
			
		||||
package(default_visibility = ["//visibility:public"])
 | 
			
		||||
 | 
			
		||||
licenses(["notice"])
 | 
			
		||||
 | 
			
		||||
load(
 | 
			
		||||
    "@io_bazel_rules_go//go:def.bzl",
 | 
			
		||||
    "go_library",
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
go_library(
 | 
			
		||||
    name = "go_default_library",
 | 
			
		||||
    srcs = ["fake_pod_deletion_safety.go"],
 | 
			
		||||
    tags = ["automanaged"],
 | 
			
		||||
    deps = [
 | 
			
		||||
        "//pkg/api/v1:go_default_library",
 | 
			
		||||
        "//pkg/kubelet/pod:go_default_library",
 | 
			
		||||
    ],
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
filegroup(
 | 
			
		||||
    name = "package-srcs",
 | 
			
		||||
    srcs = glob(["**"]),
 | 
			
		||||
    tags = ["automanaged"],
 | 
			
		||||
    visibility = ["//visibility:private"],
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
filegroup(
 | 
			
		||||
    name = "all-srcs",
 | 
			
		||||
    srcs = [":package-srcs"],
 | 
			
		||||
    tags = ["automanaged"],
 | 
			
		||||
)
 | 
			
		||||
							
								
								
									
										28
									
								
								pkg/kubelet/status/testing/fake_pod_deletion_safety.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										28
									
								
								pkg/kubelet/status/testing/fake_pod_deletion_safety.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,28 @@
 | 
			
		||||
/*
 | 
			
		||||
Copyright 2016 The Kubernetes Authors.
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
package testing
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api/v1"
 | 
			
		||||
	kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type FakePodDeletionSafetyProvider struct{}
 | 
			
		||||
 | 
			
		||||
func (f *FakePodDeletionSafetyProvider) OkToDeletePod(pod *v1.Pod) bool {
 | 
			
		||||
	return !kubepod.IsMirrorPod(pod) && pod.DeletionTimestamp != nil
 | 
			
		||||
}
 | 
			
		||||
@@ -18,6 +18,7 @@ go_library(
 | 
			
		||||
        "//pkg/kubelet/config:go_default_library",
 | 
			
		||||
        "//pkg/kubelet/container:go_default_library",
 | 
			
		||||
        "//pkg/kubelet/pod:go_default_library",
 | 
			
		||||
        "//pkg/kubelet/status:go_default_library",
 | 
			
		||||
        "//pkg/kubelet/util/format:go_default_library",
 | 
			
		||||
        "//pkg/kubelet/volumemanager/cache:go_default_library",
 | 
			
		||||
        "//pkg/kubelet/volumemanager/populator:go_default_library",
 | 
			
		||||
@@ -50,6 +51,8 @@ go_test(
 | 
			
		||||
        "//pkg/kubelet/pod:go_default_library",
 | 
			
		||||
        "//pkg/kubelet/pod/testing:go_default_library",
 | 
			
		||||
        "//pkg/kubelet/secret:go_default_library",
 | 
			
		||||
        "//pkg/kubelet/status:go_default_library",
 | 
			
		||||
        "//pkg/kubelet/status/testing:go_default_library",
 | 
			
		||||
        "//pkg/util/mount:go_default_library",
 | 
			
		||||
        "//pkg/volume:go_default_library",
 | 
			
		||||
        "//pkg/volume/testing:go_default_library",
 | 
			
		||||
 
 | 
			
		||||
@@ -17,6 +17,7 @@ go_library(
 | 
			
		||||
        "//pkg/client/clientset_generated/clientset:go_default_library",
 | 
			
		||||
        "//pkg/kubelet/container:go_default_library",
 | 
			
		||||
        "//pkg/kubelet/pod:go_default_library",
 | 
			
		||||
        "//pkg/kubelet/status:go_default_library",
 | 
			
		||||
        "//pkg/kubelet/util/format:go_default_library",
 | 
			
		||||
        "//pkg/kubelet/volumemanager/cache:go_default_library",
 | 
			
		||||
        "//pkg/volume:go_default_library",
 | 
			
		||||
 
 | 
			
		||||
@@ -35,6 +35,7 @@ import (
 | 
			
		||||
	"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
 | 
			
		||||
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/kubelet/pod"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/kubelet/status"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/kubelet/util/format"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/volume"
 | 
			
		||||
@@ -70,6 +71,7 @@ func NewDesiredStateOfWorldPopulator(
 | 
			
		||||
	loopSleepDuration time.Duration,
 | 
			
		||||
	getPodStatusRetryDuration time.Duration,
 | 
			
		||||
	podManager pod.Manager,
 | 
			
		||||
	podStatusProvider status.PodStatusProvider,
 | 
			
		||||
	desiredStateOfWorld cache.DesiredStateOfWorld,
 | 
			
		||||
	kubeContainerRuntime kubecontainer.Runtime,
 | 
			
		||||
	keepTerminatedPodVolumes bool) DesiredStateOfWorldPopulator {
 | 
			
		||||
@@ -78,6 +80,7 @@ func NewDesiredStateOfWorldPopulator(
 | 
			
		||||
		loopSleepDuration:         loopSleepDuration,
 | 
			
		||||
		getPodStatusRetryDuration: getPodStatusRetryDuration,
 | 
			
		||||
		podManager:                podManager,
 | 
			
		||||
		podStatusProvider:         podStatusProvider,
 | 
			
		||||
		desiredStateOfWorld:       desiredStateOfWorld,
 | 
			
		||||
		pods: processedPods{
 | 
			
		||||
			processedPods: make(map[volumetypes.UniquePodName]bool)},
 | 
			
		||||
@@ -91,6 +94,7 @@ type desiredStateOfWorldPopulator struct {
 | 
			
		||||
	loopSleepDuration         time.Duration
 | 
			
		||||
	getPodStatusRetryDuration time.Duration
 | 
			
		||||
	podManager                pod.Manager
 | 
			
		||||
	podStatusProvider         status.PodStatusProvider
 | 
			
		||||
	desiredStateOfWorld       cache.DesiredStateOfWorld
 | 
			
		||||
	pods                      processedPods
 | 
			
		||||
	kubeContainerRuntime      kubecontainer.Runtime
 | 
			
		||||
@@ -134,15 +138,30 @@ func (dswp *desiredStateOfWorldPopulator) populatorLoopFunc() func() {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func isPodTerminated(pod *v1.Pod) bool {
 | 
			
		||||
	return pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded
 | 
			
		||||
func (dswp *desiredStateOfWorldPopulator) isPodTerminated(pod *v1.Pod) bool {
 | 
			
		||||
	podStatus, found := dswp.podStatusProvider.GetPodStatus(pod.UID)
 | 
			
		||||
	if !found {
 | 
			
		||||
		podStatus = pod.Status
 | 
			
		||||
	}
 | 
			
		||||
	return podStatus.Phase == v1.PodFailed || podStatus.Phase == v1.PodSucceeded || (pod.DeletionTimestamp != nil && notRunning(podStatus.ContainerStatuses))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// notRunning returns true if every status is terminated or waiting, or the status list
 | 
			
		||||
// is empty.
 | 
			
		||||
func notRunning(statuses []v1.ContainerStatus) bool {
 | 
			
		||||
	for _, status := range statuses {
 | 
			
		||||
		if status.State.Terminated == nil && status.State.Waiting == nil {
 | 
			
		||||
			return false
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return true
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Iterate through all pods and add to desired state of world if they don't
 | 
			
		||||
// exist but should
 | 
			
		||||
func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() {
 | 
			
		||||
	for _, pod := range dswp.podManager.GetPods() {
 | 
			
		||||
		if isPodTerminated(pod) {
 | 
			
		||||
		if dswp.isPodTerminated(pod) {
 | 
			
		||||
			// Do not (re)add volumes for terminated pods
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
@@ -160,7 +179,7 @@ func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() {
 | 
			
		||||
		pod, podExists := dswp.podManager.GetPodByUID(volumeToMount.Pod.UID)
 | 
			
		||||
		if podExists {
 | 
			
		||||
			// Skip running pods
 | 
			
		||||
			if !isPodTerminated(pod) {
 | 
			
		||||
			if !dswp.isPodTerminated(pod) {
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			if dswp.keepTerminatedPodVolumes {
 | 
			
		||||
 
 | 
			
		||||
@@ -33,6 +33,7 @@ import (
 | 
			
		||||
	"k8s.io/kubernetes/pkg/kubelet/container"
 | 
			
		||||
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/kubelet/pod"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/kubelet/status"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/kubelet/util/format"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/kubelet/volumemanager/populator"
 | 
			
		||||
@@ -151,6 +152,7 @@ func NewVolumeManager(
 | 
			
		||||
	controllerAttachDetachEnabled bool,
 | 
			
		||||
	nodeName k8stypes.NodeName,
 | 
			
		||||
	podManager pod.Manager,
 | 
			
		||||
	podStatusProvider status.PodStatusProvider,
 | 
			
		||||
	kubeClient clientset.Interface,
 | 
			
		||||
	volumePluginMgr *volume.VolumePluginMgr,
 | 
			
		||||
	kubeContainerRuntime kubecontainer.Runtime,
 | 
			
		||||
@@ -191,6 +193,7 @@ func NewVolumeManager(
 | 
			
		||||
		desiredStateOfWorldPopulatorLoopSleepPeriod,
 | 
			
		||||
		desiredStateOfWorldPopulatorGetPodStatusRetryDuration,
 | 
			
		||||
		podManager,
 | 
			
		||||
		podStatusProvider,
 | 
			
		||||
		vm.desiredStateOfWorld,
 | 
			
		||||
		kubeContainerRuntime,
 | 
			
		||||
		keepTerminatedPodVolumes)
 | 
			
		||||
 
 | 
			
		||||
@@ -36,6 +36,8 @@ import (
 | 
			
		||||
	kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
 | 
			
		||||
	podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/kubelet/secret"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/kubelet/status"
 | 
			
		||||
	statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/util/mount"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/volume"
 | 
			
		||||
	volumetest "k8s.io/kubernetes/pkg/volume/testing"
 | 
			
		||||
@@ -187,11 +189,13 @@ func newTestVolumeManager(
 | 
			
		||||
	fakeRecorder := &record.FakeRecorder{}
 | 
			
		||||
	plugMgr := &volume.VolumePluginMgr{}
 | 
			
		||||
	plugMgr.InitPlugins([]volume.VolumePlugin{plug}, volumetest.NewFakeVolumeHost(tmpDir, kubeClient, nil))
 | 
			
		||||
	statusManager := status.NewManager(kubeClient, podManager, &statustest.FakePodDeletionSafetyProvider{})
 | 
			
		||||
 | 
			
		||||
	vm, err := NewVolumeManager(
 | 
			
		||||
		true,
 | 
			
		||||
		testHostname,
 | 
			
		||||
		podManager,
 | 
			
		||||
		statusManager,
 | 
			
		||||
		kubeClient,
 | 
			
		||||
		plugMgr,
 | 
			
		||||
		&containertest.FakeRuntime{},
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user