Kubelet: support retrieving stats using UID of mirror pod

Kubelet supports retrieving stats for pods/containers with and without UID.
This does not always work for the static pods because users may get the UIDs of
the mirror pods from the API server, and use them to query Kubelet. In this
case, Kubelet would fail to locate the containers due to mismatched UIDs.

This change adds a intenral mirror to static pod UID mapping and teaches all
public-facing functions to perform UID lookup before proceeding. This allows
users to use either mirror or static pod's UID to retrieve stats.
This commit is contained in:
Yu-Ju Hong
2015-03-20 13:55:26 -07:00
parent 0250fcfd8f
commit 15e9760bd4
6 changed files with 240 additions and 47 deletions

View File

@@ -447,7 +447,7 @@ func TestSyncPodsDoesNothing(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -481,7 +481,7 @@ func TestSyncPodsWithTerminationLog(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -531,7 +531,7 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -585,7 +585,7 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -636,7 +636,7 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -694,7 +694,7 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -764,7 +764,7 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) {
},
}
waitGroup.Add(2)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -805,7 +805,7 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
ID: "9876",
},
}
if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, util.NewStringSet(), time.Now()); err != nil {
if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, *newMirrorPods(), time.Now()); err != nil {
t.Errorf("unexpected error: %v", err)
}
// Validate nothing happened.
@@ -813,7 +813,7 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
fakeDocker.ClearCalls()
ready = true
if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, util.NewStringSet(), time.Now()); err != nil {
if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, *newMirrorPods(), time.Now()); err != nil {
t.Errorf("unexpected error: %v", err)
}
verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "inspect_container", "inspect_container"})
@@ -852,7 +852,7 @@ func TestSyncPodsDeletes(t *testing.T) {
ID: "4567",
},
}
err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, util.NewStringSet(), time.Now())
err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -1731,7 +1731,7 @@ func TestSyncPodsWithPullPolicy(t *testing.T) {
},
},
},
}, emptyPodUIDs, util.NewStringSet(), time.Now())
}, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -2936,7 +2936,7 @@ func TestPurgingObsoleteStatusMapEntries(t *testing.T) {
t.Fatalf("expected length of status map to be 1. Got map %#v.", kl.podStatuses)
}
// Sync with empty pods so that the entry in status map will be removed.
kl.SyncPods([]api.Pod{}, emptyPodUIDs, util.NewStringSet(), time.Now())
kl.SyncPods([]api.Pod{}, emptyPodUIDs, *newMirrorPods(), time.Now())
if len(kl.podStatuses) != 0 {
t.Fatalf("expected length of status map to be 0. Got map %#v.", kl.podStatuses)
}
@@ -3183,23 +3183,119 @@ func TestDeleteOrphanedMirrorPods(t *testing.T) {
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
kl := testKubelet.kubelet
manager := testKubelet.fakeMirrorManager
orphanedPodNames := []string{"pod1_ns", "pod2_ns"}
mirrorPods := util.NewStringSet()
for _, name := range orphanedPodNames {
mirrorPods.Insert(name)
orphanPods := []api.Pod{
{
ObjectMeta: api.ObjectMeta{
UID: "12345678",
Name: "pod1",
Namespace: "ns",
Annotations: map[string]string{
ConfigSourceAnnotationKey: "api",
ConfigMirrorAnnotationKey: "mirror",
},
},
},
{
ObjectMeta: api.ObjectMeta{
UID: "12345679",
Name: "pod2",
Namespace: "ns",
Annotations: map[string]string{
ConfigSourceAnnotationKey: "api",
ConfigMirrorAnnotationKey: "mirror",
},
},
},
}
mirrorPods := newMirrorPods()
for _, pod := range orphanPods {
mirrorPods.Insert(&pod)
}
// Sync with an empty pod list to delete all mirror pods.
err := kl.SyncPods([]api.Pod{}, emptyPodUIDs, mirrorPods, time.Now())
err := kl.SyncPods([]api.Pod{}, emptyPodUIDs, *mirrorPods, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if manager.NumOfPods() != 0 {
t.Errorf("expected zero mirror pods, got %v", manager.GetPods())
}
for _, name := range orphanedPodNames {
for _, pod := range orphanPods {
name := GetPodFullName(&pod)
creates, deletes := manager.GetCounts(name)
if creates != 0 || deletes != 1 {
t.Errorf("expected 0 creation and one deletion of %q, got %d, %d", name, creates, deletes)
}
}
}
func TestGetContainerInfoForMirrorPods(t *testing.T) {
// pods contain one static and one mirror pod with the same name but
// different UIDs.
pods := []api.Pod{
{
ObjectMeta: api.ObjectMeta{
UID: "1234",
Name: "qux",
Namespace: "ns",
Annotations: map[string]string{
ConfigSourceAnnotationKey: "file",
},
},
Spec: api.PodSpec{
Containers: []api.Container{
{Name: "foo"},
},
},
},
{
ObjectMeta: api.ObjectMeta{
UID: "5678",
Name: "qux",
Namespace: "ns",
Annotations: map[string]string{
ConfigSourceAnnotationKey: "api",
ConfigMirrorAnnotationKey: "mirror",
},
},
Spec: api.PodSpec{
Containers: []api.Container{
{Name: "foo"},
},
},
},
}
containerID := "ab2cdf"
containerPath := fmt.Sprintf("/docker/%v", containerID)
containerInfo := cadvisorApi.ContainerInfo{
ContainerReference: cadvisorApi.ContainerReference{
Name: containerPath,
},
}
testKubelet := newTestKubelet(t)
kubelet := testKubelet.kubelet
fakeDocker := testKubelet.fakeDocker
mockCadvisor := testKubelet.fakeCadvisor
cadvisorReq := &cadvisorApi.ContainerInfoRequest{}
mockCadvisor.On("DockerContainer", containerID, cadvisorReq).Return(containerInfo, nil)
fakeDocker.ContainerList = []docker.APIContainers{
{
ID: containerID,
Names: []string{"/k8s_foo_qux_ns_1234_42"},
},
}
kubelet.pods, kubelet.mirrorPods = filterAndCategorizePods(pods)
// Use the mirror pod UID to retrieve the stats.
stats, err := kubelet.GetContainerInfo("qux_ns", "5678", "foo", cadvisorReq)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if stats == nil {
t.Fatalf("stats should not be nil")
}
mockCadvisor.AssertExpectations(t)
}