	kubelet/dockertools: Add puller interfaces in the containerManager.
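For context, a minimal Go sketch (not part of this commit) of the delegation pattern the diff introduces: the manager owns a puller behind a small interface and forwards Pull/IsImagePresent to it, so tests can swap in a fake. The type names below (puller, manager, fakePuller) are illustrative stand-ins, not the Kubernetes types; only the two method signatures are taken from the hunks that follow.

package main

import "fmt"

// puller mirrors the two methods this commit routes through DockerManager.
type puller interface {
	Pull(image string) error
	IsImagePresent(image string) (bool, error)
}

// manager stands in for DockerManager: it exposes the puller as a field so
// tests can replace it, and forwards the image operations to it.
type manager struct {
	Puller puller
}

func (m *manager) Pull(image string) error { return m.Puller.Pull(image) }

func (m *manager) IsImagePresent(image string) (bool, error) {
	return m.Puller.IsImagePresent(image)
}

// fakePuller plays the role of dockertools.FakeDockerPuller in the tests below.
type fakePuller struct {
	HasImages []string
	Pulled    []string
}

func (f *fakePuller) Pull(image string) error {
	f.Pulled = append(f.Pulled, image)
	return nil
}

func (f *fakePuller) IsImagePresent(image string) (bool, error) {
	for _, img := range f.HasImages {
		if img == image {
			return true, nil
		}
	}
	return false, nil
}

func main() {
	m := &manager{Puller: &fakePuller{HasImages: []string{"existing_one"}}}
	present, _ := m.IsImagePresent("existing_one")
	fmt.Println(present)               // true
	fmt.Println(m.Pull("want:latest")) // <nil>
}

This is the same shape as the real change: the kubelet stops holding its own dockerPuller and instead calls Pull/IsImagePresent on the container manager, which constructs the rate-limited puller from the qps/burst arguments now passed to NewDockerManager.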
@@ -96,8 +96,8 @@ type throttledDockerPuller struct {
 	limiter util.RateLimiter
 }
 
-// NewDockerPuller creates a new instance of the default implementation of DockerPuller.
-func NewDockerPuller(client DockerInterface, qps float32, burst int) DockerPuller {
+// newDockerPuller creates a new instance of the default implementation of DockerPuller.
+func newDockerPuller(client DockerInterface, qps float32, burst int) DockerPuller {
 	dp := dockerPuller{
 		client:  client,
 		keyring: credentialprovider.NewDockerKeyring(),

@@ -396,7 +396,7 @@ func TestIsImagePresent(t *testing.T) {
 func TestGetRunningContainers(t *testing.T) {
 	fakeDocker := &FakeDockerClient{Errors: make(map[string]error)}
 	fakeRecorder := &record.FakeRecorder{}
-	containerManager := NewDockerManager(fakeDocker, fakeRecorder, PodInfraContainerImage)
+	containerManager := NewDockerManager(fakeDocker, fakeRecorder, PodInfraContainerImage, 0, 0)
 	tests := []struct {
 		containers  map[string]*docker.Container
 		inputIDs    []string

@@ -60,18 +60,25 @@ type DockerManager struct {
 	//      means that some entries may be recycled before a pod has been
 	//      deleted.
 	reasonCache stringCache
+	// TODO(yifan): We export this for testability, so when we have a fake
+	// container manager, then we can unexport this. Also at that time, we
+	// use the concrete type so that we can record the pull failure and eliminate
+	// the image checking in GetPodStatus().
+	Puller DockerPuller
 }
 
 // Ensures DockerManager implements ConatinerRunner.
 var _ kubecontainer.ContainerRunner = new(DockerManager)
 
-func NewDockerManager(client DockerInterface, recorder record.EventRecorder, podInfraContainerImage string) *DockerManager {
+func NewDockerManager(client DockerInterface, recorder record.EventRecorder, podInfraContainerImage string, qps float32, burst int) *DockerManager {
 	reasonCache := stringCache{cache: lru.New(maxReasonCacheEntries)}
 	return &DockerManager{
 		client:                 client,
 		recorder:               recorder,
 		PodInfraContainerImage: podInfraContainerImage,
-		reasonCache:            reasonCache}
+		reasonCache:            reasonCache,
+		Puller:                 newDockerPuller(client, qps, burst),
+	}
 }
 
 // A cache which stores strings keyed by <pod_UID>_<container_name>.
@@ -569,3 +576,11 @@ func (self *DockerManager) GetPods(all bool) ([]*kubecontainer.Pod, error) {
 	}
 	return result, nil
 }
+
+func (self *DockerManager) Pull(image string) error {
+	return self.Puller.Pull(image)
+}
+
+func (self *DockerManager) IsImagePresent(image string) (bool, error) {
+	return self.Puller.IsImagePresent(image)
+}

@@ -199,7 +199,7 @@ func NewMainKubelet(
 		return nil, fmt.Errorf("failed to initialize image manager: %v", err)
 	}
 	statusManager := newStatusManager(kubeClient)
-	containerManager := dockertools.NewDockerManager(dockerClient, recorder, podInfraContainerImage)
+	containerManager := dockertools.NewDockerManager(dockerClient, recorder, podInfraContainerImage, pullQPS, pullBurst)
 
 	klet := &Kubelet{
 		hostname:               hostname,
@@ -211,8 +211,6 @@ func NewMainKubelet(
 		readinessManager:       kubecontainer.NewReadinessManager(),
 		runner:                 dockertools.NewDockerContainerCommandRunner(dockerClient),
 		httpClient:             &http.Client{},
-		pullQPS:                pullQPS,
-		pullBurst:              pullBurst,
 		sourcesReady:           sourcesReady,
 		clusterDomain:          clusterDomain,
 		clusterDNS:             clusterDNS,
@@ -289,18 +287,12 @@ type Kubelet struct {
 	// Tracks references for reporting events
 	containerRefManager *kubecontainer.RefManager
 
-	// Optional, defaults to simple Docker implementation
-	dockerPuller dockertools.DockerPuller
 	// Optional, defaults to /logs/ from /var/log
 	logServer http.Handler
 	// Optional, defaults to simple Docker implementation
 	runner dockertools.ContainerCommandRunner
 	// Optional, client for http requests, defaults to empty client
 	httpClient httpGetter
-	// Optional, maximum pull QPS from the docker registry, 0.0 means unlimited.
-	pullQPS float32
-	// Optional, maximum burst QPS from the docker registry, must be positive if QPS is > 0.0
-	pullBurst int
 
 	// cAdvisor used for container information.
 	cadvisor cadvisor.Interface
@@ -541,9 +533,6 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) {
 	if kl.logServer == nil {
 		kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
 	}
-	if kl.dockerPuller == nil {
-		kl.dockerPuller = dockertools.NewDockerPuller(kl.dockerClient, kl.pullQPS, kl.pullBurst)
-	}
 	if kl.kubeClient == nil {
 		glog.Warning("No api server defined - no node status update will be sent.")
 	}
@@ -877,7 +866,7 @@ func (kl *Kubelet) createPodInfraContainer(pod *api.Pod) (dockertools.DockerID,
 		glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
 	}
 	// TODO: make this a TTL based pull (if image older than X policy, pull)
-	ok, err := kl.dockerPuller.IsImagePresent(container.Image)
+	ok, err := kl.containerManager.IsImagePresent(container.Image)
 	if err != nil {
 		if ref != nil {
 			kl.recorder.Eventf(ref, "failed", "Failed to inspect image %q: %v", container.Image, err)
@@ -919,7 +908,7 @@ func (kl *Kubelet) pullImage(img string, ref *api.ObjectReference) error {
 		metrics.ImagePullLatency.Observe(metrics.SinceInMicroseconds(start))
 	}()
 
-	if err := kl.dockerPuller.Pull(img); err != nil {
+	if err := kl.containerManager.Pull(img); err != nil {
 		if ref != nil {
 			kl.recorder.Eventf(ref, "failed", "Failed to pull image %q: %v", img, err)
 		}
@@ -1033,7 +1022,7 @@ func (kl *Kubelet) pullImageAndRunContainer(pod *api.Pod, container *api.Contain
 		glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
 	}
 	if container.ImagePullPolicy != api.PullNever {
-		present, err := kl.dockerPuller.IsImagePresent(container.Image)
+		present, err := kl.containerManager.IsImagePresent(container.Image)
 		if err != nil {
 			if ref != nil {
 				kl.recorder.Eventf(ref, "failed", "Failed to inspect image %q: %v", container.Image, err)

@@ -73,11 +73,10 @@ func newTestKubelet(t *testing.T) *TestKubelet {
 
 	fakeRecorder := &record.FakeRecorder{}
 	fakeKubeClient := &testclient.Fake{}
+
 	kubelet := &Kubelet{}
 	kubelet.dockerClient = fakeDocker
 	kubelet.kubeClient = fakeKubeClient
-	kubelet.dockerPuller = &dockertools.FakeDockerPuller{}
-
 	kubelet.hostname = "testnode"
 	kubelet.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
 	if tempDir, err := ioutil.TempDir("/tmp", "kubelet_test."); err != nil {
@@ -104,7 +103,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
 	podManager, fakeMirrorClient := newFakePodManager()
 	kubelet.podManager = podManager
 	kubelet.containerRefManager = kubecontainer.NewRefManager()
-	kubelet.containerManager = dockertools.NewDockerManager(fakeDocker, fakeRecorder, dockertools.PodInfraContainerImage)
+	kubelet.containerManager = dockertools.NewDockerManager(fakeDocker, fakeRecorder, dockertools.PodInfraContainerImage, 0, 0)
 	kubelet.dockerCache = dockertools.NewFakeDockerCache(kubelet.containerManager)
 	kubelet.podWorkers = newPodWorkers(
 		kubelet.dockerCache,
@@ -114,6 +113,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
 			return err
 		},
 		fakeRecorder)
+	kubelet.containerManager.Puller = &dockertools.FakeDockerPuller{}
 	return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup, fakeMirrorClient}
 }
 
@@ -593,7 +593,7 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
 	kubelet := testKubelet.kubelet
 	fakeDocker := testKubelet.fakeDocker
 	waitGroup := testKubelet.waitGroup
-	puller := kubelet.dockerPuller.(*dockertools.FakeDockerPuller)
+	puller := kubelet.containerManager.Puller.(*dockertools.FakeDockerPuller)
 	puller.HasImages = []string{}
 	kubelet.containerManager.PodInfraContainerImage = "custom_image_name"
 	fakeDocker.ContainerList = []docker.APIContainers{}
@@ -1249,7 +1249,6 @@ func TestGetRootInfo(t *testing.T) {
 
 	kubelet := Kubelet{
 		dockerClient: &fakeDocker,
-		dockerPuller: &dockertools.FakeDockerPuller{},
 		cadvisor:     mockCadvisor,
 	}
 
@@ -1652,7 +1651,7 @@ func TestSyncPodsWithPullPolicy(t *testing.T) {
 	kubelet := testKubelet.kubelet
 	fakeDocker := testKubelet.fakeDocker
 	waitGroup := testKubelet.waitGroup
-	puller := kubelet.dockerPuller.(*dockertools.FakeDockerPuller)
+	puller := kubelet.containerManager.Puller.(*dockertools.FakeDockerPuller)
 	puller.HasImages = []string{"existing_one", "want:latest"}
 	kubelet.containerManager.PodInfraContainerImage = "custom_image_name"
 	fakeDocker.ContainerList = []docker.APIContainers{}

@@ -40,7 +40,7 @@ func newPod(uid, name string) *api.Pod {
 func createPodWorkers() (*podWorkers, map[types.UID][]string) {
 	fakeDocker := &dockertools.FakeDockerClient{}
 	fakeRecorder := &record.FakeRecorder{}
-	fakeDockerCache := dockertools.NewFakeDockerCache(dockertools.NewDockerManager(fakeDocker, fakeRecorder, dockertools.PodInfraContainerImage))
+	fakeDockerCache := dockertools.NewFakeDockerCache(dockertools.NewDockerManager(fakeDocker, fakeRecorder, dockertools.PodInfraContainerImage, 0, 0))
 
 	lock := sync.Mutex{}
 	processed := make(map[types.UID][]string)

@@ -22,7 +22,6 @@ import (
 
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
-	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
 	"github.com/golang/glog"
 )
 
@@ -53,9 +52,6 @@ func (kl *Kubelet) RunOnce(updates <-chan PodUpdate) ([]RunPodResult, error) {
 
 // runOnce runs a given set of pods and returns their status.
 func (kl *Kubelet) runOnce(pods []api.Pod, retryDelay time.Duration) (results []RunPodResult, err error) {
-	if kl.dockerPuller == nil {
-		kl.dockerPuller = dockertools.NewDockerPuller(kl.dockerClient, kl.pullQPS, kl.pullBurst)
-	}
 	kl.handleNotFittingPods(pods)
 
 	ch := make(chan RunPodResult)

@@ -144,8 +144,9 @@ func TestRunOnce(t *testing.T) {
 		},
 		t: t,
 	}
-	kb.dockerPuller = &dockertools.FakeDockerPuller{}
-	kb.containerManager = dockertools.NewDockerManager(kb.dockerClient, kb.recorder, dockertools.PodInfraContainerImage)
+
+	kb.containerManager = dockertools.NewDockerManager(kb.dockerClient, kb.recorder, dockertools.PodInfraContainerImage, 0, 0)
+	kb.containerManager.Puller = &dockertools.FakeDockerPuller{}
 
 	pods := []api.Pod{
 		{