diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go
index 025bf9d5ace..17fcf98f9a7 100644
--- a/pkg/controller/controller_utils.go
+++ b/pkg/controller/controller_utils.go
@@ -771,6 +771,94 @@ func (s ActivePods) Less(i, j int) bool {
 	return false
 }
 
+// ActivePodsWithRanks is a sortable list of pods and a list of corresponding
+// ranks which will be considered during sorting. The two lists must have equal
+// length. After sorting, the pods will be ordered as follows, applying each
+// rule in turn until one matches:
+//
+// 1. If only one of the pods is assigned to a node, the pod that is not
+//    assigned comes before the pod that is.
+// 2. If the pods' phases differ, a pending pod comes before a pod whose phase
+//    is unknown, and a pod whose phase is unknown comes before a running pod.
+// 3. If exactly one of the pods is ready, the pod that is not ready comes
+//    before the ready pod.
+// 4. If the pods' ranks differ, the pod with greater rank comes before the pod
+//    with lower rank.
+// 5. If both pods are ready but have not been ready for the same amount of
+//    time, the pod that has been ready for a shorter amount of time comes
+//    before the pod that has been ready for longer.
+// 6. If one pod has a container that has restarted more than any container in
+//    the other pod, the pod with the container with more restarts comes
+//    before the other pod.
+// 7. If the pods' creation times differ, the pod that was created more recently
+//    comes before the older pod.
+//
+// If none of these rules matches, neither pod compares as less than the other.
+//
+// The intention of this ordering is to put pods that should be preferred for
+// deletion first in the list.
+type ActivePodsWithRanks struct {
+	// Pods is a list of pods.
+	Pods []*v1.Pod
+
+	// Rank is a ranking of pods. This ranking is used during sorting when
+	// comparing two pods that are both scheduled, in the same phase, and
+	// have the same ready status.
+	Rank []int
+}
+
+func (s ActivePodsWithRanks) Len() int {
+	return len(s.Pods)
+}
+
+func (s ActivePodsWithRanks) Swap(i, j int) {
+	s.Pods[i], s.Pods[j] = s.Pods[j], s.Pods[i]
+	s.Rank[i], s.Rank[j] = s.Rank[j], s.Rank[i]
+}
+
+// Less compares two pods with corresponding ranks and returns true if the first
+// one should be preferred for deletion.
+func (s ActivePodsWithRanks) Less(i, j int) bool {
+	// 1. Unassigned < assigned
+	// If only one of the pods is unassigned, the unassigned one is smaller
+	if s.Pods[i].Spec.NodeName != s.Pods[j].Spec.NodeName && (len(s.Pods[i].Spec.NodeName) == 0 || len(s.Pods[j].Spec.NodeName) == 0) {
+		return len(s.Pods[i].Spec.NodeName) == 0
+	}
+	// 2. PodPending < PodUnknown < PodRunning
+	m := map[v1.PodPhase]int{v1.PodPending: 0, v1.PodUnknown: 1, v1.PodRunning: 2}
+	if m[s.Pods[i].Status.Phase] != m[s.Pods[j].Status.Phase] {
+		return m[s.Pods[i].Status.Phase] < m[s.Pods[j].Status.Phase]
+	}
+	// 3. Not ready < ready
+	// If only one of the pods is not ready, the not ready one is smaller
+	if podutil.IsPodReady(s.Pods[i]) != podutil.IsPodReady(s.Pods[j]) {
+		return !podutil.IsPodReady(s.Pods[i])
+	}
+	// 4. Doubled up < not doubled up
+	// If one of the two pods is on the same node as one or more additional
+	// ready pods that belong to the same replicaset, whichever pod has more
+	// colocated ready pods is less
+	if s.Rank[i] != s.Rank[j] {
+		return s.Rank[i] > s.Rank[j]
+	}
+	// TODO: take availability into account when we push minReadySeconds information from deployment into pods,
+	//       see https://github.com/kubernetes/kubernetes/issues/22065
+	// 5. Been ready for empty time < less time < more time
+	// If both pods are ready, the latest ready one is smaller
+	if podutil.IsPodReady(s.Pods[i]) && podutil.IsPodReady(s.Pods[j]) && !podReadyTime(s.Pods[i]).Equal(podReadyTime(s.Pods[j])) {
+		return afterOrZero(podReadyTime(s.Pods[i]), podReadyTime(s.Pods[j]))
+	}
+	// 6. Pods with containers with higher restart counts < lower restart counts
+	if maxContainerRestarts(s.Pods[i]) != maxContainerRestarts(s.Pods[j]) {
+		return maxContainerRestarts(s.Pods[i]) > maxContainerRestarts(s.Pods[j])
+	}
+	// 7. Empty creation time pods < newer pods < older pods
+	if !s.Pods[i].CreationTimestamp.Equal(&s.Pods[j].CreationTimestamp) {
+		return afterOrZero(&s.Pods[i].CreationTimestamp, &s.Pods[j].CreationTimestamp)
+	}
+	return false
+}
+
 // afterOrZero checks if time t1 is after time t2; if one of them
 // is zero, the zero time is seen as after non-zero time.
 func afterOrZero(t1, t2 *metav1.Time) bool {
diff --git a/pkg/controller/controller_utils_test.go b/pkg/controller/controller_utils_test.go
index 3bdda84a662..837a10b22a8 100644
--- a/pkg/controller/controller_utils_test.go
+++ b/pkg/controller/controller_utils_test.go
@@ -429,6 +429,96 @@ func TestSortingActivePods(t *testing.T) {
 	}
 }
 
+func TestSortingActivePodsWithRanks(t *testing.T) {
+	now := metav1.Now()
+	then := metav1.Time{Time: now.AddDate(0, -1, 0)}
+	zeroTime := metav1.Time{}
+	pod := func(podName, nodeName string, phase v1.PodPhase, ready bool, restarts int32, readySince metav1.Time, created metav1.Time) *v1.Pod {
+		var conditions []v1.PodCondition
+		var containerStatuses []v1.ContainerStatus
+		if ready {
+			conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue, LastTransitionTime: readySince}}
+			containerStatuses = []v1.ContainerStatus{{RestartCount: restarts}}
+		}
+		return &v1.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				CreationTimestamp: created,
+				Name:              podName,
+			},
+			Spec: v1.PodSpec{NodeName: nodeName},
+			Status: v1.PodStatus{
+				Conditions:        conditions,
+				ContainerStatuses: containerStatuses,
+				Phase:             phase,
+			},
+		}
+	}
+	var (
+		unscheduledPod                      = pod("unscheduled", "", v1.PodPending, false, 0, zeroTime, zeroTime)
+		scheduledPendingPod                 = pod("pending", "node", v1.PodPending, false, 0, zeroTime, zeroTime)
+		unknownPhasePod                     = pod("unknown-phase", "node", v1.PodUnknown, false, 0, zeroTime, zeroTime)
+		runningNotReadyPod                  = pod("not-ready", "node", v1.PodRunning, false, 0, zeroTime, zeroTime)
+		runningReadyNoLastTransitionTimePod = pod("ready-no-last-transition-time", "node", v1.PodRunning, true, 0, zeroTime, zeroTime)
+		runningReadyNow                     = pod("ready-now", "node", v1.PodRunning, true, 0, now, now)
+		runningReadyThen                    = pod("ready-then", "node", v1.PodRunning, true, 0, then, then)
+		runningReadyNowHighRestarts         = pod("ready-high-restarts", "node", v1.PodRunning, true, 9001, now, now)
+		runningReadyNowCreatedThen          = pod("ready-now-created-then", "node", v1.PodRunning, true, 0, now, then)
+	)
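+	// Sanity check: Less must never report a pod as less than itself,
+	// otherwise sort.Sort could order equal elements nondeterministically.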
+	equalityTests := []*v1.Pod{
+		unscheduledPod,
+		scheduledPendingPod,
+		unknownPhasePod,
+		runningNotReadyPod,
+		runningReadyNoLastTransitionTimePod,
+		runningReadyNow,
+		runningReadyThen,
+		runningReadyNowHighRestarts,
+		runningReadyNowCreatedThen,
+	}
+	for _, pod := range equalityTests {
+		podsWithRanks := ActivePodsWithRanks{
+			Pods: []*v1.Pod{pod, pod},
+			Rank: []int{1, 1},
+		}
+		if podsWithRanks.Less(0, 1) || podsWithRanks.Less(1, 0) {
+			t.Errorf("expected pod %q not to be less than itself", pod.Name)
+		}
+	}
+	type podWithRank struct {
+		pod  *v1.Pod
+		rank int
+	}
+	inequalityTests := []struct {
+		lesser, greater podWithRank
+	}{
+		{podWithRank{unscheduledPod, 1}, podWithRank{scheduledPendingPod, 2}},
+		{podWithRank{unscheduledPod, 2}, podWithRank{scheduledPendingPod, 1}},
+		{podWithRank{scheduledPendingPod, 1}, podWithRank{unknownPhasePod, 2}},
+		{podWithRank{unknownPhasePod, 1}, podWithRank{runningNotReadyPod, 2}},
+		{podWithRank{runningNotReadyPod, 1}, podWithRank{runningReadyNoLastTransitionTimePod, 1}},
+		{podWithRank{runningReadyNoLastTransitionTimePod, 1}, podWithRank{runningReadyNow, 1}},
+		{podWithRank{runningReadyNow, 2}, podWithRank{runningReadyNoLastTransitionTimePod, 1}},
+		{podWithRank{runningReadyNow, 1}, podWithRank{runningReadyThen, 1}},
+		{podWithRank{runningReadyNow, 2}, podWithRank{runningReadyThen, 1}},
+		{podWithRank{runningReadyNowHighRestarts, 1}, podWithRank{runningReadyNow, 1}},
+		{podWithRank{runningReadyNow, 2}, podWithRank{runningReadyNowHighRestarts, 1}},
+		{podWithRank{runningReadyNow, 1}, podWithRank{runningReadyNowCreatedThen, 1}},
+		{podWithRank{runningReadyNowCreatedThen, 2}, podWithRank{runningReadyNow, 1}},
+	}
+	for _, test := range inequalityTests {
+		podsWithRanks := ActivePodsWithRanks{
+			Pods: []*v1.Pod{test.lesser.pod, test.greater.pod},
+			Rank: []int{test.lesser.rank, test.greater.rank},
+		}
+		if !podsWithRanks.Less(0, 1) {
+			t.Errorf("expected pod %q with rank %v to be less than %q with rank %v", podsWithRanks.Pods[0].Name, podsWithRanks.Rank[0], podsWithRanks.Pods[1].Name, podsWithRanks.Rank[1])
+		}
+		if podsWithRanks.Less(1, 0) {
+			t.Errorf("expected pod %q with rank %v not to be less than %q with rank %v", podsWithRanks.Pods[1].Name, podsWithRanks.Rank[1], podsWithRanks.Pods[0].Name, podsWithRanks.Rank[0])
+		}
+	}
+}
+
 func TestActiveReplicaSetsFiltering(t *testing.T) {
 	var replicaSets []*apps.ReplicaSet
 	replicaSets = append(replicaSets, newReplicaSet("zero", 0))
diff --git a/pkg/controller/replicaset/BUILD b/pkg/controller/replicaset/BUILD
index abdef8aeb93..f2be3ee7f53 100644
--- a/pkg/controller/replicaset/BUILD
+++ b/pkg/controller/replicaset/BUILD
@@ -23,6 +23,7 @@ go_library(
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//staging/src/k8s.io/client-go/informers/apps/v1:go_default_library",
diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go
index 4e3773aff72..6727df358bb 100644
--- a/pkg/controller/replicaset/replica_set.go
+++ b/pkg/controller/replicaset/replica_set.go
@@ -41,6 +41,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/types"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait" appsinformers "k8s.io/client-go/informers/apps/v1" @@ -193,6 +194,43 @@ func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) { <-stopCh } +// getReplicaSetsWithSameController returns a list of ReplicaSets with the same +// owner as the given ReplicaSet. +func (rsc *ReplicaSetController) getReplicaSetsWithSameController(rs *apps.ReplicaSet) []*apps.ReplicaSet { + controllerRef := metav1.GetControllerOf(rs) + if controllerRef == nil { + utilruntime.HandleError(fmt.Errorf("ReplicaSet has no controller: %v", rs)) + return nil + } + + allRSs, err := rsc.rsLister.ReplicaSets(rs.Namespace).List(labels.Everything()) + if err != nil { + utilruntime.HandleError(err) + return nil + } + + var relatedRSs []*apps.ReplicaSet + for _, r := range allRSs { + if ref := metav1.GetControllerOf(r); ref != nil && ref.UID == controllerRef.UID { + relatedRSs = append(relatedRSs, r) + } + } + + if klog.V(2) { + var related string + if len(relatedRSs) > 0 { + var relatedNames []string + for _, r := range relatedRSs { + relatedNames = append(relatedNames, r.Name) + } + related = ": " + strings.Join(relatedNames, ", ") + } + klog.Infof("Found %d related %vs for %v %s/%s%s", len(relatedRSs), rsc.Kind, rsc.Kind, rs.Namespace, rs.Name, related) + } + + return relatedRSs +} + // getPodReplicaSets returns a list of ReplicaSets matching the given pod. func (rsc *ReplicaSetController) getPodReplicaSets(pod *v1.Pod) []*apps.ReplicaSet { rss, err := rsc.rsLister.GetPodReplicaSets(pod) @@ -515,8 +553,11 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps } klog.V(2).Infof("Too many replicas for %v %s/%s, need %d, deleting %d", rsc.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff) + relatedPods, err := rsc.getIndirectlyRelatedPods(rs) + utilruntime.HandleError(err) + // Choose which Pods to delete, preferring those in earlier phases of startup. - podsToDelete := getPodsToDelete(filteredPods, diff) + podsToDelete := getPodsToDelete(filteredPods, relatedPods, diff) // Snapshot the UIDs (ns/name) of the pods we're expecting to see // deleted, so we know to record their expectations exactly once either @@ -681,18 +722,67 @@ func slowStartBatch(count int, initialBatchSize int, fn func() error) (int, erro return successes, nil } -func getPodsToDelete(filteredPods []*v1.Pod, diff int) []*v1.Pod { +// getIndirectlyRelatedPods returns all pods that are owned by any ReplicaSet +// that is owned by the given ReplicaSet's owner. 
+func (rsc *ReplicaSetController) getIndirectlyRelatedPods(rs *apps.ReplicaSet) ([]*v1.Pod, error) {
+	var relatedPods []*v1.Pod
+	seen := make(map[types.UID]*apps.ReplicaSet)
+	for _, relatedRS := range rsc.getReplicaSetsWithSameController(rs) {
+		selector, err := metav1.LabelSelectorAsSelector(relatedRS.Spec.Selector)
+		if err != nil {
+			return nil, err
+		}
+		pods, err := rsc.podLister.Pods(relatedRS.Namespace).List(selector)
+		if err != nil {
+			return nil, err
+		}
+		for _, pod := range pods {
+			if otherRS, found := seen[pod.UID]; found {
+				klog.V(5).Infof("Pod %s/%s is owned by both %v %s/%s and %v %s/%s", pod.Namespace, pod.Name, rsc.Kind, otherRS.Namespace, otherRS.Name, rsc.Kind, relatedRS.Namespace, relatedRS.Name)
+				continue
+			}
+			seen[pod.UID] = relatedRS
+			relatedPods = append(relatedPods, pod)
+		}
+	}
+	if klog.V(4) {
+		var relatedNames []string
+		for _, related := range relatedPods {
+			relatedNames = append(relatedNames, related.Name)
+		}
+		klog.Infof("Found %d related pods for %v %s/%s: %v", len(relatedPods), rsc.Kind, rs.Namespace, rs.Name, strings.Join(relatedNames, ", "))
+	}
+	return relatedPods, nil
+}
+
+func getPodsToDelete(filteredPods, relatedPods []*v1.Pod, diff int) []*v1.Pod {
 	// No need to sort pods if we are about to delete all of them.
 	// diff will always be <= len(filteredPods), so not need to handle > case.
 	if diff < len(filteredPods) {
-		// Sort the pods in the order such that not-ready < ready, unscheduled
-		// < scheduled, and pending < running. This ensures that we delete pods
-		// in the earlier stages whenever possible.
-		sort.Sort(controller.ActivePods(filteredPods))
+		podsWithRanks := getPodsRankedByRelatedPodsOnSameNode(filteredPods, relatedPods)
+		sort.Sort(podsWithRanks)
 	}
 	return filteredPods[:diff]
 }
 
+// getPodsRankedByRelatedPodsOnSameNode returns an ActivePodsWithRanks value
+// that wraps podsToRank and assigns each pod a rank equal to the number of
+// active pods in relatedPods that are colocated on the same node with the pod.
+// relatedPods generally should be a superset of podsToRank.
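+//
+// For example, if relatedPods holds three active pods on node-a and one on
+// node-b, a pod in podsToRank on node-a gets rank 3 and a pod on node-b gets
+// rank 1, so the more doubled-up pod sorts first for deletion.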
+func getPodsRankedByRelatedPodsOnSameNode(podsToRank, relatedPods []*v1.Pod) controller.ActivePodsWithRanks {
+	podsOnNode := make(map[string]int)
+	for _, pod := range relatedPods {
+		if controller.IsPodActive(pod) {
+			podsOnNode[pod.Spec.NodeName]++
+		}
+	}
+	ranks := make([]int, len(podsToRank))
+	for i, pod := range podsToRank {
+		ranks[i] = podsOnNode[pod.Spec.NodeName]
+	}
+	return controller.ActivePodsWithRanks{Pods: podsToRank, Rank: ranks}
+}
+
 func getPodKeys(pods []*v1.Pod) []string {
 	podKeys := make([]string, 0, len(pods))
 	for _, pod := range pods {
diff --git a/pkg/controller/replicaset/replica_set_test.go b/pkg/controller/replicaset/replica_set_test.go
index 99847bc7a92..4f6dafe3eba 100644
--- a/pkg/controller/replicaset/replica_set_test.go
+++ b/pkg/controller/replicaset/replica_set_test.go
@@ -23,6 +23,7 @@ import (
 	"net/http/httptest"
 	"net/url"
 	"reflect"
+	"sort"
 	"strings"
 	"sync"
 	"testing"
@@ -80,12 +81,16 @@ func skipListerFunc(verb string, url url.URL) bool {
 var alwaysReady = func() bool { return true }
 
 func newReplicaSet(replicas int, selectorMap map[string]string) *apps.ReplicaSet {
+	isController := true
 	rs := &apps.ReplicaSet{
 		TypeMeta: metav1.TypeMeta{APIVersion: "v1", Kind: "ReplicaSet"},
 		ObjectMeta: metav1.ObjectMeta{
-			UID:       uuid.NewUUID(),
-			Name:      "foobar",
-			Namespace: metav1.NamespaceDefault,
+			UID:             uuid.NewUUID(),
+			Name:            "foobar",
+			Namespace:       metav1.NamespaceDefault,
+			OwnerReferences: []metav1.OwnerReference{
+				{UID: "123", Controller: &isController},
+			},
 			ResourceVersion: "18",
 		},
 		Spec: apps.ReplicaSetSpec{
@@ -136,6 +141,7 @@ func newPod(name string, rs *apps.ReplicaSet, status v1.PodPhase, lastTransition
 	}
 	return &v1.Pod{
 		ObjectMeta: metav1.ObjectMeta{
+			UID:       uuid.NewUUID(),
 			Name:      name,
 			Namespace: rs.Namespace,
 			Labels:    rs.Spec.Selector.MatchLabels,
@@ -342,6 +348,68 @@ func TestSyncReplicaSetDormancy(t *testing.T) {
 	fakeHandler.ValidateRequestCount(t, 2)
 }
 
+func TestGetReplicaSetsWithSameController(t *testing.T) {
+	someRS := newReplicaSet(1, map[string]string{"foo": "bar"})
+	someRS.Name = "rs1"
+	relatedRS := newReplicaSet(1, map[string]string{"foo": "baz"})
+	relatedRS.Name = "rs2"
+	unrelatedRS := newReplicaSet(1, map[string]string{"foo": "quux"})
+	unrelatedRS.Name = "rs3"
+	unrelatedRS.ObjectMeta.OwnerReferences[0].UID = "456"
+	pendingDeletionRS := newReplicaSet(1, map[string]string{"foo": "xyzzy"})
+	pendingDeletionRS.Name = "rs4"
+	pendingDeletionRS.ObjectMeta.OwnerReferences[0].UID = "789"
+	now := metav1.Now()
+	pendingDeletionRS.DeletionTimestamp = &now
+
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	manager, informers := testNewReplicaSetControllerFromClient(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}), stopCh, BurstReplicas)
+	testCases := []struct {
+		name        string
+		rss         []*apps.ReplicaSet
+		rs          *apps.ReplicaSet
+		expectedRSs []*apps.ReplicaSet
+	}{
+		{
+			name:        "expect to get back a ReplicaSet that is pending deletion",
+			rss:         []*apps.ReplicaSet{pendingDeletionRS, unrelatedRS},
+			rs:          pendingDeletionRS,
+			expectedRSs: []*apps.ReplicaSet{pendingDeletionRS},
+		},
+		{
+			name:        "expect to get back only the given ReplicaSet if there is no related ReplicaSet",
+			rss:         []*apps.ReplicaSet{someRS, unrelatedRS},
+			rs:          someRS,
+			expectedRSs: []*apps.ReplicaSet{someRS},
+		},
+		{
+			name:        "expect to get back the given ReplicaSet as well as any related ReplicaSet but not an unrelated ReplicaSet",
+			rss:         []*apps.ReplicaSet{someRS, relatedRS, unrelatedRS},
+			rs:          someRS,
+			expectedRSs: []*apps.ReplicaSet{someRS, relatedRS},
+		},
+	}
+	for _, c := range testCases {
+		for _, r := range c.rss {
+			informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(r)
+		}
+		actualRSs := manager.getReplicaSetsWithSameController(c.rs)
+		var actualRSNames, expectedRSNames []string
+		for _, r := range actualRSs {
+			actualRSNames = append(actualRSNames, r.Name)
+		}
+		for _, r := range c.expectedRSs {
+			expectedRSNames = append(expectedRSNames, r.Name)
+		}
+		sort.Strings(actualRSNames)
+		sort.Strings(expectedRSNames)
+		if !reflect.DeepEqual(actualRSNames, expectedRSNames) {
+			t.Errorf("Got [%s]; expected [%s]", strings.Join(actualRSNames, ", "), strings.Join(expectedRSNames, ", "))
+		}
+	}
+}
+
 func TestPodControllerLookup(t *testing.T) {
 	stopCh := make(chan struct{})
 	defer close(stopCh)
@@ -408,6 +476,87 @@ func TestPodControllerLookup(t *testing.T) {
 	}
 }
 
+// byName sorts pods by their names.
+type byName []*v1.Pod
+
+func (pods byName) Len() int           { return len(pods) }
+func (pods byName) Swap(i, j int)      { pods[i], pods[j] = pods[j], pods[i] }
+func (pods byName) Less(i, j int) bool { return pods[i].Name < pods[j].Name }
+
+func TestRelatedPodsLookup(t *testing.T) {
+	someRS := newReplicaSet(1, map[string]string{"foo": "bar"})
+	someRS.Name = "foo1"
+	relatedRS := newReplicaSet(1, map[string]string{"foo": "baz"})
+	relatedRS.Name = "foo2"
+	unrelatedRS := newReplicaSet(1, map[string]string{"foo": "quux"})
+	unrelatedRS.Name = "bar1"
+	unrelatedRS.ObjectMeta.OwnerReferences[0].UID = "456"
+	pendingDeletionRS := newReplicaSet(1, map[string]string{"foo": "xyzzy"})
+	pendingDeletionRS.Name = "foo3"
+	pendingDeletionRS.ObjectMeta.OwnerReferences[0].UID = "789"
+	now := metav1.Now()
+	pendingDeletionRS.DeletionTimestamp = &now
+	pod1 := newPod("pod1", someRS, v1.PodRunning, nil, true)
+	pod2 := newPod("pod2", someRS, v1.PodRunning, nil, true)
+	pod3 := newPod("pod3", relatedRS, v1.PodRunning, nil, true)
+	pod4 := newPod("pod4", unrelatedRS, v1.PodRunning, nil, true)
+
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	manager, informers := testNewReplicaSetControllerFromClient(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}), stopCh, BurstReplicas)
+	testCases := []struct {
+		name             string
+		rss              []*apps.ReplicaSet
+		pods             []*v1.Pod
+		rs               *apps.ReplicaSet
+		expectedPodNames []string
+	}{
+		{
+			name:             "expect to get a pod even if its owning ReplicaSet is pending deletion",
+			rss:              []*apps.ReplicaSet{pendingDeletionRS, unrelatedRS},
+			rs:               pendingDeletionRS,
+			pods:             []*v1.Pod{newPod("pod", pendingDeletionRS, v1.PodRunning, nil, true)},
+			expectedPodNames: []string{"pod"},
+		},
+		{
+			name:             "expect to get only the ReplicaSet's own pods if there is no related ReplicaSet",
+			rss:              []*apps.ReplicaSet{someRS, unrelatedRS},
+			rs:               someRS,
+			pods:             []*v1.Pod{pod1, pod2, pod4},
+			expectedPodNames: []string{"pod1", "pod2"},
+		},
+		{
+			name:             "expect to get own pods as well as any related ReplicaSet's but not an unrelated ReplicaSet's",
+			rss:              []*apps.ReplicaSet{someRS, relatedRS, unrelatedRS},
+			rs:               someRS,
+			pods:             []*v1.Pod{pod1, pod2, pod3, pod4},
+			expectedPodNames: []string{"pod1", "pod2", "pod3"},
+		},
+	}
+	for _, c := range testCases {
+		for _, r := range c.rss {
+			informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(r)
+		}
+		for _, pod := range c.pods {
+			informers.Core().V1().Pods().Informer().GetIndexer().Add(pod)
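+			// Deliver the add event the shared informer would send, so the
+			// controller's bookkeeping sees the pod, not just the lister.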
+			manager.addPod(pod)
+		}
+		actualPods, err := manager.getIndirectlyRelatedPods(c.rs)
+		if err != nil {
+			t.Errorf("Unexpected error from getIndirectlyRelatedPods: %v", err)
+		}
+		var actualPodNames []string
+		for _, pod := range actualPods {
+			actualPodNames = append(actualPodNames, pod.Name)
+		}
+		sort.Strings(actualPodNames)
+		sort.Strings(c.expectedPodNames)
+		if !reflect.DeepEqual(actualPodNames, c.expectedPodNames) {
+			t.Errorf("Got [%s]; expected [%s]", strings.Join(actualPodNames, ", "), strings.Join(c.expectedPodNames, ", "))
+		}
+	}
+}
+
 func TestWatchControllers(t *testing.T) {
 	fakeWatch := watch.NewFake()
 	client := fake.NewSimpleClientset()
@@ -1445,10 +1594,19 @@ func TestGetPodsToDelete(t *testing.T) {
 			Status: v1.ConditionFalse,
 		},
 	}
-	// a scheduled, running, ready pod
-	scheduledRunningReadyPod := newPod("scheduled-running-ready-pod", rs, v1.PodRunning, nil, true)
-	scheduledRunningReadyPod.Spec.NodeName = "fake-node"
-	scheduledRunningReadyPod.Status.Conditions = []v1.PodCondition{
+	// a scheduled, running, ready pod on fake-node-1
+	scheduledRunningReadyPodOnNode1 := newPod("scheduled-running-ready-pod-on-node-1", rs, v1.PodRunning, nil, true)
+	scheduledRunningReadyPodOnNode1.Spec.NodeName = "fake-node-1"
+	scheduledRunningReadyPodOnNode1.Status.Conditions = []v1.PodCondition{
+		{
+			Type:   v1.PodReady,
+			Status: v1.ConditionTrue,
+		},
+	}
+	// a scheduled, running, ready pod on fake-node-2
+	scheduledRunningReadyPodOnNode2 := newPod("scheduled-running-ready-pod-on-node-2", rs, v1.PodRunning, nil, true)
+	scheduledRunningReadyPodOnNode2.Spec.NodeName = "fake-node-2"
+	scheduledRunningReadyPodOnNode2.Status.Conditions = []v1.PodCondition{
 		{
 			Type:   v1.PodReady,
 			Status: v1.ConditionTrue,
@@ -1456,8 +1614,10 @@ func TestGetPodsToDelete(t *testing.T) {
 	}
 
 	tests := []struct {
-		name                 string
-		pods                 []*v1.Pod
+		name string
+		pods []*v1.Pod
+		// related defaults to pods if nil.
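+		// The test loop below substitutes pods for related in that case.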
+		related              []*v1.Pod
 		diff                 int
 		expectedPodsToDelete []*v1.Pod
 	}{
@@ -1465,93 +1625,136 @@ func TestGetPodsToDelete(t *testing.T) {
 		// an unscheduled, pending pod
 		// a scheduled, pending pod
 		// a scheduled, running, not-ready pod
-		// a scheduled, running, ready pod
+		// a scheduled, running, ready pod on same node as a related pod
+		// a scheduled, running, ready pod not on node with related pods
 		// Note that a pending pod cannot be ready
 		{
-			"len(pods) = 0 (i.e., diff = 0 too)",
-			[]*v1.Pod{},
-			0,
-			[]*v1.Pod{},
+			name:                 "len(pods) = 0 (i.e., diff = 0 too)",
+			pods:                 []*v1.Pod{},
+			diff:                 0,
+			expectedPodsToDelete: []*v1.Pod{},
 		},
 		{
-			"diff = len(pods)",
-			[]*v1.Pod{
+			name: "diff = len(pods)",
+			pods: []*v1.Pod{
 				scheduledRunningNotReadyPod,
-				scheduledRunningReadyPod,
+				scheduledRunningReadyPodOnNode1,
 			},
-			2,
-			[]*v1.Pod{scheduledRunningNotReadyPod, scheduledRunningReadyPod},
+			diff:                 2,
+			expectedPodsToDelete: []*v1.Pod{scheduledRunningNotReadyPod, scheduledRunningReadyPodOnNode1},
 		},
 		{
-			"diff < len(pods)",
-			[]*v1.Pod{
-				scheduledRunningReadyPod,
+			name: "diff < len(pods)",
+			pods: []*v1.Pod{
+				scheduledRunningReadyPodOnNode1,
 				scheduledRunningNotReadyPod,
 			},
-			1,
-			[]*v1.Pod{scheduledRunningNotReadyPod},
+			diff:                 1,
+			expectedPodsToDelete: []*v1.Pod{scheduledRunningNotReadyPod},
 		},
 		{
-			"various pod phases and conditions, diff = len(pods)",
-			[]*v1.Pod{
-				scheduledRunningReadyPod,
+			name: "various pod phases and conditions, diff = len(pods)",
+			pods: []*v1.Pod{
+				scheduledRunningReadyPodOnNode1,
+				scheduledRunningReadyPodOnNode1,
+				scheduledRunningReadyPodOnNode2,
 				scheduledRunningNotReadyPod,
 				scheduledPendingPod,
 				unscheduledPendingPod,
 			},
-			4,
-			[]*v1.Pod{
-				scheduledRunningReadyPod,
+			diff: 6,
+			expectedPodsToDelete: []*v1.Pod{
+				scheduledRunningReadyPodOnNode1,
+				scheduledRunningReadyPodOnNode1,
+				scheduledRunningReadyPodOnNode2,
 				scheduledRunningNotReadyPod,
 				scheduledPendingPod,
 				unscheduledPendingPod,
 			},
 		},
 		{
-			"scheduled vs unscheduled, diff < len(pods)",
-			[]*v1.Pod{
+			name: "various pod phases and conditions, diff = len(pods), relatedPods empty",
+			pods: []*v1.Pod{
+				scheduledRunningReadyPodOnNode1,
+				scheduledRunningReadyPodOnNode1,
+				scheduledRunningReadyPodOnNode2,
+				scheduledRunningNotReadyPod,
 				scheduledPendingPod,
 				unscheduledPendingPod,
 			},
-			1,
-			[]*v1.Pod{
+			related: []*v1.Pod{},
+			diff:    6,
+			expectedPodsToDelete: []*v1.Pod{
+				scheduledRunningReadyPodOnNode1,
+				scheduledRunningReadyPodOnNode1,
+				scheduledRunningReadyPodOnNode2,
+				scheduledRunningNotReadyPod,
+				scheduledPendingPod,
 				unscheduledPendingPod,
 			},
 		},
 		{
-			"ready vs not-ready, diff < len(pods)",
-			[]*v1.Pod{
-				scheduledRunningReadyPod,
+			name: "scheduled vs unscheduled, diff < len(pods)",
+			pods: []*v1.Pod{
+				scheduledPendingPod,
+				unscheduledPendingPod,
+			},
+			diff: 1,
+			expectedPodsToDelete: []*v1.Pod{
+				unscheduledPendingPod,
+			},
+		},
+		{
+			name: "ready vs not-ready, diff < len(pods)",
+			pods: []*v1.Pod{
+				scheduledRunningReadyPodOnNode1,
 				scheduledRunningNotReadyPod,
 				scheduledRunningNotReadyPod,
 			},
-			2,
-			[]*v1.Pod{
+			diff: 2,
+			expectedPodsToDelete: []*v1.Pod{
 				scheduledRunningNotReadyPod,
 				scheduledRunningNotReadyPod,
 			},
 		},
 		{
-			"pending vs running, diff < len(pods)",
-			[]*v1.Pod{
+			name: "ready and colocated with another ready pod vs not colocated, diff < len(pods)",
+			pods: []*v1.Pod{
+				scheduledRunningReadyPodOnNode1,
+				scheduledRunningReadyPodOnNode2,
+			},
+			related: []*v1.Pod{
+				scheduledRunningReadyPodOnNode1,
+				scheduledRunningReadyPodOnNode2,
+				scheduledRunningReadyPodOnNode2,
+			},
+			diff: 1,
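+			// node-2 hosts two related ready pods, node-1 only one, so the
+			// node-2 pod ranks higher and is preferred for deletion.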
+			expectedPodsToDelete: []*v1.Pod{
+				scheduledRunningReadyPodOnNode2,
+			},
+		},
+		{
+			name: "pending vs running, diff < len(pods)",
+			pods: []*v1.Pod{
 				scheduledPendingPod,
 				scheduledRunningNotReadyPod,
 			},
-			1,
-			[]*v1.Pod{
+			diff: 1,
+			expectedPodsToDelete: []*v1.Pod{
 				scheduledPendingPod,
 			},
 		},
 		{
-			"various pod phases and conditions, diff < len(pods)",
-			[]*v1.Pod{
-				scheduledRunningReadyPod,
+			name: "various pod phases and conditions, diff < len(pods)",
+			pods: []*v1.Pod{
+				scheduledRunningReadyPodOnNode1,
+				scheduledRunningReadyPodOnNode2,
 				scheduledRunningNotReadyPod,
 				scheduledPendingPod,
 				unscheduledPendingPod,
 			},
-			3,
-			[]*v1.Pod{
+			diff: 3,
+			expectedPodsToDelete: []*v1.Pod{
 				unscheduledPendingPod,
 				scheduledPendingPod,
 				scheduledRunningNotReadyPod,
@@ -1560,7 +1763,11 @@ func TestGetPodsToDelete(t *testing.T) {
 	}
 
 	for _, test := range tests {
-		podsToDelete := getPodsToDelete(test.pods, test.diff)
+		related := test.related
+		if related == nil {
+			related = test.pods
+		}
+		podsToDelete := getPodsToDelete(test.pods, related, test.diff)
 		if len(podsToDelete) != len(test.expectedPodsToDelete) {
 			t.Errorf("%s: unexpected pods to delete, expected %v, got %v", test.name, test.expectedPodsToDelete, podsToDelete)
 		}
diff --git a/test/e2e/apps/deployment.go b/test/e2e/apps/deployment.go
index 67e0087c8cc..94c375fe252 100644
--- a/test/e2e/apps/deployment.go
+++ b/test/e2e/apps/deployment.go
@@ -42,6 +42,7 @@ import (
 	e2edeploy "k8s.io/kubernetes/test/e2e/framework/deployment"
 	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
 	"k8s.io/kubernetes/test/e2e/framework/replicaset"
+	e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
 	testutil "k8s.io/kubernetes/test/utils"
 	utilpointer "k8s.io/utils/pointer"
 )
@@ -119,6 +120,10 @@ var _ = SIGDescribe("Deployment", func() {
 	framework.ConformanceIt("deployment should support proportional scaling", func() {
 		testProportionalScalingDeployment(f)
 	})
+	ginkgo.It("should not disrupt a cloud load-balancer's connectivity during rollout", func() {
+		framework.SkipUnlessProviderIs("aws", "azure", "gce", "gke")
+		testRollingUpdateDeploymentWithLocalTrafficLoadBalancer(f)
+	})
 	// TODO: add tests that cover deployment.Spec.MinReadySeconds once we solved clock-skew issues
 	// See https://github.com/kubernetes/kubernetes/issues/29229
 })
@@ -856,3 +861,151 @@ func orphanDeploymentReplicaSets(c clientset.Interface, d *appsv1.Deployment) er
 	deleteOptions.Preconditions = metav1.NewUIDPreconditions(string(d.UID))
 	return c.AppsV1().Deployments(d.Namespace).Delete(d.Name, deleteOptions)
 }
+
+func testRollingUpdateDeploymentWithLocalTrafficLoadBalancer(f *framework.Framework) {
+	ns := f.Namespace.Name
+	c := f.ClientSet
+
+	name := "test-rolling-update-with-lb"
+	framework.Logf("Creating Deployment %q", name)
+	podLabels := map[string]string{"name": name}
+	replicas := int32(3)
+	d := e2edeploy.NewDeployment(name, replicas, podLabels, AgnhostImageName, AgnhostImage, appsv1.RollingUpdateDeploymentStrategyType)
+	// NewDeployment assigned the same value to both d.Spec.Selector and
+	// d.Spec.Template.Labels, so mutating one would mutate the other.
+	// Thus we need to set d.Spec.Template.Labels to a new value if we want
+	// to mutate it alone.
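+	// The "iteration" label changes on every rollout; the affinity rules set
+	// by setAffinity below key off it to attract new pods to nodes running
+	// the previous iteration and repel pods of the same iteration.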
+	d.Spec.Template.Labels = map[string]string{
+		"iteration": "0",
+		"name":      name,
+	}
+	d.Spec.Template.Spec.Containers[0].Args = []string{"netexec", "--http-port=80", "--udp-port=80"}
+	// To ensure that a node that had a local endpoint prior to a rolling
+	// update continues to have a local endpoint throughout the rollout, we
+	// need an affinity policy that will cause pods to be scheduled on the
+	// same nodes as old pods, and we need the deployment to scale up a new
+	// pod before deleting an old pod. This affinity policy will define
+	// inter-pod affinity for pods of different rollouts and anti-affinity
+	// for pods of the same rollout, so it will need to be updated when
+	// performing a rollout.
+	setAffinity(d)
+	d.Spec.Strategy.RollingUpdate = &appsv1.RollingUpdateDeployment{
+		MaxSurge:       intOrStrP(1),
+		MaxUnavailable: intOrStrP(0),
+	}
+	deployment, err := c.AppsV1().Deployments(ns).Create(d)
+	framework.ExpectNoError(err)
+	err = e2edeploy.WaitForDeploymentComplete(c, deployment)
+	framework.ExpectNoError(err)
+
+	framework.Logf("Creating a service %s with type=LoadBalancer and externalTrafficPolicy=Local in namespace %s", name, ns)
+	jig := e2eservice.NewTestJig(c, name)
+	jig.Labels = podLabels
+	service := jig.CreateLoadBalancerService(ns, name, e2eservice.LoadBalancerCreateTimeoutDefault, func(svc *v1.Service) {
+		svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
+	})
+
+	lbNameOrAddress := e2eservice.GetIngressPoint(&service.Status.LoadBalancer.Ingress[0])
+	svcPort := int(service.Spec.Ports[0].Port)
+
+	framework.Logf("Hitting the replica set's pods through the service's load balancer")
+	timeout := e2eservice.LoadBalancerLagTimeoutDefault
+	if framework.ProviderIs("aws") {
+		timeout = e2eservice.LoadBalancerLagTimeoutAWS
+	}
+	e2eservice.TestReachableHTTP(lbNameOrAddress, svcPort, timeout)
+
+	framework.Logf("Starting a goroutine to watch the service's endpoints in the background")
+	done := make(chan struct{})
+	failed := make(chan struct{})
+	defer close(done)
+	go func() {
+		defer ginkgo.GinkgoRecover()
+		expectedNodes := jig.GetEndpointNodeNames(service)
+		// The affinity policy should ensure that before an old pod is
+		// deleted, a new pod will have been created on the same node.
+		// Thus the set of nodes with local endpoints for the service
+		// should remain unchanged.
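+		// Poll the endpoints until the test closes done, reporting any
+		// change in the node set on the failed channel.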
+		wait.Until(func() {
+			actualNodes := jig.GetEndpointNodeNames(service)
+			if !actualNodes.Equal(expectedNodes) {
+				framework.Logf("The set of nodes with local endpoints changed; started with %v, now have %v", expectedNodes.List(), actualNodes.List())
+				failed <- struct{}{}
+			}
+		}, framework.Poll, done)
+	}()
+
+	framework.Logf("Triggering a rolling deployment several times")
+	for i := 1; i <= 3; i++ {
+		framework.Logf("Updating label deployment %q pod spec (iteration #%d)", name, i)
+		deployment, err = e2edeploy.UpdateDeploymentWithRetries(c, ns, d.Name, func(update *appsv1.Deployment) {
+			update.Spec.Template.Labels["iteration"] = fmt.Sprintf("%d", i)
+			setAffinity(update)
+		})
+		framework.ExpectNoError(err)
+
+		framework.Logf("Waiting for observed generation %d", deployment.Generation)
+		err = e2edeploy.WaitForObservedDeployment(c, ns, name, deployment.Generation)
+		framework.ExpectNoError(err)
+
+		framework.Logf("Make sure deployment %q is complete", name)
+		err = e2edeploy.WaitForDeploymentCompleteAndCheckRolling(c, deployment)
+		framework.ExpectNoError(err)
+	}
+
+	select {
+	case <-failed:
+		framework.Failf("Connectivity to the load balancer was interrupted")
+	case <-time.After(1 * time.Minute):
+	}
+}
+
+func setAffinity(d *appsv1.Deployment) {
+	d.Spec.Template.Spec.Affinity = &v1.Affinity{
+		PodAffinity: &v1.PodAffinity{
+			PreferredDuringSchedulingIgnoredDuringExecution: []v1.WeightedPodAffinityTerm{
+				{
+					Weight: int32(100),
+					PodAffinityTerm: v1.PodAffinityTerm{
+						TopologyKey: "kubernetes.io/hostname",
+						LabelSelector: &metav1.LabelSelector{
+							MatchExpressions: []metav1.LabelSelectorRequirement{
+								{
+									Key:      "name",
+									Operator: metav1.LabelSelectorOpIn,
+									Values:   []string{d.Spec.Template.Labels["name"]},
+								},
+								{
+									Key:      "iteration",
+									Operator: metav1.LabelSelectorOpNotIn,
+									Values:   []string{d.Spec.Template.Labels["iteration"]},
+								},
+							},
+						},
+					},
+				},
+			},
+		},
+		PodAntiAffinity: &v1.PodAntiAffinity{
+			RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
+				{
+					TopologyKey: "kubernetes.io/hostname",
+					LabelSelector: &metav1.LabelSelector{
+						MatchExpressions: []metav1.LabelSelectorRequirement{
+							{
+								Key:      "name",
+								Operator: metav1.LabelSelectorOpIn,
+								Values:   []string{d.Spec.Template.Labels["name"]},
+							},
+							{
+								Key:      "iteration",
+								Operator: metav1.LabelSelectorOpIn,
+								Values:   []string{d.Spec.Template.Labels["iteration"]},
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+}
diff --git a/test/e2e/framework/service/jig.go b/test/e2e/framework/service/jig.go
index 49fae155d8d..ec1196d8dd1 100644
--- a/test/e2e/framework/service/jig.go
+++ b/test/e2e/framework/service/jig.go
@@ -266,6 +266,19 @@ func (j *TestJig) CreateLoadBalancerService(namespace, serviceName string, timeo
 func (j *TestJig) GetEndpointNodes(svc *v1.Service) map[string][]string {
 	nodes, err := e2enode.GetBoundedReadySchedulableNodes(j.Client, MaxNodesForEndpointsTests)
 	framework.ExpectNoError(err)
+	epNodes := j.GetEndpointNodeNames(svc)
+	nodeMap := map[string][]string{}
+	for _, n := range nodes.Items {
+		if epNodes.Has(n.Name) {
+			nodeMap[n.Name] = e2enode.GetAddresses(&n, v1.NodeExternalIP)
+		}
+	}
+	return nodeMap
+}
+
+// GetEndpointNodeNames returns a string set of node names on which the
+// endpoints of the given Service are running.
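+// Callers can compare the sets from successive calls to detect endpoint
+// movement, as the rolling-update e2e test above does.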
+func (j *TestJig) GetEndpointNodeNames(svc *v1.Service) sets.String {
 	endpoints, err := j.Client.CoreV1().Endpoints(svc.Namespace).Get(svc.Name, metav1.GetOptions{})
 	if err != nil {
 		framework.Failf("Get endpoints for service %s/%s failed (%s)", svc.Namespace, svc.Name, err)
@@ -281,13 +294,7 @@ func (j *TestJig) GetEndpointNodes(svc *v1.Service) map[string][]string {
 			}
 		}
 	}
-	nodeMap := map[string][]string{}
-	for _, n := range nodes.Items {
-		if epNodes.Has(n.Name) {
-			nodeMap[n.Name] = e2enode.GetAddresses(&n, v1.NodeExternalIP)
-		}
-	}
-	return nodeMap
+	return epNodes
 }
 
 // WaitForEndpointOnNode waits for a service endpoint on the given node.
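
As an illustration of the new deletion preference (this note and sketch are not part of the patch): a minimal program, assuming the controller package is importable as k8s.io/kubernetes/pkg/controller; the pod names, ranks, and readyPod helper are hypothetical.

package main

import (
	"fmt"
	"sort"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/kubernetes/pkg/controller"
)

// readyPod builds a running, ready pod bound to the given node.
func readyPod(name, node string) *v1.Pod {
	return &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: name},
		Spec:       v1.PodSpec{NodeName: node},
		Status: v1.PodStatus{
			Phase:      v1.PodRunning,
			Conditions: []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}},
		},
	}
}

func main() {
	lone := readyPod("lone", "node-1")       // node-1 hosts only this related pod
	doubled := readyPod("doubled", "node-2") // node-2 hosts two related pods
	ranked := controller.ActivePodsWithRanks{
		Pods: []*v1.Pod{lone, doubled},
		// Ranks as getPodsRankedByRelatedPodsOnSameNode would compute them:
		// the number of active related pods on each pod's node.
		Rank: []int{1, 2},
	}
	sort.Sort(ranked)
	fmt.Println(ranked.Pods[0].Name) // "doubled": the doubled-up pod is preferred for deletion
}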