mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 04:08:16 +00:00 
			
		
		
		
	Trigger container cleanup within a pod when a container exiting event is detected.
This commit is contained in:
		@@ -81,6 +81,7 @@ import (
 | 
				
			|||||||
	utilerrors "k8s.io/kubernetes/pkg/util/errors"
 | 
						utilerrors "k8s.io/kubernetes/pkg/util/errors"
 | 
				
			||||||
	utilexec "k8s.io/kubernetes/pkg/util/exec"
 | 
						utilexec "k8s.io/kubernetes/pkg/util/exec"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/util/flowcontrol"
 | 
						"k8s.io/kubernetes/pkg/util/flowcontrol"
 | 
				
			||||||
 | 
						"k8s.io/kubernetes/pkg/util/integer"
 | 
				
			||||||
	kubeio "k8s.io/kubernetes/pkg/util/io"
 | 
						kubeio "k8s.io/kubernetes/pkg/util/io"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/util/mount"
 | 
						"k8s.io/kubernetes/pkg/util/mount"
 | 
				
			||||||
	utilnet "k8s.io/kubernetes/pkg/util/net"
 | 
						utilnet "k8s.io/kubernetes/pkg/util/net"
 | 
				
			||||||
@@ -156,6 +157,9 @@ const (
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	// maxImagesInStatus is the number of max images we store in image status.
 | 
						// maxImagesInStatus is the number of max images we store in image status.
 | 
				
			||||||
	maxImagesInNodeStatus = 50
 | 
						maxImagesInNodeStatus = 50
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Minimum number of dead containers to keep in a pod
 | 
				
			||||||
 | 
						minDeadContainerInPod = 1
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// SyncHandler is an interface implemented by Kubelet, for testability
 | 
					// SyncHandler is an interface implemented by Kubelet, for testability
 | 
				
			||||||
@@ -481,6 +485,7 @@ func NewMainKubelet(
 | 
				
			|||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	klet.containerGC = containerGC
 | 
						klet.containerGC = containerGC
 | 
				
			||||||
 | 
						klet.containerDeletor = newPodContainerDeletor(klet.containerRuntime, integer.IntMax(containerGCPolicy.MaxPerPodContainer, minDeadContainerInPod))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// setup imageManager
 | 
						// setup imageManager
 | 
				
			||||||
	imageManager, err := newImageManager(klet.containerRuntime, cadvisorInterface, recorder, nodeRef, imageGCPolicy)
 | 
						imageManager, err := newImageManager(klet.containerRuntime, cadvisorInterface, recorder, nodeRef, imageGCPolicy)
 | 
				
			||||||
@@ -825,6 +830,9 @@ type Kubelet struct {
 | 
				
			|||||||
	// should manage attachment/detachment of volumes scheduled to this node,
 | 
						// should manage attachment/detachment of volumes scheduled to this node,
 | 
				
			||||||
	// and disable kubelet from executing any attach/detach operations
 | 
						// and disable kubelet from executing any attach/detach operations
 | 
				
			||||||
	enableControllerAttachDetach bool
 | 
						enableControllerAttachDetach bool
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// trigger deleting containers in a pod
 | 
				
			||||||
 | 
						containerDeletor *podContainerDeletor
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// dirExists returns true if the path exists and represents a directory.
 | 
					// dirExists returns true if the path exists and represents a directory.
 | 
				
			||||||
@@ -2343,7 +2351,6 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle
 | 
				
			|||||||
		case kubetypes.SET:
 | 
							case kubetypes.SET:
 | 
				
			||||||
			// TODO: Do we want to support this?
 | 
								// TODO: Do we want to support this?
 | 
				
			||||||
			glog.Errorf("Kubelet does not support snapshot update")
 | 
								glog.Errorf("Kubelet does not support snapshot update")
 | 
				
			||||||
 | 
					 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	case e := <-plegCh:
 | 
						case e := <-plegCh:
 | 
				
			||||||
		if isSyncPodWorthy(e) {
 | 
							if isSyncPodWorthy(e) {
 | 
				
			||||||
@@ -2357,6 +2364,13 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle
 | 
				
			|||||||
			glog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e)
 | 
								glog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e)
 | 
				
			||||||
			handler.HandlePodSyncs([]*api.Pod{pod})
 | 
								handler.HandlePodSyncs([]*api.Pod{pod})
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
							if e.Type == pleg.ContainerDied {
 | 
				
			||||||
 | 
								if podStatus, err := kl.podCache.Get(e.ID); err == nil {
 | 
				
			||||||
 | 
									if containerID, ok := e.Data.(string); ok {
 | 
				
			||||||
 | 
										kl.containerDeletor.deleteContainersInPod(containerID, podStatus)
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
	case <-syncCh:
 | 
						case <-syncCh:
 | 
				
			||||||
		// Sync pods waiting for sync
 | 
							// Sync pods waiting for sync
 | 
				
			||||||
		podsToSync := kl.getPodsToSync()
 | 
							podsToSync := kl.getPodsToSync()
 | 
				
			||||||
@@ -3576,7 +3590,7 @@ func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint) {
 | 
				
			|||||||
	server.ListenAndServeKubeletReadOnlyServer(kl, kl.resourceAnalyzer, address, port, kl.containerRuntime)
 | 
						server.ListenAndServeKubeletReadOnlyServer(kl, kl.resourceAnalyzer, address, port, kl.containerRuntime)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Filter out events that are not worthy of pod syncing
 | 
					// isSyncPodWorthy filters out events that are not worthy of pod syncing
 | 
				
			||||||
func isSyncPodWorthy(event *pleg.PodLifecycleEvent) bool {
 | 
					func isSyncPodWorthy(event *pleg.PodLifecycleEvent) bool {
 | 
				
			||||||
	// ContatnerRemoved doesn't affect pod state
 | 
						// ContatnerRemoved doesn't affect pod state
 | 
				
			||||||
	return event.Type != pleg.ContainerRemoved
 | 
						return event.Type != pleg.ContainerRemoved
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										104
									
								
								pkg/kubelet/pod_container_deletor.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										104
									
								
								pkg/kubelet/pod_container_deletor.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,104 @@
 | 
				
			|||||||
 | 
					/*
 | 
				
			||||||
 | 
					Copyright 2016 The Kubernetes Authors.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Licensed under the Apache License, Version 2.0 (the "License");
 | 
				
			||||||
 | 
					you may not use this file except in compliance with the License.
 | 
				
			||||||
 | 
					You may obtain a copy of the License at
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    http://www.apache.org/licenses/LICENSE-2.0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Unless required by applicable law or agreed to in writing, software
 | 
				
			||||||
 | 
					distributed under the License is distributed on an "AS IS" BASIS,
 | 
				
			||||||
 | 
					WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
				
			||||||
 | 
					See the License for the specific language governing permissions and
 | 
				
			||||||
 | 
					limitations under the License.
 | 
				
			||||||
 | 
					*/
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					package kubelet
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"sort"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"github.com/golang/glog"
 | 
				
			||||||
 | 
						kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 | 
				
			||||||
 | 
						"k8s.io/kubernetes/pkg/util/wait"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					const (
 | 
				
			||||||
 | 
						// The limit on the number of buffered container deletion requests
 | 
				
			||||||
 | 
						// This number is a bit arbitrary and may be adjusted in the future.
 | 
				
			||||||
 | 
						containerDeletorBufferLimit = 50
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type containerStatusbyCreatedList []*kubecontainer.ContainerStatus
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type podContainerDeletor struct {
 | 
				
			||||||
 | 
						worker           chan<- kubecontainer.ContainerID
 | 
				
			||||||
 | 
						containersToKeep int
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (a containerStatusbyCreatedList) Len() int           { return len(a) }
 | 
				
			||||||
 | 
					func (a containerStatusbyCreatedList) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
 | 
				
			||||||
 | 
					func (a containerStatusbyCreatedList) Less(i, j int) bool { return a[i].CreatedAt.After(a[j].CreatedAt) }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func newPodContainerDeletor(runtime kubecontainer.Runtime, containersToKeep int) *podContainerDeletor {
 | 
				
			||||||
 | 
						buffer := make(chan kubecontainer.ContainerID, containerDeletorBufferLimit)
 | 
				
			||||||
 | 
						go wait.Until(func() {
 | 
				
			||||||
 | 
							for {
 | 
				
			||||||
 | 
								select {
 | 
				
			||||||
 | 
								case id := <-buffer:
 | 
				
			||||||
 | 
									runtime.DeleteContainer(id)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}, 0, wait.NeverStop)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return &podContainerDeletor{
 | 
				
			||||||
 | 
							worker:           buffer,
 | 
				
			||||||
 | 
							containersToKeep: containersToKeep,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// getContainersToDeleteInPod returns the exited containers in a pod whose name matches the name inferred from exitedContainerID, ordered by the creation time from the latest to the earliest.
 | 
				
			||||||
 | 
					func (p *podContainerDeletor) getContainersToDeleteInPod(exitedContainerID string, podStatus *kubecontainer.PodStatus) containerStatusbyCreatedList {
 | 
				
			||||||
 | 
						var matchedContainer *kubecontainer.ContainerStatus
 | 
				
			||||||
 | 
						var exitedContainers []*kubecontainer.ContainerStatus
 | 
				
			||||||
 | 
						// Find all exited containers in the pod
 | 
				
			||||||
 | 
						for _, containerStatus := range podStatus.ContainerStatuses {
 | 
				
			||||||
 | 
							if containerStatus.State != kubecontainer.ContainerStateExited {
 | 
				
			||||||
 | 
								continue
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if containerStatus.ID.ID == exitedContainerID {
 | 
				
			||||||
 | 
								matchedContainer = containerStatus
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							exitedContainers = append(exitedContainers, containerStatus)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if matchedContainer == nil {
 | 
				
			||||||
 | 
							glog.Warningf("Container %q not found in pod's exited containers", exitedContainerID)
 | 
				
			||||||
 | 
							return containerStatusbyCreatedList{}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Find the exited containers whose name matches the name of the container with id being exitedContainerID
 | 
				
			||||||
 | 
						var candidates containerStatusbyCreatedList
 | 
				
			||||||
 | 
						for _, containerStatus := range exitedContainers {
 | 
				
			||||||
 | 
							if matchedContainer.Name == containerStatus.Name {
 | 
				
			||||||
 | 
								candidates = append(candidates, containerStatus)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if len(candidates) <= p.containersToKeep {
 | 
				
			||||||
 | 
							return containerStatusbyCreatedList{}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						sort.Sort(candidates)
 | 
				
			||||||
 | 
						return candidates[p.containersToKeep:]
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// deleteContainersInPod issues container deletion requests for containers selected by getContainersToDeleteInPod.
 | 
				
			||||||
 | 
					func (p *podContainerDeletor) deleteContainersInPod(exitedContainerID string, podStatus *kubecontainer.PodStatus) {
 | 
				
			||||||
 | 
						for _, candidate := range p.getContainersToDeleteInPod(exitedContainerID, podStatus) {
 | 
				
			||||||
 | 
							select {
 | 
				
			||||||
 | 
							case p.worker <- candidate.ID:
 | 
				
			||||||
 | 
							default:
 | 
				
			||||||
 | 
								glog.Warningf("Failed to issue the request to remove container %v", candidate.ID)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
							
								
								
									
										69
									
								
								pkg/kubelet/pod_container_deletor_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										69
									
								
								pkg/kubelet/pod_container_deletor_test.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,69 @@
 | 
				
			|||||||
 | 
					/*
 | 
				
			||||||
 | 
					Copyright 2016 The Kubernetes Authors.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Licensed under the Apache License, Version 2.0 (the "License");
 | 
				
			||||||
 | 
					you may not use this file except in compliance with the License.
 | 
				
			||||||
 | 
					You may obtain a copy of the License at
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    http://www.apache.org/licenses/LICENSE-2.0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Unless required by applicable law or agreed to in writing, software
 | 
				
			||||||
 | 
					distributed under the License is distributed on an "AS IS" BASIS,
 | 
				
			||||||
 | 
					WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
				
			||||||
 | 
					See the License for the specific language governing permissions and
 | 
				
			||||||
 | 
					limitations under the License.
 | 
				
			||||||
 | 
					*/
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					package kubelet
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"reflect"
 | 
				
			||||||
 | 
						"testing"
 | 
				
			||||||
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 | 
				
			||||||
 | 
						containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func testGetContainersToDeleteInPod(t *testing.T) {
 | 
				
			||||||
 | 
						pod := kubecontainer.PodStatus{
 | 
				
			||||||
 | 
							ContainerStatuses: []*kubecontainer.ContainerStatus{
 | 
				
			||||||
 | 
								{
 | 
				
			||||||
 | 
									ID:        kubecontainer.ContainerID{Type: "test", ID: "1"},
 | 
				
			||||||
 | 
									Name:      "foo",
 | 
				
			||||||
 | 
									CreatedAt: time.Now(),
 | 
				
			||||||
 | 
									State:     kubecontainer.ContainerStateExited,
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								{
 | 
				
			||||||
 | 
									ID:        kubecontainer.ContainerID{Type: "test", ID: "2"},
 | 
				
			||||||
 | 
									Name:      "bar",
 | 
				
			||||||
 | 
									CreatedAt: time.Now().Add(time.Second),
 | 
				
			||||||
 | 
									State:     kubecontainer.ContainerStateExited,
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								{
 | 
				
			||||||
 | 
									ID:        kubecontainer.ContainerID{Type: "test", ID: "3"},
 | 
				
			||||||
 | 
									Name:      "bar",
 | 
				
			||||||
 | 
									CreatedAt: time.Now().Add(2 * time.Second),
 | 
				
			||||||
 | 
									State:     kubecontainer.ContainerStateExited,
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								{
 | 
				
			||||||
 | 
									ID:        kubecontainer.ContainerID{Type: "test", ID: "4"},
 | 
				
			||||||
 | 
									Name:      "bar",
 | 
				
			||||||
 | 
									CreatedAt: time.Now().Add(3 * time.Second),
 | 
				
			||||||
 | 
									State:     kubecontainer.ContainerStateExited,
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								{
 | 
				
			||||||
 | 
									ID:        kubecontainer.ContainerID{Type: "test", ID: "5"},
 | 
				
			||||||
 | 
									Name:      "bar",
 | 
				
			||||||
 | 
									CreatedAt: time.Now().Add(4 * time.Second),
 | 
				
			||||||
 | 
									State:     kubecontainer.ContainerStateRunning,
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						expectedCandidates := []*kubecontainer.ContainerStatus{pod.ContainerStatuses[2], pod.ContainerStatuses[1]}
 | 
				
			||||||
 | 
						candidates := newPodContainerDeletor(&containertest.FakeRuntime{}, 1).getContainersToDeleteInPod("2", &pod)
 | 
				
			||||||
 | 
						if !reflect.DeepEqual(candidates, expectedCandidates) {
 | 
				
			||||||
 | 
							t.Errorf("expected %v got %v", expectedCandidates, candidates)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
		Reference in New Issue
	
	Block a user