mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 19:58:17 +00:00 
			
		
		
		
	Add reconcile support in kubelet
This commit is contained in:
		@@ -46,7 +46,7 @@ const (
 | 
			
		||||
	// PodConfigNotificationSnapshotAndUpdates delivers an UPDATE message whenever pods are
 | 
			
		||||
	// changed, and a SET message if there are any additions or removals.
 | 
			
		||||
	PodConfigNotificationSnapshotAndUpdates
 | 
			
		||||
	// PodConfigNotificationIncremental delivers ADD, UPDATE, and REMOVE to the update channel.
 | 
			
		||||
	// PodConfigNotificationIncremental delivers ADD, UPDATE, REMOVE, RECONCILE to the update channel.
 | 
			
		||||
	PodConfigNotificationIncremental
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
@@ -152,7 +152,7 @@ func (s *podStorage) Merge(source string, change interface{}) error {
 | 
			
		||||
	defer s.updateLock.Unlock()
 | 
			
		||||
 | 
			
		||||
	seenBefore := s.sourcesSeen.Has(source)
 | 
			
		||||
	adds, updates, deletes := s.merge(source, change)
 | 
			
		||||
	adds, updates, deletes, reconciles := s.merge(source, change)
 | 
			
		||||
	firstSet := !seenBefore && s.sourcesSeen.Has(source)
 | 
			
		||||
 | 
			
		||||
	// deliver update notifications
 | 
			
		||||
@@ -167,6 +167,10 @@ func (s *podStorage) Merge(source string, change interface{}) error {
 | 
			
		||||
		if len(updates.Pods) > 0 {
 | 
			
		||||
			s.updates <- *updates
 | 
			
		||||
		}
 | 
			
		||||
		// Only add reconcile support here, because kubelet doesn't support Snapshot update now.
 | 
			
		||||
		if len(reconciles.Pods) > 0 {
 | 
			
		||||
			s.updates <- *reconciles
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
	case PodConfigNotificationSnapshotAndUpdates:
 | 
			
		||||
		if len(deletes.Pods) > 0 || len(adds.Pods) > 0 || firstSet {
 | 
			
		||||
@@ -190,13 +194,14 @@ func (s *podStorage) Merge(source string, change interface{}) error {
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes *kubetypes.PodUpdate) {
 | 
			
		||||
func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, reconciles *kubetypes.PodUpdate) {
 | 
			
		||||
	s.podLock.Lock()
 | 
			
		||||
	defer s.podLock.Unlock()
 | 
			
		||||
 | 
			
		||||
	addPods := []*api.Pod{}
 | 
			
		||||
	updatePods := []*api.Pod{}
 | 
			
		||||
	deletePods := []*api.Pod{}
 | 
			
		||||
	reconcilePods := []*api.Pod{}
 | 
			
		||||
 | 
			
		||||
	pods := s.pods[source]
 | 
			
		||||
	if pods == nil {
 | 
			
		||||
@@ -221,12 +226,12 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
 | 
			
		||||
			}
 | 
			
		||||
			ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source
 | 
			
		||||
			if existing, found := pods[name]; found {
 | 
			
		||||
				if checkAndUpdatePod(existing, ref) {
 | 
			
		||||
					// this is an update
 | 
			
		||||
				needUpdate, needReconcile := checkAndUpdatePod(existing, ref)
 | 
			
		||||
				if needUpdate {
 | 
			
		||||
					updatePods = append(updatePods, existing)
 | 
			
		||||
					continue
 | 
			
		||||
				} else if needReconcile {
 | 
			
		||||
					reconcilePods = append(reconcilePods, existing)
 | 
			
		||||
				}
 | 
			
		||||
				// this is a no-op
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			// this is an add
 | 
			
		||||
@@ -265,12 +270,12 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
 | 
			
		||||
			ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source
 | 
			
		||||
			if existing, found := oldPods[name]; found {
 | 
			
		||||
				pods[name] = existing
 | 
			
		||||
				if checkAndUpdatePod(existing, ref) {
 | 
			
		||||
					// this is an update
 | 
			
		||||
				needUpdate, needReconcile := checkAndUpdatePod(existing, ref)
 | 
			
		||||
				if needUpdate {
 | 
			
		||||
					updatePods = append(updatePods, existing)
 | 
			
		||||
					continue
 | 
			
		||||
				} else if needReconcile {
 | 
			
		||||
					reconcilePods = append(reconcilePods, existing)
 | 
			
		||||
				}
 | 
			
		||||
				// this is a no-op
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			recordFirstSeenTime(ref)
 | 
			
		||||
@@ -295,8 +300,9 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
 | 
			
		||||
	adds = &kubetypes.PodUpdate{Op: kubetypes.ADD, Pods: copyPods(addPods), Source: source}
 | 
			
		||||
	updates = &kubetypes.PodUpdate{Op: kubetypes.UPDATE, Pods: copyPods(updatePods), Source: source}
 | 
			
		||||
	deletes = &kubetypes.PodUpdate{Op: kubetypes.REMOVE, Pods: copyPods(deletePods), Source: source}
 | 
			
		||||
	reconciles = &kubetypes.PodUpdate{Op: kubetypes.RECONCILE, Pods: copyPods(reconcilePods), Source: source}
 | 
			
		||||
 | 
			
		||||
	return adds, updates, deletes
 | 
			
		||||
	return adds, updates, deletes, reconciles
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *podStorage) markSourceSet(source string) {
 | 
			
		||||
@@ -416,13 +422,25 @@ func podsDifferSemantically(existing, ref *api.Pod) bool {
 | 
			
		||||
	return true
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// checkAndUpdatePod updates existing if ref makes a meaningful change and returns true, or
 | 
			
		||||
// returns false if there was no update.
 | 
			
		||||
func checkAndUpdatePod(existing, ref *api.Pod) bool {
 | 
			
		||||
// checkAndUpdatePod updates existing, and:
 | 
			
		||||
//   * if ref makes a meaningful change, returns needUpdate=true
 | 
			
		||||
//   * if ref makes no meaningful change, but changes the pod status, returns needReconcile=true
 | 
			
		||||
//   * else return both false
 | 
			
		||||
//   Now, needUpdate and needReconcile should never be both true
 | 
			
		||||
func checkAndUpdatePod(existing, ref *api.Pod) (needUpdate, needReconcile bool) {
 | 
			
		||||
	// TODO: it would be better to update the whole object and only preserve certain things
 | 
			
		||||
	//       like the source annotation or the UID (to ensure safety)
 | 
			
		||||
	if !podsDifferSemantically(existing, ref) {
 | 
			
		||||
		return false
 | 
			
		||||
		// this is not an update
 | 
			
		||||
		// Only check reconcile when it is not an update, because if the pod is going to
 | 
			
		||||
		// be updated, an extra reconcile is unnecessary
 | 
			
		||||
		if !reflect.DeepEqual(existing.Status, ref.Status) {
 | 
			
		||||
			// Pod with changed pod status needs reconcile, because kubelet should
 | 
			
		||||
			// be the source of truth of pod status.
 | 
			
		||||
			existing.Status = ref.Status
 | 
			
		||||
			needReconcile = true
 | 
			
		||||
		}
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	// this is an update
 | 
			
		||||
 | 
			
		||||
@@ -434,8 +452,10 @@ func checkAndUpdatePod(existing, ref *api.Pod) bool {
 | 
			
		||||
	existing.Labels = ref.Labels
 | 
			
		||||
	existing.DeletionTimestamp = ref.DeletionTimestamp
 | 
			
		||||
	existing.DeletionGracePeriodSeconds = ref.DeletionGracePeriodSeconds
 | 
			
		||||
	existing.Status = ref.Status
 | 
			
		||||
	updateAnnotations(existing, ref)
 | 
			
		||||
	return true
 | 
			
		||||
	needUpdate = true
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Sync sends a copy of the current state through the update channel.
 | 
			
		||||
 
 | 
			
		||||
@@ -17,7 +17,10 @@ limitations under the License.
 | 
			
		||||
package config
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"math/rand"
 | 
			
		||||
	"reflect"
 | 
			
		||||
	"sort"
 | 
			
		||||
	"strconv"
 | 
			
		||||
	"testing"
 | 
			
		||||
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api"
 | 
			
		||||
@@ -105,7 +108,7 @@ func expectPodUpdate(t *testing.T, ch <-chan kubetypes.PodUpdate, expected ...ku
 | 
			
		||||
		// Compare pods one by one. This is necessary beacuse we don't want to
 | 
			
		||||
		// compare local annotations.
 | 
			
		||||
		for j := range expected[i].Pods {
 | 
			
		||||
			if podsDifferSemantically(expected[i].Pods[j], update.Pods[j]) {
 | 
			
		||||
			if podsDifferSemantically(expected[i].Pods[j], update.Pods[j]) || !reflect.DeepEqual(expected[i].Pods[j].Status, update.Pods[j].Status) {
 | 
			
		||||
				t.Fatalf("Expected %#v, Got %#v", expected[i].Pods[j], update.Pods[j])
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
@@ -267,6 +270,51 @@ func TestNewPodAddedUpdatedSet(t *testing.T) {
 | 
			
		||||
		CreatePodUpdate(kubetypes.UPDATE, TestSource, pod))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestNewPodAddedSetReconciled(t *testing.T) {
 | 
			
		||||
	// Create and touch new test pods, return the new pods and touched pod. We should create new pod list
 | 
			
		||||
	// before touching to avoid data race.
 | 
			
		||||
	newTestPods := func(touchStatus, touchSpec bool) ([]*api.Pod, *api.Pod) {
 | 
			
		||||
		pods := []*api.Pod{
 | 
			
		||||
			CreateValidPod("changable-pod-0", "new"),
 | 
			
		||||
			CreateValidPod("constant-pod-1", "new"),
 | 
			
		||||
			CreateValidPod("constant-pod-2", "new"),
 | 
			
		||||
		}
 | 
			
		||||
		if touchStatus {
 | 
			
		||||
			pods[0].Status = api.PodStatus{Message: strconv.Itoa(rand.Int())}
 | 
			
		||||
		}
 | 
			
		||||
		if touchSpec {
 | 
			
		||||
			pods[0].Spec.Containers[0].Name = strconv.Itoa(rand.Int())
 | 
			
		||||
		}
 | 
			
		||||
		return pods, pods[0]
 | 
			
		||||
	}
 | 
			
		||||
	for _, op := range []kubetypes.PodOperation{
 | 
			
		||||
		kubetypes.ADD,
 | 
			
		||||
		kubetypes.SET,
 | 
			
		||||
	} {
 | 
			
		||||
		var podWithStatusChange *api.Pod
 | 
			
		||||
		pods, _ := newTestPods(false, false)
 | 
			
		||||
		channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
 | 
			
		||||
 | 
			
		||||
		// Use SET to initialize the config, especially initialize the source set
 | 
			
		||||
		channel <- CreatePodUpdate(kubetypes.SET, TestSource, pods...)
 | 
			
		||||
		expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, pods...))
 | 
			
		||||
 | 
			
		||||
		// If status is not changed, no reconcile should be triggered
 | 
			
		||||
		channel <- CreatePodUpdate(op, TestSource, pods...)
 | 
			
		||||
		expectNoPodUpdate(t, ch)
 | 
			
		||||
 | 
			
		||||
		// If the pod status is changed and not updated, a reconcile should be triggered
 | 
			
		||||
		pods, podWithStatusChange = newTestPods(true, false)
 | 
			
		||||
		channel <- CreatePodUpdate(op, TestSource, pods...)
 | 
			
		||||
		expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.RECONCILE, TestSource, podWithStatusChange))
 | 
			
		||||
 | 
			
		||||
		// If the pod status is changed, but the pod is also updated, no reconcile should be triggered
 | 
			
		||||
		pods, podWithStatusChange = newTestPods(true, true)
 | 
			
		||||
		channel <- CreatePodUpdate(op, TestSource, pods...)
 | 
			
		||||
		expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, podWithStatusChange))
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestInitialEmptySet(t *testing.T) {
 | 
			
		||||
	for _, test := range []struct {
 | 
			
		||||
		mode PodConfigNotificationMode
 | 
			
		||||
@@ -324,7 +372,7 @@ func TestPodUpdateAnnotations(t *testing.T) {
 | 
			
		||||
	expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, pod))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestPodUpdateLables(t *testing.T) {
 | 
			
		||||
func TestPodUpdateLabels(t *testing.T) {
 | 
			
		||||
	channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
 | 
			
		||||
 | 
			
		||||
	pod := CreateValidPod("foo2", "new")
 | 
			
		||||
 
 | 
			
		||||
@@ -133,6 +133,7 @@ type SyncHandler interface {
 | 
			
		||||
	HandlePodAdditions(pods []*api.Pod)
 | 
			
		||||
	HandlePodUpdates(pods []*api.Pod)
 | 
			
		||||
	HandlePodDeletions(pods []*api.Pod)
 | 
			
		||||
	HandlePodReconcile(pods []*api.Pod)
 | 
			
		||||
	HandlePodSyncs(pods []*api.Pod)
 | 
			
		||||
	HandlePodCleanups() error
 | 
			
		||||
}
 | 
			
		||||
@@ -2334,6 +2335,9 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler
 | 
			
		||||
		case kubetypes.REMOVE:
 | 
			
		||||
			glog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
 | 
			
		||||
			handler.HandlePodDeletions(u.Pods)
 | 
			
		||||
		case kubetypes.RECONCILE:
 | 
			
		||||
			glog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
 | 
			
		||||
			handler.HandlePodReconcile(u.Pods)
 | 
			
		||||
		case kubetypes.SET:
 | 
			
		||||
			// TODO: Do we want to support this?
 | 
			
		||||
			glog.Errorf("Kubelet does not support snapshot update")
 | 
			
		||||
@@ -2468,6 +2472,14 @@ func (kl *Kubelet) HandlePodDeletions(pods []*api.Pod) {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (kl *Kubelet) HandlePodReconcile(pods []*api.Pod) {
 | 
			
		||||
	for _, pod := range pods {
 | 
			
		||||
		// Update the pod in pod manager, status manager will do periodically reconcile according
 | 
			
		||||
		// to the pod manager.
 | 
			
		||||
		kl.podManager.UpdatePod(pod)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (kl *Kubelet) HandlePodSyncs(pods []*api.Pod) {
 | 
			
		||||
	start := time.Now()
 | 
			
		||||
	for _, pod := range pods {
 | 
			
		||||
 
 | 
			
		||||
@@ -17,7 +17,6 @@ limitations under the License.
 | 
			
		||||
package status
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"reflect"
 | 
			
		||||
	"sort"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
@@ -105,13 +104,12 @@ func NewManager(kubeClient client.Interface, podManager kubepod.Manager) Manager
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// isStatusEqual returns true if the given pod statuses are equal, false otherwise.
 | 
			
		||||
// This method sorts container statuses so order does not affect equality.
 | 
			
		||||
// This method normalizes the status before comparing so as to make sure that meaningless
 | 
			
		||||
// changes will be ignored.
 | 
			
		||||
func isStatusEqual(oldStatus, status *api.PodStatus) bool {
 | 
			
		||||
	sort.Sort(kubetypes.SortedContainerStatuses(status.ContainerStatuses))
 | 
			
		||||
	sort.Sort(kubetypes.SortedContainerStatuses(oldStatus.ContainerStatuses))
 | 
			
		||||
 | 
			
		||||
	// TODO: More sophisticated equality checking.
 | 
			
		||||
	return reflect.DeepEqual(status, oldStatus)
 | 
			
		||||
	normalizeStatus(oldStatus)
 | 
			
		||||
	normalizeStatus(status)
 | 
			
		||||
	return api.Semantic.DeepEqual(status, oldStatus)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m *manager) Start() {
 | 
			
		||||
@@ -329,6 +327,13 @@ func (m *manager) syncBatch() {
 | 
			
		||||
			}
 | 
			
		||||
			if m.needsUpdate(syncedUID, status) {
 | 
			
		||||
				updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status})
 | 
			
		||||
			} else if m.needsReconcile(uid, status.status) {
 | 
			
		||||
				// Delete the apiStatusVersions here to force an update on the pod status
 | 
			
		||||
				// In most cases the deleted apiStatusVersions here should be filled
 | 
			
		||||
				// soon after the following syncPod() [If the syncPod() sync an update
 | 
			
		||||
				// successfully].
 | 
			
		||||
				delete(m.apiStatusVersions, syncedUID)
 | 
			
		||||
				updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status})
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
@@ -392,6 +397,82 @@ func (m *manager) needsUpdate(uid types.UID, status versionedPodStatus) bool {
 | 
			
		||||
	return !ok || latest < status.version
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// needsReconcile compares the given status with the status in the pod manager (which
 | 
			
		||||
// in fact comes from apiserver), returns whether the status needs to be reconciled with
 | 
			
		||||
// the apiserver. Now when pod status is inconsistent between apiserver and kubelet,
 | 
			
		||||
// kubelet should forcibly send an update to reconclie the inconsistence, because kubelet
 | 
			
		||||
// should be the source of truth of pod status.
 | 
			
		||||
// NOTE(random-liu): It's simpler to pass in mirror pod uid and get mirror pod by uid, but
 | 
			
		||||
// now the pod manager only supports getting mirror pod by static pod, so we have to pass
 | 
			
		||||
// static pod uid here.
 | 
			
		||||
// TODO(random-liu): Simplify the logic when mirror pod manager is added.
 | 
			
		||||
func (m *manager) needsReconcile(uid types.UID, status api.PodStatus) bool {
 | 
			
		||||
	// The pod could be a static pod, so we should translate first.
 | 
			
		||||
	pod, ok := m.podManager.GetPodByUID(uid)
 | 
			
		||||
	if !ok {
 | 
			
		||||
		// Although we get uid from pod manager in syncBatch, it still could be deleted before here.
 | 
			
		||||
		glog.V(4).Infof("Pod %q has been deleted, no need to reconcile", format.Pod(pod))
 | 
			
		||||
		return false
 | 
			
		||||
	}
 | 
			
		||||
	// If the pod is a static pod, we should check its mirror pod, because only status in mirror pod is meaningful to us.
 | 
			
		||||
	if kubepod.IsStaticPod(pod) {
 | 
			
		||||
		mirrorPod, ok := m.podManager.GetMirrorPodByPod(pod)
 | 
			
		||||
		if !ok {
 | 
			
		||||
			glog.V(4).Infof("Static pod %q has no corresponding mirror pod, no need to reconcile", format.Pod(pod))
 | 
			
		||||
			return false
 | 
			
		||||
		}
 | 
			
		||||
		pod = mirrorPod
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if isStatusEqual(&pod.Status, &status) {
 | 
			
		||||
		// If the status from the source is the same with the cached status,
 | 
			
		||||
		// reconcile is not needed. Just return.
 | 
			
		||||
		return false
 | 
			
		||||
	}
 | 
			
		||||
	glog.V(3).Infof("Pod status is inconsistent with cached status, a reconciliation should be triggered:\n %+v", util.ObjectDiff(pod.Status, status))
 | 
			
		||||
 | 
			
		||||
	return true
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// We add this function, because apiserver only supports *RFC3339* now, which means that the timestamp returned by
 | 
			
		||||
// apiserver has no nanosecond infromation. However, the timestamp returned by unversioned.Now() contains nanosecond,
 | 
			
		||||
// so when we do comparison between status from apiserver and cached status, isStatusEqual() will always return false.
 | 
			
		||||
// There is related issue #15262 and PR #15263 about this.
 | 
			
		||||
// In fact, the best way to solve this is to do it on api side. However for now, we normalize the status locally in
 | 
			
		||||
// kubelet temporarily.
 | 
			
		||||
// TODO(random-liu): Remove timestamp related logic after apiserver supports nanosecond or makes it consistent.
 | 
			
		||||
func normalizeStatus(status *api.PodStatus) *api.PodStatus {
 | 
			
		||||
	normalizeTimeStamp := func(t *unversioned.Time) {
 | 
			
		||||
		*t = t.Rfc3339Copy()
 | 
			
		||||
	}
 | 
			
		||||
	normalizeContainerState := func(c *api.ContainerState) {
 | 
			
		||||
		if c.Running != nil {
 | 
			
		||||
			normalizeTimeStamp(&c.Running.StartedAt)
 | 
			
		||||
		}
 | 
			
		||||
		if c.Terminated != nil {
 | 
			
		||||
			normalizeTimeStamp(&c.Terminated.StartedAt)
 | 
			
		||||
			normalizeTimeStamp(&c.Terminated.FinishedAt)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if status.StartTime != nil {
 | 
			
		||||
		normalizeTimeStamp(status.StartTime)
 | 
			
		||||
	}
 | 
			
		||||
	for i := range status.Conditions {
 | 
			
		||||
		condition := &status.Conditions[i]
 | 
			
		||||
		normalizeTimeStamp(&condition.LastProbeTime)
 | 
			
		||||
		normalizeTimeStamp(&condition.LastTransitionTime)
 | 
			
		||||
	}
 | 
			
		||||
	for i := range status.ContainerStatuses {
 | 
			
		||||
		cstatus := &status.ContainerStatuses[i]
 | 
			
		||||
		normalizeContainerState(&cstatus.State)
 | 
			
		||||
		normalizeContainerState(&cstatus.LastTerminationState)
 | 
			
		||||
	}
 | 
			
		||||
	// Sort the container statuses, so that the order won't affect the result of comparison
 | 
			
		||||
	sort.Sort(kubetypes.SortedContainerStatuses(status.ContainerStatuses))
 | 
			
		||||
	return status
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// notRunning returns true if every status is terminated or waiting, or the status list
 | 
			
		||||
// is empty.
 | 
			
		||||
func notRunning(statuses []api.ContainerStatus) bool {
 | 
			
		||||
 
 | 
			
		||||
@@ -44,6 +44,24 @@ var testPod *api.Pod = &api.Pod{
 | 
			
		||||
	},
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// After adding reconciliation, if status in pod manager is different from the cached status, a reconciliation
 | 
			
		||||
// will be triggered, which will mess up all the old unit test.
 | 
			
		||||
// To simplify the implementation of unit test, we add testSyncBatch() here, it will make sure the statuses in
 | 
			
		||||
// pod manager the same with cached ones before syncBatch() so as to avoid reconciling.
 | 
			
		||||
func (m *manager) testSyncBatch() {
 | 
			
		||||
	for uid, status := range m.podStatuses {
 | 
			
		||||
		pod, ok := m.podManager.GetPodByUID(uid)
 | 
			
		||||
		if ok {
 | 
			
		||||
			pod.Status = status.status
 | 
			
		||||
		}
 | 
			
		||||
		pod, ok = m.podManager.GetMirrorPodByPod(pod)
 | 
			
		||||
		if ok {
 | 
			
		||||
			pod.Status = status.status
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	m.syncBatch()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func newTestManager(kubeClient client.Interface) *manager {
 | 
			
		||||
	podManager := kubepod.NewBasicPodManager(kubepod.NewFakeMirrorClient())
 | 
			
		||||
	podManager.AddPod(testPod)
 | 
			
		||||
@@ -209,8 +227,8 @@ func TestChangedStatusUpdatesLastTransitionTime(t *testing.T) {
 | 
			
		||||
	if newReadyCondition.LastTransitionTime.IsZero() {
 | 
			
		||||
		t.Errorf("Unexpected: last transition time not set")
 | 
			
		||||
	}
 | 
			
		||||
	if !oldReadyCondition.LastTransitionTime.Before(newReadyCondition.LastTransitionTime) {
 | 
			
		||||
		t.Errorf("Unexpected: new transition time %s, is not after old transition time %s", newReadyCondition.LastTransitionTime, oldReadyCondition.LastTransitionTime)
 | 
			
		||||
	if newReadyCondition.LastTransitionTime.Before(oldReadyCondition.LastTransitionTime) {
 | 
			
		||||
		t.Errorf("Unexpected: new transition time %s, is before old transition time %s", newReadyCondition.LastTransitionTime, oldReadyCondition.LastTransitionTime)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -259,7 +277,7 @@ func TestSyncBatchIgnoresNotFound(t *testing.T) {
 | 
			
		||||
		return true, nil, errors.NewNotFound(api.Resource("pods"), "test-pod")
 | 
			
		||||
	})
 | 
			
		||||
	syncer.SetPodStatus(testPod, getRandomPodStatus())
 | 
			
		||||
	syncer.syncBatch()
 | 
			
		||||
	syncer.testSyncBatch()
 | 
			
		||||
 | 
			
		||||
	verifyActions(t, syncer.kubeClient, []testclient.Action{
 | 
			
		||||
		testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
 | 
			
		||||
@@ -270,7 +288,7 @@ func TestSyncBatch(t *testing.T) {
 | 
			
		||||
	syncer := newTestManager(&testclient.Fake{})
 | 
			
		||||
	syncer.kubeClient = testclient.NewSimpleFake(testPod)
 | 
			
		||||
	syncer.SetPodStatus(testPod, getRandomPodStatus())
 | 
			
		||||
	syncer.syncBatch()
 | 
			
		||||
	syncer.testSyncBatch()
 | 
			
		||||
	verifyActions(t, syncer.kubeClient, []testclient.Action{
 | 
			
		||||
		testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
 | 
			
		||||
		testclient.UpdateActionImpl{ActionImpl: testclient.ActionImpl{Verb: "update", Resource: "pods", Subresource: "status"}},
 | 
			
		||||
@@ -288,7 +306,7 @@ func TestSyncBatchChecksMismatchedUID(t *testing.T) {
 | 
			
		||||
	syncer.podManager.AddPod(&differentPod)
 | 
			
		||||
	syncer.kubeClient = testclient.NewSimpleFake(&pod)
 | 
			
		||||
	syncer.SetPodStatus(&differentPod, getRandomPodStatus())
 | 
			
		||||
	syncer.syncBatch()
 | 
			
		||||
	syncer.testSyncBatch()
 | 
			
		||||
	verifyActions(t, syncer.kubeClient, []testclient.Action{
 | 
			
		||||
		testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
 | 
			
		||||
	})
 | 
			
		||||
@@ -324,7 +342,7 @@ func TestSyncBatchNoDeadlock(t *testing.T) {
 | 
			
		||||
	ret = *pod
 | 
			
		||||
	err = errors.NewNotFound(api.Resource("pods"), pod.Name)
 | 
			
		||||
	m.SetPodStatus(pod, getRandomPodStatus())
 | 
			
		||||
	m.syncBatch()
 | 
			
		||||
	m.testSyncBatch()
 | 
			
		||||
	verifyActions(t, client, []testclient.Action{getAction})
 | 
			
		||||
	client.ClearActions()
 | 
			
		||||
 | 
			
		||||
@@ -332,21 +350,21 @@ func TestSyncBatchNoDeadlock(t *testing.T) {
 | 
			
		||||
	ret.UID = "other_pod"
 | 
			
		||||
	err = nil
 | 
			
		||||
	m.SetPodStatus(pod, getRandomPodStatus())
 | 
			
		||||
	m.syncBatch()
 | 
			
		||||
	m.testSyncBatch()
 | 
			
		||||
	verifyActions(t, client, []testclient.Action{getAction})
 | 
			
		||||
	client.ClearActions()
 | 
			
		||||
 | 
			
		||||
	// Pod not deleted (success case).
 | 
			
		||||
	ret = *pod
 | 
			
		||||
	m.SetPodStatus(pod, getRandomPodStatus())
 | 
			
		||||
	m.syncBatch()
 | 
			
		||||
	m.testSyncBatch()
 | 
			
		||||
	verifyActions(t, client, []testclient.Action{getAction, updateAction})
 | 
			
		||||
	client.ClearActions()
 | 
			
		||||
 | 
			
		||||
	// Pod is terminated, but still running.
 | 
			
		||||
	pod.DeletionTimestamp = new(unversioned.Time)
 | 
			
		||||
	m.SetPodStatus(pod, getRandomPodStatus())
 | 
			
		||||
	m.syncBatch()
 | 
			
		||||
	m.testSyncBatch()
 | 
			
		||||
	verifyActions(t, client, []testclient.Action{getAction, updateAction})
 | 
			
		||||
	client.ClearActions()
 | 
			
		||||
 | 
			
		||||
@@ -354,14 +372,14 @@ func TestSyncBatchNoDeadlock(t *testing.T) {
 | 
			
		||||
	pod.Status.ContainerStatuses[0].State.Running = nil
 | 
			
		||||
	pod.Status.ContainerStatuses[0].State.Terminated = &api.ContainerStateTerminated{}
 | 
			
		||||
	m.SetPodStatus(pod, getRandomPodStatus())
 | 
			
		||||
	m.syncBatch()
 | 
			
		||||
	m.testSyncBatch()
 | 
			
		||||
	verifyActions(t, client, []testclient.Action{getAction, updateAction})
 | 
			
		||||
	client.ClearActions()
 | 
			
		||||
 | 
			
		||||
	// Error case.
 | 
			
		||||
	err = fmt.Errorf("intentional test error")
 | 
			
		||||
	m.SetPodStatus(pod, getRandomPodStatus())
 | 
			
		||||
	m.syncBatch()
 | 
			
		||||
	m.testSyncBatch()
 | 
			
		||||
	verifyActions(t, client, []testclient.Action{getAction})
 | 
			
		||||
	client.ClearActions()
 | 
			
		||||
}
 | 
			
		||||
@@ -380,7 +398,7 @@ func TestStaleUpdates(t *testing.T) {
 | 
			
		||||
	verifyUpdates(t, m, 3)
 | 
			
		||||
 | 
			
		||||
	t.Logf("First sync pushes latest status.")
 | 
			
		||||
	m.syncBatch()
 | 
			
		||||
	m.testSyncBatch()
 | 
			
		||||
	verifyActions(t, m.kubeClient, []testclient.Action{
 | 
			
		||||
		testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
 | 
			
		||||
		testclient.UpdateActionImpl{ActionImpl: testclient.ActionImpl{Verb: "update", Resource: "pods", Subresource: "status"}},
 | 
			
		||||
@@ -389,7 +407,7 @@ func TestStaleUpdates(t *testing.T) {
 | 
			
		||||
 | 
			
		||||
	for i := 0; i < 2; i++ {
 | 
			
		||||
		t.Logf("Next 2 syncs should be ignored (%d).", i)
 | 
			
		||||
		m.syncBatch()
 | 
			
		||||
		m.testSyncBatch()
 | 
			
		||||
		verifyActions(t, m.kubeClient, []testclient.Action{})
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@@ -401,7 +419,7 @@ func TestStaleUpdates(t *testing.T) {
 | 
			
		||||
	m.apiStatusVersions[pod.UID] = m.apiStatusVersions[pod.UID] - 1
 | 
			
		||||
 | 
			
		||||
	m.SetPodStatus(&pod, status)
 | 
			
		||||
	m.syncBatch()
 | 
			
		||||
	m.testSyncBatch()
 | 
			
		||||
	verifyActions(t, m.kubeClient, []testclient.Action{
 | 
			
		||||
		testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
 | 
			
		||||
		testclient.UpdateActionImpl{ActionImpl: testclient.ActionImpl{Verb: "update", Resource: "pods", Subresource: "status"}},
 | 
			
		||||
@@ -471,7 +489,7 @@ func TestStaticPodStatus(t *testing.T) {
 | 
			
		||||
	retrievedStatus, _ = m.GetPodStatus(mirrorPod.UID)
 | 
			
		||||
	assert.True(t, isStatusEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus)
 | 
			
		||||
	// Should translate mirrorPod / staticPod UID.
 | 
			
		||||
	m.syncBatch()
 | 
			
		||||
	m.testSyncBatch()
 | 
			
		||||
	verifyActions(t, m.kubeClient, []testclient.Action{
 | 
			
		||||
		testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
 | 
			
		||||
		testclient.UpdateActionImpl{ActionImpl: testclient.ActionImpl{Verb: "update", Resource: "pods", Subresource: "status"}},
 | 
			
		||||
@@ -483,7 +501,7 @@ func TestStaticPodStatus(t *testing.T) {
 | 
			
		||||
	client.ClearActions()
 | 
			
		||||
 | 
			
		||||
	// No changes.
 | 
			
		||||
	m.syncBatch()
 | 
			
		||||
	m.testSyncBatch()
 | 
			
		||||
	verifyActions(t, m.kubeClient, []testclient.Action{})
 | 
			
		||||
 | 
			
		||||
	// Mirror pod identity changes.
 | 
			
		||||
@@ -492,7 +510,7 @@ func TestStaticPodStatus(t *testing.T) {
 | 
			
		||||
	mirrorPod.Status = api.PodStatus{}
 | 
			
		||||
	m.podManager.AddPod(&mirrorPod)
 | 
			
		||||
	// Expect update to new mirrorPod.
 | 
			
		||||
	m.syncBatch()
 | 
			
		||||
	m.testSyncBatch()
 | 
			
		||||
	verifyActions(t, m.kubeClient, []testclient.Action{
 | 
			
		||||
		testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
 | 
			
		||||
		testclient.UpdateActionImpl{ActionImpl: testclient.ActionImpl{Verb: "update", Resource: "pods", Subresource: "status"}},
 | 
			
		||||
@@ -604,7 +622,7 @@ func TestSyncBatchCleanupVersions(t *testing.T) {
 | 
			
		||||
	// Orphaned pods should be removed.
 | 
			
		||||
	m.apiStatusVersions[testPod.UID] = 100
 | 
			
		||||
	m.apiStatusVersions[mirrorPod.UID] = 200
 | 
			
		||||
	m.syncBatch()
 | 
			
		||||
	m.testSyncBatch()
 | 
			
		||||
	if _, ok := m.apiStatusVersions[testPod.UID]; ok {
 | 
			
		||||
		t.Errorf("Should have cleared status for testPod")
 | 
			
		||||
	}
 | 
			
		||||
@@ -621,7 +639,7 @@ func TestSyncBatchCleanupVersions(t *testing.T) {
 | 
			
		||||
	m.podManager.AddPod(&staticPod)
 | 
			
		||||
	m.apiStatusVersions[testPod.UID] = 100
 | 
			
		||||
	m.apiStatusVersions[mirrorPod.UID] = 200
 | 
			
		||||
	m.syncBatch()
 | 
			
		||||
	m.testSyncBatch()
 | 
			
		||||
	if _, ok := m.apiStatusVersions[testPod.UID]; !ok {
 | 
			
		||||
		t.Errorf("Should not have cleared status for testPod")
 | 
			
		||||
	}
 | 
			
		||||
@@ -630,6 +648,61 @@ func TestSyncBatchCleanupVersions(t *testing.T) {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestReconcilePodStatus(t *testing.T) {
 | 
			
		||||
	client := testclient.NewSimpleFake(testPod)
 | 
			
		||||
	syncer := newTestManager(client)
 | 
			
		||||
	syncer.SetPodStatus(testPod, getRandomPodStatus())
 | 
			
		||||
	// Call syncBatch directly to test reconcile
 | 
			
		||||
	syncer.syncBatch() // The apiStatusVersions should be set now
 | 
			
		||||
 | 
			
		||||
	originalStatus := testPod.Status
 | 
			
		||||
	podStatus, ok := syncer.GetPodStatus(testPod.UID)
 | 
			
		||||
	if !ok {
 | 
			
		||||
		t.Fatal("Should find pod status for pod: %+v", testPod)
 | 
			
		||||
	}
 | 
			
		||||
	testPod.Status = podStatus
 | 
			
		||||
 | 
			
		||||
	// If the pod status is the same, a reconciliation is not needed,
 | 
			
		||||
	// syncBatch should do nothing
 | 
			
		||||
	syncer.podManager.UpdatePod(testPod)
 | 
			
		||||
	if syncer.needsReconcile(testPod.UID, podStatus) {
 | 
			
		||||
		t.Errorf("Pod status is the same, a reconciliation is not needed")
 | 
			
		||||
	}
 | 
			
		||||
	client.ClearActions()
 | 
			
		||||
	syncer.syncBatch()
 | 
			
		||||
	verifyActions(t, client, []testclient.Action{})
 | 
			
		||||
 | 
			
		||||
	// If the pod status is the same, only the timestamp is in Rfc3339 format (lower precision without nanosecond),
 | 
			
		||||
	// a reconciliation is not needed, syncBatch should do nothing.
 | 
			
		||||
	// The StartTime should have been set in SetPodStatus().
 | 
			
		||||
	// TODO(random-liu): Remove this later when api becomes consistent for timestamp.
 | 
			
		||||
	normalizedStartTime := testPod.Status.StartTime.Rfc3339Copy()
 | 
			
		||||
	testPod.Status.StartTime = &normalizedStartTime
 | 
			
		||||
	syncer.podManager.UpdatePod(testPod)
 | 
			
		||||
	if syncer.needsReconcile(testPod.UID, podStatus) {
 | 
			
		||||
		t.Errorf("Pod status only differs for timestamp format, a reconciliation is not needed")
 | 
			
		||||
	}
 | 
			
		||||
	client.ClearActions()
 | 
			
		||||
	syncer.syncBatch()
 | 
			
		||||
	verifyActions(t, client, []testclient.Action{})
 | 
			
		||||
 | 
			
		||||
	// If the pod status is different, a reconciliation is needed, syncBatch should trigger an update
 | 
			
		||||
	testPod.Status = getRandomPodStatus()
 | 
			
		||||
	syncer.podManager.UpdatePod(testPod)
 | 
			
		||||
	if !syncer.needsReconcile(testPod.UID, podStatus) {
 | 
			
		||||
		t.Errorf("Pod status is different, a reconciliation is needed")
 | 
			
		||||
	}
 | 
			
		||||
	client.ClearActions()
 | 
			
		||||
	syncer.syncBatch()
 | 
			
		||||
	verifyActions(t, client, []testclient.Action{
 | 
			
		||||
		testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
 | 
			
		||||
		testclient.UpdateActionImpl{ActionImpl: testclient.ActionImpl{Verb: "update", Resource: "pods", Subresource: "status"}},
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	// Just in case that testPod is shared among different test functions, set it back.
 | 
			
		||||
	testPod.Status = originalStatus
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func expectPodStatus(t *testing.T, m *manager, pod *api.Pod) api.PodStatus {
 | 
			
		||||
	status, ok := m.GetPodStatus(pod.UID)
 | 
			
		||||
	if !ok {
 | 
			
		||||
 
 | 
			
		||||
@@ -39,6 +39,9 @@ const (
 | 
			
		||||
	REMOVE
 | 
			
		||||
	// Pods with the given ids have been updated in this source
 | 
			
		||||
	UPDATE
 | 
			
		||||
	// Pods with the given ids have unexpected status in this source,
 | 
			
		||||
	// kubelet should reconcile status with this source
 | 
			
		||||
	RECONCILE
 | 
			
		||||
 | 
			
		||||
	// These constants identify the sources of pods
 | 
			
		||||
	// Updates from a file
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user