mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 04:08:16 +00:00 
			
		
		
		
	Revert "Periodically update pod status from kubelet."
This commit is contained in:
		@@ -657,7 +657,7 @@ func runServiceTest(client *client.Client) {
 | 
				
			|||||||
		glog.Fatalf("Failed to create service: %v, %v", svc1, err)
 | 
							glog.Fatalf("Failed to create service: %v, %v", svc1, err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// create an identical service in the non-default namespace
 | 
						// create an identical service in the default namespace
 | 
				
			||||||
	svc3 := &api.Service{
 | 
						svc3 := &api.Service{
 | 
				
			||||||
		ObjectMeta: api.ObjectMeta{Name: "service1"},
 | 
							ObjectMeta: api.ObjectMeta{Name: "service1"},
 | 
				
			||||||
		Spec: api.ServiceSpec{
 | 
							Spec: api.ServiceSpec{
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -691,7 +691,6 @@ func ValidatePodStatusUpdate(newPod, oldPod *api.Pod) errs.ValidationErrorList {
 | 
				
			|||||||
		allErrs = append(allErrs, errs.NewFieldInvalid("status.host", newPod.Status.Host, "pod host cannot be changed directly"))
 | 
							allErrs = append(allErrs, errs.NewFieldInvalid("status.host", newPod.Status.Host, "pod host cannot be changed directly"))
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// For status update we ignore changes to pod spec.
 | 
					 | 
				
			||||||
	newPod.Spec = oldPod.Spec
 | 
						newPod.Spec = oldPod.Spec
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return allErrs
 | 
						return allErrs
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -63,8 +63,3 @@ func (c *FakePods) Bind(bind *api.Binding) error {
 | 
				
			|||||||
	c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "bind-pod", Value: bind.Name})
 | 
						c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "bind-pod", Value: bind.Name})
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					 | 
				
			||||||
func (c *FakePods) UpdateStatus(name string, status *api.PodStatus) (*api.Pod, error) {
 | 
					 | 
				
			||||||
	c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "update-status-pod", Value: name})
 | 
					 | 
				
			||||||
	return &api.Pod{}, nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 
 | 
				
			|||||||
@@ -39,7 +39,6 @@ type PodInterface interface {
 | 
				
			|||||||
	Update(pod *api.Pod) (*api.Pod, error)
 | 
						Update(pod *api.Pod) (*api.Pod, error)
 | 
				
			||||||
	Watch(label, field labels.Selector, resourceVersion string) (watch.Interface, error)
 | 
						Watch(label, field labels.Selector, resourceVersion string) (watch.Interface, error)
 | 
				
			||||||
	Bind(binding *api.Binding) error
 | 
						Bind(binding *api.Binding) error
 | 
				
			||||||
	UpdateStatus(name string, status *api.PodStatus) (*api.Pod, error)
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// pods implements PodsNamespacer interface
 | 
					// pods implements PodsNamespacer interface
 | 
				
			||||||
@@ -63,7 +62,7 @@ func (c *pods) List(selector labels.Selector) (result *api.PodList, err error) {
 | 
				
			|||||||
	return
 | 
						return
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Get takes the name of the pod, and returns the corresponding Pod object, and an error if it occurs
 | 
					// GetPod takes the name of the pod, and returns the corresponding Pod object, and an error if it occurs
 | 
				
			||||||
func (c *pods) Get(name string) (result *api.Pod, err error) {
 | 
					func (c *pods) Get(name string) (result *api.Pod, err error) {
 | 
				
			||||||
	if len(name) == 0 {
 | 
						if len(name) == 0 {
 | 
				
			||||||
		return nil, errors.New("name is required parameter to Get")
 | 
							return nil, errors.New("name is required parameter to Get")
 | 
				
			||||||
@@ -74,19 +73,19 @@ func (c *pods) Get(name string) (result *api.Pod, err error) {
 | 
				
			|||||||
	return
 | 
						return
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Delete takes the name of the pod, and returns an error if one occurs
 | 
					// DeletePod takes the name of the pod, and returns an error if one occurs
 | 
				
			||||||
func (c *pods) Delete(name string) error {
 | 
					func (c *pods) Delete(name string) error {
 | 
				
			||||||
	return c.r.Delete().Namespace(c.ns).Resource("pods").Name(name).Do().Error()
 | 
						return c.r.Delete().Namespace(c.ns).Resource("pods").Name(name).Do().Error()
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Create takes the representation of a pod.  Returns the server's representation of the pod, and an error, if it occurs.
 | 
					// CreatePod takes the representation of a pod.  Returns the server's representation of the pod, and an error, if it occurs.
 | 
				
			||||||
func (c *pods) Create(pod *api.Pod) (result *api.Pod, err error) {
 | 
					func (c *pods) Create(pod *api.Pod) (result *api.Pod, err error) {
 | 
				
			||||||
	result = &api.Pod{}
 | 
						result = &api.Pod{}
 | 
				
			||||||
	err = c.r.Post().Namespace(c.ns).Resource("pods").Body(pod).Do().Into(result)
 | 
						err = c.r.Post().Namespace(c.ns).Resource("pods").Body(pod).Do().Into(result)
 | 
				
			||||||
	return
 | 
						return
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Update takes the representation of a pod to update.  Returns the server's representation of the pod, and an error, if it occurs.
 | 
					// UpdatePod takes the representation of a pod to update.  Returns the server's representation of the pod, and an error, if it occurs.
 | 
				
			||||||
func (c *pods) Update(pod *api.Pod) (result *api.Pod, err error) {
 | 
					func (c *pods) Update(pod *api.Pod) (result *api.Pod, err error) {
 | 
				
			||||||
	result = &api.Pod{}
 | 
						result = &api.Pod{}
 | 
				
			||||||
	if len(pod.ResourceVersion) == 0 {
 | 
						if len(pod.ResourceVersion) == 0 {
 | 
				
			||||||
@@ -113,15 +112,3 @@ func (c *pods) Watch(label, field labels.Selector, resourceVersion string) (watc
 | 
				
			|||||||
func (c *pods) Bind(binding *api.Binding) error {
 | 
					func (c *pods) Bind(binding *api.Binding) error {
 | 
				
			||||||
	return c.r.Post().Namespace(c.ns).Resource("pods").Name(binding.Name).SubResource("binding").Body(binding).Do().Error()
 | 
						return c.r.Post().Namespace(c.ns).Resource("pods").Name(binding.Name).SubResource("binding").Body(binding).Do().Error()
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					 | 
				
			||||||
// UpdateStatus takes the name of the pod and the new status.  Returns the server's representation of the pod, and an error, if it occurs.
 | 
					 | 
				
			||||||
func (c *pods) UpdateStatus(name string, newStatus *api.PodStatus) (result *api.Pod, err error) {
 | 
					 | 
				
			||||||
	result = &api.Pod{}
 | 
					 | 
				
			||||||
	pod, err := c.Get(name)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		return
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	pod.Status = *newStatus
 | 
					 | 
				
			||||||
	err = c.r.Put().Namespace(c.ns).Resource("pods").Name(pod.Name).SubResource("status").Body(pod).Do().Into(result)
 | 
					 | 
				
			||||||
	return
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 
 | 
				
			|||||||
@@ -31,7 +31,7 @@ func NewSourceApiserver(client *client.Client, hostname string, updates chan<- i
 | 
				
			|||||||
	newSourceApiserverFromLW(lw, updates)
 | 
						newSourceApiserverFromLW(lw, updates)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// newSourceApiserverFromLW holds creates a config source that watches and pulls from the apiserver.
 | 
					// newSourceApiserverFromLW holds creates a config source that watches an pulls from the apiserver.
 | 
				
			||||||
func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
 | 
					func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
 | 
				
			||||||
	send := func(objs []interface{}) {
 | 
						send := func(objs []interface{}) {
 | 
				
			||||||
		var pods []api.Pod
 | 
							var pods []api.Pod
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -126,7 +126,6 @@ func (f *FakeDockerClient) StartContainer(id string, hostConfig *docker.HostConf
 | 
				
			|||||||
			Running: true,
 | 
								Running: true,
 | 
				
			||||||
			Pid:     42,
 | 
								Pid:     42,
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
		NetworkSettings: &docker.NetworkSettings{IPAddress: "1.2.3.4"},
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return f.Err
 | 
						return f.Err
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -285,7 +285,7 @@ type Kubelet struct {
 | 
				
			|||||||
	// the EventRecorder to use
 | 
						// the EventRecorder to use
 | 
				
			||||||
	recorder record.EventRecorder
 | 
						recorder record.EventRecorder
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// A pod status cache stores statuses for pods (both rejected and synced).
 | 
						// A pod status cache currently used to store rejected pods and their statuses.
 | 
				
			||||||
	podStatusesLock sync.RWMutex
 | 
						podStatusesLock sync.RWMutex
 | 
				
			||||||
	podStatuses     map[string]api.PodStatus
 | 
						podStatuses     map[string]api.PodStatus
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
@@ -568,7 +568,6 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) {
 | 
				
			|||||||
		glog.Warning("No api server defined - no node status update will be sent.")
 | 
							glog.Warning("No api server defined - no node status update will be sent.")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	go kl.syncNodeStatus()
 | 
						go kl.syncNodeStatus()
 | 
				
			||||||
	go util.Forever(kl.syncStatus, kl.resyncInterval)
 | 
					 | 
				
			||||||
	kl.syncLoop(updates, kl)
 | 
						kl.syncLoop(updates, kl)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -1347,17 +1346,6 @@ func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, containersInPod dock
 | 
				
			|||||||
func (kl *Kubelet) syncPod(pod *api.Pod, containersInPod dockertools.DockerContainers) error {
 | 
					func (kl *Kubelet) syncPod(pod *api.Pod, containersInPod dockertools.DockerContainers) error {
 | 
				
			||||||
	podFullName := GetPodFullName(pod)
 | 
						podFullName := GetPodFullName(pod)
 | 
				
			||||||
	uid := pod.UID
 | 
						uid := pod.UID
 | 
				
			||||||
 | 
					 | 
				
			||||||
	// Before returning, regenerate status and store it in the cache.
 | 
					 | 
				
			||||||
	defer func() {
 | 
					 | 
				
			||||||
		status, err := kl.generatePodStatus(podFullName, uid)
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			glog.Errorf("Unable to generate status for pod with name %q and uid %q info with error(%v)", podFullName, uid, err)
 | 
					 | 
				
			||||||
		} else {
 | 
					 | 
				
			||||||
			kl.setPodStatusInCache(podFullName, status)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	containerChanges, err := kl.computePodContainerChanges(pod, containersInPod)
 | 
						containerChanges, err := kl.computePodContainerChanges(pod, containersInPod)
 | 
				
			||||||
	glog.V(3).Infof("Got container changes for pod %q: %+v", podFullName, containerChanges)
 | 
						glog.V(3).Infof("Got container changes for pod %q: %+v", podFullName, containerChanges)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
@@ -1727,40 +1715,6 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// syncStatus syncs pods statuses with the apiserver.
 | 
					 | 
				
			||||||
func (kl *Kubelet) syncStatus() {
 | 
					 | 
				
			||||||
	glog.V(3).Infof("Syncing pods status")
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	statuses := make(map[string]api.PodStatus)
 | 
					 | 
				
			||||||
	func() {
 | 
					 | 
				
			||||||
		kl.podLock.Lock()
 | 
					 | 
				
			||||||
		defer kl.podLock.Unlock()
 | 
					 | 
				
			||||||
		for _, pod := range kl.pods {
 | 
					 | 
				
			||||||
			source := pod.Annotations[ConfigSourceAnnotationKey]
 | 
					 | 
				
			||||||
			if source != ApiserverSource {
 | 
					 | 
				
			||||||
				glog.V(3).Infof("Pod status for %q is not updated due to its source %s", pod.Name, source)
 | 
					 | 
				
			||||||
				continue
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
			status, err := kl.GetPodStatus(GetPodFullName(&pod), pod.UID)
 | 
					 | 
				
			||||||
			if err != nil {
 | 
					 | 
				
			||||||
				glog.Warningf("Error getting pod %q status: %v, retry later", pod.Name, err)
 | 
					 | 
				
			||||||
				continue
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
			statuses[GetPodFullName(&pod)] = status
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	for podFullName, status := range statuses {
 | 
					 | 
				
			||||||
		name, namespace := ParsePodFullName(podFullName)
 | 
					 | 
				
			||||||
		pod, err := kl.kubeClient.Pods(namespace).UpdateStatus(name, &status)
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			glog.Warningf("Error updating status for pod %s: %v (full pod: %s)", name, err, pod)
 | 
					 | 
				
			||||||
		} else {
 | 
					 | 
				
			||||||
			glog.V(3).Infof("Status for pod %q updated successfully: %s", name, pod)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// Updated the Kubelet's internal pods with those provided by the update.
 | 
					// Updated the Kubelet's internal pods with those provided by the update.
 | 
				
			||||||
// Records new and updated pods in newPods and updatedPods.
 | 
					// Records new and updated pods in newPods and updatedPods.
 | 
				
			||||||
func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType) {
 | 
					func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType) {
 | 
				
			||||||
@@ -2046,23 +2000,19 @@ func (kl *Kubelet) GetPodByFullName(podFullName string) (*api.PodSpec, bool) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
// GetPodStatus returns information from Docker about the containers in a pod
 | 
					// GetPodStatus returns information from Docker about the containers in a pod
 | 
				
			||||||
func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatus, error) {
 | 
					func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatus, error) {
 | 
				
			||||||
	// Check to see if we have a cached version of the status.
 | 
					 | 
				
			||||||
	cachedPodStatus, found := kl.getPodStatusFromCache(podFullName)
 | 
					 | 
				
			||||||
	if found {
 | 
					 | 
				
			||||||
		glog.V(3).Infof("Returning cached status for %s", podFullName)
 | 
					 | 
				
			||||||
		return cachedPodStatus, nil
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return kl.generatePodStatus(podFullName, uid)
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (kl *Kubelet) generatePodStatus(podFullName string, uid types.UID) (api.PodStatus, error) {
 | 
					 | 
				
			||||||
	glog.V(3).Infof("Generating status for %s", podFullName)
 | 
					 | 
				
			||||||
	var podStatus api.PodStatus
 | 
						var podStatus api.PodStatus
 | 
				
			||||||
	spec, found := kl.GetPodByFullName(podFullName)
 | 
						spec, found := kl.GetPodByFullName(podFullName)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if !found {
 | 
						if !found {
 | 
				
			||||||
		return podStatus, fmt.Errorf("Couldn't find spec for pod %s", podFullName)
 | 
							return podStatus, fmt.Errorf("Couldn't find spec for pod %s", podFullName)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Check to see if the pod has been rejected.
 | 
				
			||||||
 | 
						mappedPodStatus, ok := kl.getPodStatusFromCache(podFullName)
 | 
				
			||||||
 | 
						if ok {
 | 
				
			||||||
 | 
							return mappedPodStatus, nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	info, err := dockertools.GetDockerPodInfo(kl.dockerClient, *spec, podFullName, uid)
 | 
						info, err := dockertools.GetDockerPodInfo(kl.dockerClient, *spec, podFullName, uid)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
@@ -2092,7 +2042,6 @@ func (kl *Kubelet) generatePodStatus(podFullName string, uid types.UID) (api.Pod
 | 
				
			|||||||
	if found {
 | 
						if found {
 | 
				
			||||||
		podStatus.PodIP = netContainerInfo.PodIP
 | 
							podStatus.PodIP = netContainerInfo.PodIP
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	podStatus.Host = kl.hostname
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return podStatus, nil
 | 
						return podStatus, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -447,7 +447,7 @@ func TestSyncPodsDoesNothing(t *testing.T) {
 | 
				
			|||||||
		t.Errorf("unexpected error: %v", err)
 | 
							t.Errorf("unexpected error: %v", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	waitGroup.Wait()
 | 
						waitGroup.Wait()
 | 
				
			||||||
	verifyCalls(t, fakeDocker, []string{"list", "list", "list", "inspect_container", "inspect_container", "list", "inspect_container", "inspect_container"})
 | 
						verifyCalls(t, fakeDocker, []string{"list", "list", "list", "inspect_container", "inspect_container"})
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestSyncPodsWithTerminationLog(t *testing.T) {
 | 
					func TestSyncPodsWithTerminationLog(t *testing.T) {
 | 
				
			||||||
@@ -481,7 +481,7 @@ func TestSyncPodsWithTerminationLog(t *testing.T) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
	waitGroup.Wait()
 | 
						waitGroup.Wait()
 | 
				
			||||||
	verifyCalls(t, fakeDocker, []string{
 | 
						verifyCalls(t, fakeDocker, []string{
 | 
				
			||||||
		"list", "list", "list", "create", "start", "inspect_container", "create", "start", "list", "inspect_container", "inspect_container"})
 | 
							"list", "list", "list", "create", "start", "inspect_container", "create", "start"})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	fakeDocker.Lock()
 | 
						fakeDocker.Lock()
 | 
				
			||||||
	parts := strings.Split(fakeDocker.Container.HostConfig.Binds[0], ":")
 | 
						parts := strings.Split(fakeDocker.Container.HostConfig.Binds[0], ":")
 | 
				
			||||||
@@ -531,7 +531,7 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
 | 
				
			|||||||
	waitGroup.Wait()
 | 
						waitGroup.Wait()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	verifyCalls(t, fakeDocker, []string{
 | 
						verifyCalls(t, fakeDocker, []string{
 | 
				
			||||||
		"list", "list", "list", "create", "start", "inspect_container", "create", "start", "list", "inspect_container", "inspect_container"})
 | 
							"list", "list", "list", "create", "start", "inspect_container", "create", "start"})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	fakeDocker.Lock()
 | 
						fakeDocker.Lock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -584,7 +584,7 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
 | 
				
			|||||||
	waitGroup.Wait()
 | 
						waitGroup.Wait()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	verifyCalls(t, fakeDocker, []string{
 | 
						verifyCalls(t, fakeDocker, []string{
 | 
				
			||||||
		"list", "list", "list", "create", "start", "inspect_container", "create", "start", "list", "inspect_container", "inspect_container"})
 | 
							"list", "list", "list", "create", "start", "inspect_container", "create", "start"})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	fakeDocker.Lock()
 | 
						fakeDocker.Lock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -634,7 +634,7 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) {
 | 
				
			|||||||
	waitGroup.Wait()
 | 
						waitGroup.Wait()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	verifyCalls(t, fakeDocker, []string{
 | 
						verifyCalls(t, fakeDocker, []string{
 | 
				
			||||||
		"list", "list", "list", "inspect_container", "inspect_image", "list", "create", "start", "list", "inspect_container", "inspect_container"})
 | 
							"list", "list", "list", "inspect_container", "inspect_image", "list", "create", "start"})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	fakeDocker.Lock()
 | 
						fakeDocker.Lock()
 | 
				
			||||||
	if len(fakeDocker.Created) != 1 ||
 | 
						if len(fakeDocker.Created) != 1 ||
 | 
				
			||||||
@@ -691,7 +691,7 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
 | 
				
			|||||||
	waitGroup.Wait()
 | 
						waitGroup.Wait()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	verifyCalls(t, fakeDocker, []string{
 | 
						verifyCalls(t, fakeDocker, []string{
 | 
				
			||||||
		"list", "list", "list", "inspect_container", "inspect_image", "list", "create", "start", "list", "inspect_container", "inspect_container"})
 | 
							"list", "list", "list", "inspect_container", "inspect_image", "list", "create", "start"})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	fakeDocker.Lock()
 | 
						fakeDocker.Lock()
 | 
				
			||||||
	if len(fakeDocker.Created) != 1 ||
 | 
						if len(fakeDocker.Created) != 1 ||
 | 
				
			||||||
@@ -760,7 +760,7 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) {
 | 
				
			|||||||
	waitGroup.Wait()
 | 
						waitGroup.Wait()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	verifyCalls(t, fakeDocker, []string{
 | 
						verifyCalls(t, fakeDocker, []string{
 | 
				
			||||||
		"list", "list", "list", "list", "inspect_container", "inspect_container", "list", "inspect_container", "inspect_container", "stop", "create", "start", "inspect_container", "create", "start", "list", "inspect_container", "inspect_container"})
 | 
							"list", "list", "list", "list", "inspect_container", "inspect_container", "stop", "create", "start", "inspect_container", "create", "start"})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// A map iteration is used to delete containers, so must not depend on
 | 
						// A map iteration is used to delete containers, so must not depend on
 | 
				
			||||||
	// order here.
 | 
						// order here.
 | 
				
			||||||
@@ -898,7 +898,7 @@ func TestSyncPodDeletesDuplicate(t *testing.T) {
 | 
				
			|||||||
		t.Errorf("unexpected error: %v", err)
 | 
							t.Errorf("unexpected error: %v", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	verifyCalls(t, fakeDocker, []string{"list", "stop", "list"})
 | 
						verifyCalls(t, fakeDocker, []string{"list", "stop"})
 | 
				
			||||||
	// Expect one of the duplicates to be killed.
 | 
						// Expect one of the duplicates to be killed.
 | 
				
			||||||
	if len(fakeDocker.Stopped) != 1 || (fakeDocker.Stopped[0] != "1234" && fakeDocker.Stopped[0] != "4567") {
 | 
						if len(fakeDocker.Stopped) != 1 || (fakeDocker.Stopped[0] != "1234" && fakeDocker.Stopped[0] != "4567") {
 | 
				
			||||||
		t.Errorf("Wrong containers were stopped: %v", fakeDocker.Stopped)
 | 
							t.Errorf("Wrong containers were stopped: %v", fakeDocker.Stopped)
 | 
				
			||||||
@@ -940,7 +940,7 @@ func TestSyncPodBadHash(t *testing.T) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	//verifyCalls(t, fakeDocker, []string{"list", "stop", "list", "create", "start", "stop", "create", "start", "inspect_container"})
 | 
						//verifyCalls(t, fakeDocker, []string{"list", "stop", "list", "create", "start", "stop", "create", "start", "inspect_container"})
 | 
				
			||||||
	verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "create", "start", "inspect_container", "create", "start", "list", "inspect_container", "inspect_container"})
 | 
						verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "create", "start", "inspect_container", "create", "start"})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// A map interation is used to delete containers, so must not depend on
 | 
						// A map interation is used to delete containers, so must not depend on
 | 
				
			||||||
	// order here.
 | 
						// order here.
 | 
				
			||||||
@@ -993,7 +993,7 @@ func TestSyncPodUnhealthy(t *testing.T) {
 | 
				
			|||||||
		t.Errorf("unexpected error: %v", err)
 | 
							t.Errorf("unexpected error: %v", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	verifyCalls(t, fakeDocker, []string{"list", "stop", "create", "start", "list", "inspect_container"})
 | 
						verifyCalls(t, fakeDocker, []string{"list", "stop", "create", "start"})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// A map interation is used to delete containers, so must not depend on
 | 
						// A map interation is used to delete containers, so must not depend on
 | 
				
			||||||
	// order here.
 | 
						// order here.
 | 
				
			||||||
@@ -1683,7 +1683,7 @@ func TestSyncPodEventHandlerFails(t *testing.T) {
 | 
				
			|||||||
		t.Errorf("unexpected error: %v", err)
 | 
							t.Errorf("unexpected error: %v", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	verifyCalls(t, fakeDocker, []string{"list", "list", "create", "start", "stop", "list"})
 | 
						verifyCalls(t, fakeDocker, []string{"list", "list", "create", "start", "stop"})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if len(fakeDocker.Stopped) != 1 {
 | 
						if len(fakeDocker.Stopped) != 1 {
 | 
				
			||||||
		t.Errorf("Wrong containers were stopped: %v", fakeDocker.Stopped)
 | 
							t.Errorf("Wrong containers were stopped: %v", fakeDocker.Stopped)
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -31,7 +31,7 @@ import (
 | 
				
			|||||||
type syncPodFnType func(*api.Pod, dockertools.DockerContainers) error
 | 
					type syncPodFnType func(*api.Pod, dockertools.DockerContainers) error
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type podWorkers struct {
 | 
					type podWorkers struct {
 | 
				
			||||||
	// Protects all per worker fields.
 | 
						// Protects podUpdates field.
 | 
				
			||||||
	podLock sync.Mutex
 | 
						podLock sync.Mutex
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Tracks all running per-pod goroutines - per-pod goroutine will be
 | 
						// Tracks all running per-pod goroutines - per-pod goroutine will be
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -18,7 +18,6 @@ package kubelet
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
	"strings"
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
 | 
						"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
@@ -69,19 +68,12 @@ type PodUpdate struct {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// GetPodFullName returns a name that uniquely identifies a pod across all config sources.
 | 
					// GetPodFullName returns a name that uniquely identifies a pod across all config sources.
 | 
				
			||||||
// NOTE: If changed ParsePodFullName must be also updated.
 | 
					 | 
				
			||||||
func GetPodFullName(pod *api.Pod) string {
 | 
					func GetPodFullName(pod *api.Pod) string {
 | 
				
			||||||
	// Use underscore as the delimiter because it is not allowed in pod name
 | 
						// Use underscore as the delimiter because it is not allowed in pod name
 | 
				
			||||||
	// (DNS subdomain format), while allowed in the container name format.
 | 
						// (DNS subdomain format), while allowed in the container name format.
 | 
				
			||||||
	return fmt.Sprintf("%s_%s", pod.Name, pod.Namespace)
 | 
						return fmt.Sprintf("%s_%s", pod.Name, pod.Namespace)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// ParsePodFullName parses full name generated by GetPodFullName and returns parts of it.
 | 
					 | 
				
			||||||
func ParsePodFullName(podFullName string) (name, namespace string) {
 | 
					 | 
				
			||||||
	nameParts := strings.Split(podFullName, "_")
 | 
					 | 
				
			||||||
	return nameParts[0], nameParts[1]
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// Build the pod full name from pod name and namespace.
 | 
					// Build the pod full name from pod name and namespace.
 | 
				
			||||||
func BuildPodFullName(name, namespace string) string {
 | 
					func BuildPodFullName(name, namespace string) string {
 | 
				
			||||||
	return name + "_" + namespace
 | 
						return name + "_" + namespace
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -371,19 +371,20 @@ func (m *Master) init(c *Config) {
 | 
				
			|||||||
	m.nodeRegistry = registry
 | 
						m.nodeRegistry = registry
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	nodeStorage := minion.NewREST(m.nodeRegistry)
 | 
						nodeStorage := minion.NewREST(m.nodeRegistry)
 | 
				
			||||||
	if c.SyncPodStatus {
 | 
					 | 
				
			||||||
	// TODO: unify the storage -> registry and storage -> client patterns
 | 
						// TODO: unify the storage -> registry and storage -> client patterns
 | 
				
			||||||
	nodeStorageClient := RESTStorageToNodes(nodeStorage)
 | 
						nodeStorageClient := RESTStorageToNodes(nodeStorage)
 | 
				
			||||||
 | 
					 | 
				
			||||||
	podCache := NewPodCache(
 | 
						podCache := NewPodCache(
 | 
				
			||||||
		c.KubeletClient,
 | 
							c.KubeletClient,
 | 
				
			||||||
		nodeStorageClient.Nodes(),
 | 
							nodeStorageClient.Nodes(),
 | 
				
			||||||
		podRegistry,
 | 
							podRegistry,
 | 
				
			||||||
	)
 | 
						)
 | 
				
			||||||
 | 
						if c.SyncPodStatus {
 | 
				
			||||||
		go util.Forever(func() { podCache.UpdateAllContainers() }, m.cacheTimeout)
 | 
							go util.Forever(func() { podCache.UpdateAllContainers() }, m.cacheTimeout)
 | 
				
			||||||
		go util.Forever(func() { podCache.GarbageCollectPodStatus() }, time.Minute*30)
 | 
					 | 
				
			||||||
		podStorage = podStorage.WithPodStatus(podCache)
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						go util.Forever(func() { podCache.GarbageCollectPodStatus() }, time.Minute*30)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// TODO: refactor podCache to sit on top of podStorage via status calls
 | 
				
			||||||
 | 
						podStorage = podStorage.WithPodStatus(podCache)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// TODO: Factor out the core API registration
 | 
						// TODO: Factor out the core API registration
 | 
				
			||||||
	m.storage = map[string]apiserver.RESTStorage{
 | 
						m.storage = map[string]apiserver.RESTStorage{
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user