RC/RS: Use ControllerRef to route watch events.

This is part of the completion of ControllerRef, as described here:

https://github.com/kubernetes/community/blob/master/contributors/design-proposals/controller-ref.md#watches

This also removes the need for the Pod->Controller mapping cache in RC
and RS. This mapping is now persisted in the Pod's ControllerRef
instead.
This commit is contained in:
Anthony Yeh
2017-02-23 08:58:28 -08:00
parent 298db3a0c3
commit ca13b9e532
13 changed files with 486 additions and 416 deletions

View File

@@ -32,7 +32,6 @@ go_library(
"//vendor:k8s.io/apimachinery/pkg/api/errors",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/labels",
"//vendor:k8s.io/apimachinery/pkg/runtime/schema",
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/client-go/kubernetes/typed/core/v1",

View File

@@ -29,7 +29,6 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
@@ -59,9 +58,8 @@ const (
statusUpdateRetries = 1
)
func getRSKind() schema.GroupVersionKind {
return v1beta1.SchemeGroupVersion.WithKind("ReplicaSet")
}
// controllerKind contains the schema.GroupVersionKind for this controller type.
var controllerKind = v1beta1.SchemeGroupVersion.WithKind("ReplicaSet")
// ReplicaSetController is responsible for synchronizing ReplicaSet objects stored
// in the system with actual running pods.
@@ -90,14 +88,12 @@ type ReplicaSetController struct {
// Added as a member to the struct to allow injection for testing.
podListerSynced cache.InformerSynced
lookupCache *controller.MatchingCache
// Controllers that need to be synced
queue workqueue.RateLimitingInterface
}
// NewReplicaSetController configures a replica set controller with the specified event recorder
func NewReplicaSetController(rsInformer extensionsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int, lookupCacheSize int) *ReplicaSetController {
func NewReplicaSetController(rsInformer extensionsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicaSetController {
if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("replicaset_controller", kubeClient.Core().RESTClient().GetRateLimiter())
}
@@ -139,7 +135,6 @@ func NewReplicaSetController(rsInformer extensionsinformers.ReplicaSetInformer,
rsc.podListerSynced = podInformer.Informer().HasSynced
rsc.syncHandler = rsc.syncReplicaSet
rsc.lookupCache = controller.NewMatchingCache(lookupCacheSize)
return rsc
}
@@ -172,46 +167,19 @@ func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
glog.Infof("Shutting down ReplicaSet Controller")
}
// getPodReplicaSet returns the replica set managing the given pod.
// TODO: Surface that we are ignoring multiple replica sets for a single pod.
// TODO: use ownerReference.Controller to determine if the rs controls the pod.
func (rsc *ReplicaSetController) getPodReplicaSet(pod *v1.Pod) *extensions.ReplicaSet {
// look up in the cache, if cached and the cache is valid, just return cached value
if obj, cached := rsc.lookupCache.GetMatchingObject(pod); cached {
rs, ok := obj.(*extensions.ReplicaSet)
if !ok {
// This should not happen
utilruntime.HandleError(fmt.Errorf("lookup cache does not return a ReplicaSet object"))
return nil
}
if cached && rsc.isCacheValid(pod, rs) {
return rs
}
}
// if not cached or cached value is invalid, search all the rs to find the matching one, and update cache
// getPodReplicaSets returns a list of ReplicaSets matching the given pod.
func (rsc *ReplicaSetController) getPodReplicaSets(pod *v1.Pod) []*extensions.ReplicaSet {
rss, err := rsc.rsLister.GetPodReplicaSets(pod)
if err != nil {
glog.V(4).Infof("No ReplicaSets found for pod %v, ReplicaSet controller will avoid syncing", pod.Name)
return nil
}
// In theory, overlapping ReplicaSets is user error. This sorting will not prevent
// oscillation of replicas in all cases, eg:
// rs1 (older rs): [(k1=v1)], replicas=1 rs2: [(k2=v2)], replicas=2
// pod: [(k1:v1), (k2:v2)] will wake both rs1 and rs2, and we will sync rs1.
// pod: [(k2:v2)] will wake rs2 which creates a new replica.
if len(rss) > 1 {
// More than two items in this list indicates user error. If two replicasets
// overlap, sort by creation timestamp, subsort by name, then pick
// the first.
// ControllerRef will ensure we don't do anything crazy, but more than one
// item in this list nevertheless constitutes user error.
utilruntime.HandleError(fmt.Errorf("user error! more than one ReplicaSet is selecting pods with labels: %+v", pod.Labels))
sort.Sort(controller.ReplicaSetsByCreationTimestamp(rss))
}
// update lookup cache
rsc.lookupCache.Update(pod, rss[0])
return rss[0]
return rss
}
// callback when RS is updated
@@ -219,21 +187,6 @@ func (rsc *ReplicaSetController) updateRS(old, cur interface{}) {
oldRS := old.(*extensions.ReplicaSet)
curRS := cur.(*extensions.ReplicaSet)
// We should invalidate the whole lookup cache if a RS's selector has been updated.
//
// Imagine that you have two RSs:
// * old RS1
// * new RS2
// You also have a pod that is attached to RS2 (because it doesn't match RS1 selector).
// Now imagine that you are changing RS1 selector so that it is now matching that pod,
// in such case we must invalidate the whole cache so that pod could be adopted by RS1
//
// This makes the lookup cache less helpful, but selector update does not happen often,
// so it's not a big problem
if !reflect.DeepEqual(oldRS.Spec.Selector, curRS.Spec.Selector) {
rsc.lookupCache.InvalidateAll()
}
// You might imagine that we only really need to enqueue the
// replica set when Spec changes, but it is safer to sync any
// time this function is triggered. That way a full informer
@@ -252,57 +205,44 @@ func (rsc *ReplicaSetController) updateRS(old, cur interface{}) {
rsc.enqueueReplicaSet(cur)
}
// isCacheValid check if the cache is valid
func (rsc *ReplicaSetController) isCacheValid(pod *v1.Pod, cachedRS *extensions.ReplicaSet) bool {
_, err := rsc.rsLister.ReplicaSets(cachedRS.Namespace).Get(cachedRS.Name)
// rs has been deleted or updated, cache is invalid
if err != nil || !isReplicaSetMatch(pod, cachedRS) {
return false
}
return true
}
// isReplicaSetMatch take a Pod and ReplicaSet, return whether the Pod and ReplicaSet are matching
// TODO(mqliang): This logic is a copy from GetPodReplicaSets(), remove the duplication
func isReplicaSetMatch(pod *v1.Pod, rs *extensions.ReplicaSet) bool {
if rs.Namespace != pod.Namespace {
return false
}
selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
if err != nil {
err = fmt.Errorf("invalid selector: %v", err)
return false
}
// If a ReplicaSet with a nil or empty selector creeps in, it should match nothing, not everything.
if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
return false
}
return true
}
// When a pod is created, enqueue the replica set that manages it and update it's expectations.
// When a pod is created, enqueue the replica set that manages it and update its expectations.
func (rsc *ReplicaSetController) addPod(obj interface{}) {
pod := obj.(*v1.Pod)
glog.V(4).Infof("Pod %s created: %#v.", pod.Name, pod)
rs := rsc.getPodReplicaSet(pod)
if rs == nil {
return
}
rsKey, err := controller.KeyFunc(rs)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for replica set %#v: %v", rs, err))
return
}
if pod.DeletionTimestamp != nil {
// on a restart of the controller manager, it's possible a new pod shows up in a state that
// is already pending deletion. Prevent the pod from being a creation observation.
rsc.deletePod(pod)
return
}
rsc.expectations.CreationObserved(rsKey)
rsc.enqueueReplicaSet(rs)
// If it has a ControllerRef, that's all that matters.
if controllerRef := controller.GetControllerOf(pod); controllerRef != nil {
if controllerRef.Kind != controllerKind.Kind {
// It's controlled by a different type of controller.
return
}
rs, err := rsc.rsLister.ReplicaSets(pod.Namespace).Get(controllerRef.Name)
if err != nil {
return
}
rsKey, err := controller.KeyFunc(rs)
if err != nil {
return
}
rsc.expectations.CreationObserved(rsKey)
rsc.enqueueReplicaSet(rs)
return
}
// Otherwise, it's an orphan. Get a list of all matching ReplicaSets and sync
// them to see if anyone wants to adopt it.
// DO NOT observe creation because no controller should be waiting for an
// orphan.
for _, rs := range rsc.getPodReplicaSets(pod) {
rsc.enqueueReplicaSet(rs)
}
}
// When a pod is updated, figure out what replica set/s manage it and wake them
@@ -317,6 +257,7 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
return
}
glog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
if curPod.DeletionTimestamp != nil {
// when a pod is deleted gracefully it's deletion timestamp is first modified to reflect a grace period,
@@ -332,18 +273,29 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
return
}
// Enqueue the oldRC before the curRC to give curRC a chance to adopt the oldPod.
if labelChanged {
// If the old and new ReplicaSet are the same, the first one that syncs
// will set expectations preventing any damage from the second.
if oldRS := rsc.getPodReplicaSet(oldPod); oldRS != nil {
rsc.enqueueReplicaSet(oldRS)
curControllerRef := controller.GetControllerOf(curPod)
oldControllerRef := controller.GetControllerOf(oldPod)
controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
if controllerRefChanged &&
oldControllerRef != nil && oldControllerRef.Kind == controllerKind.Kind {
// The ControllerRef was changed. Sync the old controller, if any.
rs, err := rsc.rsLister.ReplicaSets(oldPod.Namespace).Get(oldControllerRef.Name)
if err == nil {
rsc.enqueueReplicaSet(rs)
}
}
changedToReady := !v1.IsPodReady(oldPod) && v1.IsPodReady(curPod)
if curRS := rsc.getPodReplicaSet(curPod); curRS != nil {
rsc.enqueueReplicaSet(curRS)
// If it has a ControllerRef, that's all that matters.
if curControllerRef != nil {
if curControllerRef.Kind != controllerKind.Kind {
// It's controlled by a different type of controller.
return
}
rs, err := rsc.rsLister.ReplicaSets(curPod.Namespace).Get(curControllerRef.Name)
if err != nil {
return
}
rsc.enqueueReplicaSet(rs)
// TODO: MinReadySeconds in the Pod will generate an Available condition to be added in
// the Pod status which in turn will trigger a requeue of the owning replica set thus
// having its status updated with the newly available replica. For now, we can fake the
@@ -351,9 +303,18 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
// a Pod transitioned to Ready.
// Note that this still suffers from #29229, we are just moving the problem one level
// "closer" to kubelet (from the deployment to the replica set controller).
if changedToReady && curRS.Spec.MinReadySeconds > 0 {
glog.V(2).Infof("ReplicaSet %q will be enqueued after %ds for availability check", curRS.Name, curRS.Spec.MinReadySeconds)
rsc.enqueueReplicaSetAfter(curRS, time.Duration(curRS.Spec.MinReadySeconds)*time.Second)
if !v1.IsPodReady(oldPod) && v1.IsPodReady(curPod) && rs.Spec.MinReadySeconds > 0 {
glog.V(2).Infof("ReplicaSet %q will be enqueued after %ds for availability check", rs.Name, rs.Spec.MinReadySeconds)
rsc.enqueueReplicaSetAfter(rs, time.Duration(rs.Spec.MinReadySeconds)*time.Second)
}
return
}
// Otherwise, it's an orphan. If anything changed, sync matching controllers
// to see if anyone wants to adopt it now.
if labelChanged || controllerRefChanged {
for _, rs := range rsc.getPodReplicaSets(curPod) {
rsc.enqueueReplicaSet(rs)
}
}
}
@@ -370,41 +331,46 @@ func (rsc *ReplicaSetController) deletePod(obj interface{}) {
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %+v", obj))
utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj))
return
}
pod, ok = tombstone.Obj.(*v1.Pod)
if !ok {
utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a pod %#v", obj))
utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj))
return
}
}
glog.V(4).Infof("Pod %s/%s deleted through %v, timestamp %+v: %#v.", pod.Namespace, pod.Name, utilruntime.GetCaller(), pod.DeletionTimestamp, pod)
if rs := rsc.getPodReplicaSet(pod); rs != nil {
rsKey, err := controller.KeyFunc(rs)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for ReplicaSet %#v: %v", rs, err))
return
}
rsc.expectations.DeletionObserved(rsKey, controller.PodKey(pod))
rsc.enqueueReplicaSet(rs)
controllerRef := controller.GetControllerOf(pod)
if controllerRef == nil {
// No controller should care about orphans being deleted.
return
}
if controllerRef.Kind != controllerKind.Kind {
// It's controlled by a different type of controller.
return
}
rs, err := rsc.rsLister.ReplicaSets(pod.Namespace).Get(controllerRef.Name)
if err != nil {
return
}
rsKey, err := controller.KeyFunc(rs)
if err != nil {
return
}
rsc.expectations.DeletionObserved(rsKey, controller.PodKey(pod))
rsc.enqueueReplicaSet(rs)
}
// obj could be an *extensions.ReplicaSet, or a DeletionFinalStateUnknown marker item.
func (rsc *ReplicaSetController) enqueueReplicaSet(obj interface{}) {
key, err := controller.KeyFunc(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
return
}
// TODO: Handle overlapping replica sets better. Either disallow them at admission time or
// deterministically avoid syncing replica sets that fight over pods. Currently, we only
// ensure that the same replica set is synced for a given pod. When we periodically relist
// all replica sets there will still be some replica instability. One way to handle this is
// by querying the store for all replica sets that this replica set overlaps, as well as all
// replica sets that overlap this ReplicaSet, and sorting them.
rsc.queue.Add(key)
}
@@ -412,16 +378,9 @@ func (rsc *ReplicaSetController) enqueueReplicaSet(obj interface{}) {
func (rsc *ReplicaSetController) enqueueReplicaSetAfter(obj interface{}, after time.Duration) {
key, err := controller.KeyFunc(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
return
}
// TODO: Handle overlapping replica sets better. Either disallow them at admission time or
// deterministically avoid syncing replica sets that fight over pods. Currently, we only
// ensure that the same replica set is synced for a given pod. When we periodically relist
// all replica sets there will still be some replica instability. One way to handle this is
// by querying the store for all replica sets that this replica set overlaps, as well as all
// replica sets that overlap this ReplicaSet, and sorting them.
rsc.queue.AddAfter(key, after)
}
@@ -483,8 +442,8 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *exte
var err error
boolPtr := func(b bool) *bool { return &b }
controllerRef := &metav1.OwnerReference{
APIVersion: getRSKind().GroupVersion().String(),
Kind: getRSKind().Kind,
APIVersion: controllerKind.GroupVersion().String(),
Kind: controllerKind.Kind,
Name: rs.Name,
UID: rs.UID,
BlockOwnerDeletion: boolPtr(true),
@@ -592,7 +551,7 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
if err != nil {
return err
}
cm := controller.NewPodControllerRefManager(rsc.podControl, rs, selector, getRSKind())
cm := controller.NewPodControllerRefManager(rsc.podControl, rs, selector, controllerKind)
filteredPods, err = cm.ClaimPods(pods)
if err != nil {
// Something went wrong with adoption or release.

View File

@@ -51,7 +51,7 @@ import (
"k8s.io/kubernetes/pkg/securitycontext"
)
func testNewReplicaSetControllerFromClient(client clientset.Interface, stopCh chan struct{}, burstReplicas int, lookupCacheSize int) (*ReplicaSetController, informers.SharedInformerFactory) {
func testNewReplicaSetControllerFromClient(client clientset.Interface, stopCh chan struct{}, burstReplicas int) (*ReplicaSetController, informers.SharedInformerFactory) {
informers := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
ret := NewReplicaSetController(
@@ -59,7 +59,6 @@ func testNewReplicaSetControllerFromClient(client clientset.Interface, stopCh ch
informers.Core().V1().Pods(),
client,
burstReplicas,
lookupCacheSize,
)
ret.podListerSynced = alwaysReady
@@ -216,7 +215,7 @@ func TestSyncReplicaSetDoesNothing(t *testing.T) {
fakePodControl := controller.FakePodControl{}
stopCh := make(chan struct{})
defer close(stopCh)
manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0)
manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas)
// 2 running pods, a controller with 2 replicas, sync is a no-op
labelMap := map[string]string{"foo": "bar"}
@@ -234,7 +233,7 @@ func TestSyncReplicaSetDeletes(t *testing.T) {
fakePodControl := controller.FakePodControl{}
stopCh := make(chan struct{})
defer close(stopCh)
manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0)
manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas)
manager.podControl = &fakePodControl
// 2 running pods and a controller with 1 replica, one pod delete expected
@@ -252,7 +251,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) {
fakePodControl := controller.FakePodControl{}
stopCh := make(chan struct{})
defer close(stopCh)
manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0)
manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas)
manager.podControl = &fakePodControl
received := make(chan string)
@@ -286,7 +285,7 @@ func TestSyncReplicaSetCreates(t *testing.T) {
client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
stopCh := make(chan struct{})
defer close(stopCh)
manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0)
manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas)
// A controller with 2 replicas and no pods in the store, 2 creates expected
labelMap := map[string]string{"foo": "bar"}
@@ -311,7 +310,7 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) {
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
stopCh := make(chan struct{})
defer close(stopCh)
manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0)
manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas)
// Steady state for the ReplicaSet, no Status.Replicas updates expected
activePods := 5
@@ -356,7 +355,7 @@ func TestControllerUpdateReplicas(t *testing.T) {
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
stopCh := make(chan struct{})
defer close(stopCh)
manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0)
manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas)
// Insufficient number of pods in the system, and Status.Replicas is wrong;
// Status.Replica should update to match number of pods in system, 1 new pod should be created.
@@ -405,7 +404,7 @@ func TestSyncReplicaSetDormancy(t *testing.T) {
fakePodControl := controller.FakePodControl{}
stopCh := make(chan struct{})
defer close(stopCh)
manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0)
manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas)
manager.podControl = &fakePodControl
@@ -461,7 +460,7 @@ func TestSyncReplicaSetDormancy(t *testing.T) {
func TestPodControllerLookup(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
manager, informers := testNewReplicaSetControllerFromClient(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}), stopCh, BurstReplicas, 0)
manager, informers := testNewReplicaSetControllerFromClient(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}), stopCh, BurstReplicas)
testCases := []struct {
inRSs []*extensions.ReplicaSet
pod *v1.Pod
@@ -509,7 +508,12 @@ func TestPodControllerLookup(t *testing.T) {
for _, r := range c.inRSs {
informers.Extensions().V1beta1().ReplicaSets().Informer().GetIndexer().Add(r)
}
if rs := manager.getPodReplicaSet(c.pod); rs != nil {
if rss := manager.getPodReplicaSets(c.pod); rss != nil {
if len(rss) != 1 {
t.Errorf("len(rss) = %v, want %v", len(rss), 1)
continue
}
rs := rss[0]
if c.outRSName != rs.Name {
t.Errorf("Got replica set %+v expected %+v", rs.Name, c.outRSName)
}
@@ -536,7 +540,6 @@ func TestWatchControllers(t *testing.T) {
informers.Core().V1().Pods(),
client,
BurstReplicas,
0,
)
informers.Start(stopCh)
@@ -581,7 +584,7 @@ func TestWatchPods(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0)
manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas)
// Put one ReplicaSet into the shared informer
labelMap := map[string]string{"foo": "bar"}
@@ -627,7 +630,7 @@ func TestWatchPods(t *testing.T) {
func TestUpdatePods(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
manager, informers := testNewReplicaSetControllerFromClient(fake.NewSimpleClientset(), stopCh, BurstReplicas, 0)
manager, informers := testNewReplicaSetControllerFromClient(fake.NewSimpleClientset(), stopCh, BurstReplicas)
received := make(chan string)
@@ -656,16 +659,19 @@ func TestUpdatePods(t *testing.T) {
testRSSpec2.Name = "barfoo"
informers.Extensions().V1beta1().ReplicaSets().Informer().GetIndexer().Add(&testRSSpec2)
// case 1: We put in the podLister a pod with labels matching testRSSpec1,
// then update its labels to match testRSSpec2. We expect to receive a sync
// request for both replica sets.
isController := true
controllerRef1 := metav1.OwnerReference{UID: testRSSpec1.UID, APIVersion: "v1", Kind: "ReplicaSet", Name: testRSSpec1.Name, Controller: &isController}
controllerRef2 := metav1.OwnerReference{UID: testRSSpec2.UID, APIVersion: "v1", Kind: "ReplicaSet", Name: testRSSpec2.Name, Controller: &isController}
// case 1: Pod with a ControllerRef
pod1 := newPodList(informers.Core().V1().Pods().Informer().GetIndexer(), 1, v1.PodRunning, labelMap1, testRSSpec1, "pod").Items[0]
pod1.OwnerReferences = []metav1.OwnerReference{controllerRef1}
pod1.ResourceVersion = "1"
pod2 := pod1
pod2.Labels = labelMap2
pod2.ResourceVersion = "2"
manager.updatePod(&pod1, &pod2)
expected := sets.NewString(testRSSpec1.Name, testRSSpec2.Name)
expected := sets.NewString(testRSSpec1.Name)
for _, name := range expected.List() {
t.Logf("Expecting update for %+v", name)
select {
@@ -674,17 +680,20 @@ func TestUpdatePods(t *testing.T) {
t.Errorf("Expected keys %#v got %v", expected, got)
}
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("Expected update notifications for replica sets within 100ms each")
t.Errorf("Expected update notifications for replica sets")
}
}
// case 2: pod1 in the podLister has labels matching testRSSpec1. We update
// its labels to match no replica set. We expect to receive a sync request
// for testRSSpec1.
pod2.Labels = make(map[string]string)
// case 2: Remove ControllerRef (orphan). Expect to sync label-matching RS.
pod1 = newPodList(informers.Core().V1().Pods().Informer().GetIndexer(), 1, v1.PodRunning, labelMap1, testRSSpec1, "pod").Items[0]
pod1.ResourceVersion = "1"
pod1.Labels = labelMap2
pod1.OwnerReferences = []metav1.OwnerReference{controllerRef2}
pod2 = pod1
pod2.OwnerReferences = nil
pod2.ResourceVersion = "2"
manager.updatePod(&pod1, &pod2)
expected = sets.NewString(testRSSpec1.Name)
expected = sets.NewString(testRSSpec2.Name)
for _, name := range expected.List() {
t.Logf("Expecting update for %+v", name)
select {
@@ -693,7 +702,52 @@ func TestUpdatePods(t *testing.T) {
t.Errorf("Expected keys %#v got %v", expected, got)
}
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("Expected update notifications for replica sets within 100ms each")
t.Errorf("Expected update notifications for replica sets")
}
}
// case 2: Remove ControllerRef (orphan). Expect to sync both former owner and
// any label-matching RS.
pod1 = newPodList(informers.Core().V1().Pods().Informer().GetIndexer(), 1, v1.PodRunning, labelMap1, testRSSpec1, "pod").Items[0]
pod1.ResourceVersion = "1"
pod1.Labels = labelMap2
pod1.OwnerReferences = []metav1.OwnerReference{controllerRef1}
pod2 = pod1
pod2.OwnerReferences = nil
pod2.ResourceVersion = "2"
manager.updatePod(&pod1, &pod2)
expected = sets.NewString(testRSSpec1.Name, testRSSpec2.Name)
for _, name := range expected.List() {
t.Logf("Expecting update for %+v", name)
select {
case got := <-received:
if !expected.Has(got) {
t.Errorf("Expected keys %#v got %v", expected, got)
}
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("Expected update notifications for replica sets")
}
}
// case 4: Keep ControllerRef, change labels. Expect to sync owning RS.
pod1 = newPodList(informers.Core().V1().Pods().Informer().GetIndexer(), 1, v1.PodRunning, labelMap1, testRSSpec1, "pod").Items[0]
pod1.ResourceVersion = "1"
pod1.Labels = labelMap1
pod1.OwnerReferences = []metav1.OwnerReference{controllerRef2}
pod2 = pod1
pod2.Labels = labelMap2
pod2.ResourceVersion = "2"
manager.updatePod(&pod1, &pod2)
expected = sets.NewString(testRSSpec2.Name)
for _, name := range expected.List() {
t.Logf("Expecting update for %+v", name)
select {
case got := <-received:
if !expected.Has(got) {
t.Errorf("Expected keys %#v got %v", expected, got)
}
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("Expected update notifications for replica sets")
}
}
}
@@ -711,7 +765,7 @@ func TestControllerUpdateRequeue(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0)
manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas)
labelMap := map[string]string{"foo": "bar"}
rs := newReplicaSet(1, labelMap)
@@ -782,7 +836,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
fakePodControl := controller.FakePodControl{}
stopCh := make(chan struct{})
defer close(stopCh)
manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, burstReplicas, 0)
manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, burstReplicas)
manager.podControl = &fakePodControl
labelMap := map[string]string{"foo": "bar"}
@@ -845,6 +899,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
// the rs is waiting for.
expectedDels := manager.expectations.GetUIDs(getKey(rsSpec, t))
podsToDelete := []*v1.Pod{}
isController := true
for _, key := range expectedDels.List() {
nsName := strings.Split(key, "/")
podsToDelete = append(podsToDelete, &v1.Pod{
@@ -852,6 +907,9 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
Name: nsName[1],
Namespace: nsName[0],
Labels: rsSpec.Spec.Selector.MatchLabels,
OwnerReferences: []metav1.OwnerReference{
{UID: rsSpec.UID, APIVersion: "v1", Kind: "ReplicaSet", Name: rsSpec.Name, Controller: &isController},
},
},
})
}
@@ -888,11 +946,15 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
t.Fatalf("Waiting on unexpected number of deletes.")
}
nsName := strings.Split(expectedDel.List()[0], "/")
isController := true
lastPod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: nsName[1],
Namespace: nsName[0],
Labels: rsSpec.Spec.Selector.MatchLabels,
OwnerReferences: []metav1.OwnerReference{
{UID: rsSpec.UID, APIVersion: "v1", Kind: "ReplicaSet", Name: rsSpec.Name, Controller: &isController},
},
},
}
informers.Core().V1().Pods().Informer().GetIndexer().Delete(lastPod)
@@ -935,7 +997,7 @@ func TestRSSyncExpectations(t *testing.T) {
fakePodControl := controller.FakePodControl{}
stopCh := make(chan struct{})
defer close(stopCh)
manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, 2, 0)
manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, 2)
manager.podControl = &fakePodControl
labelMap := map[string]string{"foo": "bar"}
@@ -961,7 +1023,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) {
client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
stopCh := make(chan struct{})
defer close(stopCh)
manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, 10, 0)
manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, 10)
rs := newReplicaSet(1, map[string]string{"foo": "bar"})
informers.Extensions().V1beta1().ReplicaSets().Informer().GetIndexer().Add(rs)
@@ -1015,34 +1077,42 @@ func TestOverlappingRSs(t *testing.T) {
client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
labelMap := map[string]string{"foo": "bar"}
for i := 0; i < 5; i++ {
func() {
stopCh := make(chan struct{})
defer close(stopCh)
manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, 10, 0)
stopCh := make(chan struct{})
defer close(stopCh)
manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, 10)
// Create 10 ReplicaSets, shuffled them randomly and insert them into the ReplicaSet controller's store
var controllers []*extensions.ReplicaSet
for j := 1; j < 10; j++ {
rsSpec := newReplicaSet(1, labelMap)
rsSpec.CreationTimestamp = metav1.Date(2014, time.December, j, 0, 0, 0, 0, time.Local)
rsSpec.Name = string(uuid.NewUUID())
controllers = append(controllers, rsSpec)
}
shuffledControllers := shuffle(controllers)
for j := range shuffledControllers {
informers.Extensions().V1beta1().ReplicaSets().Informer().GetIndexer().Add(shuffledControllers[j])
}
// Add a pod and make sure only the oldest ReplicaSet is synced
pods := newPodList(nil, 1, v1.PodPending, labelMap, controllers[0], "pod")
rsKey := getKey(controllers[0], t)
// Create 10 ReplicaSets, shuffled them randomly and insert them into the
// ReplicaSet controller's store.
// All use the same CreationTimestamp since ControllerRef should be able
// to handle that.
timestamp := metav1.Date(2014, time.December, 0, 0, 0, 0, 0, time.Local)
var controllers []*extensions.ReplicaSet
for j := 1; j < 10; j++ {
rsSpec := newReplicaSet(1, labelMap)
rsSpec.CreationTimestamp = timestamp
rsSpec.Name = fmt.Sprintf("rs%d", j)
controllers = append(controllers, rsSpec)
}
shuffledControllers := shuffle(controllers)
for j := range shuffledControllers {
informers.Extensions().V1beta1().ReplicaSets().Informer().GetIndexer().Add(shuffledControllers[j])
}
// Add a pod with a ControllerRef and make sure only the corresponding
// ReplicaSet is synced. Pick a RS in the middle since the old code used to
// sort by name if all timestamps were equal.
rs := controllers[3]
pods := newPodList(nil, 1, v1.PodPending, labelMap, rs, "pod")
pod := &pods.Items[0]
isController := true
pod.OwnerReferences = []metav1.OwnerReference{
{UID: rs.UID, APIVersion: "v1", Kind: "ReplicaSet", Name: rs.Name, Controller: &isController},
}
rsKey := getKey(rs, t)
manager.addPod(&pods.Items[0])
queueRS, _ := manager.queue.Get()
if queueRS != rsKey {
t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS)
}
}()
manager.addPod(pod)
queueRS, _ := manager.queue.Get()
if queueRS != rsKey {
t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS)
}
}
@@ -1051,7 +1121,7 @@ func TestDeletionTimestamp(t *testing.T) {
labelMap := map[string]string{"foo": "bar"}
stopCh := make(chan struct{})
defer close(stopCh)
manager, informers := testNewReplicaSetControllerFromClient(c, stopCh, 10, 0)
manager, informers := testNewReplicaSetControllerFromClient(c, stopCh, 10)
rs := newReplicaSet(1, labelMap)
informers.Extensions().V1beta1().ReplicaSets().Informer().GetIndexer().Add(rs)
@@ -1098,11 +1168,15 @@ func TestDeletionTimestamp(t *testing.T) {
// An update to the pod (including an update to the deletion timestamp)
// should not be counted as a second delete.
isController := true
secondPod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: pod.Namespace,
Name: "secondPod",
Labels: pod.Labels,
OwnerReferences: []metav1.OwnerReference{
{UID: rs.UID, APIVersion: "v1", Kind: "ReplicaSet", Name: rs.Name, Controller: &isController},
},
},
}
manager.expectations.ExpectDeletions(rsKey, []string{controller.PodKey(secondPod)})
@@ -1142,7 +1216,7 @@ func TestDeletionTimestamp(t *testing.T) {
func setupManagerWithGCEnabled(stopCh chan struct{}, objs ...runtime.Object) (manager *ReplicaSetController, fakePodControl *controller.FakePodControl, informers informers.SharedInformerFactory) {
c := fakeclientset.NewSimpleClientset(objs...)
fakePodControl = &controller.FakePodControl{}
manager, informers = testNewReplicaSetControllerFromClient(c, stopCh, BurstReplicas, 0)
manager, informers = testNewReplicaSetControllerFromClient(c, stopCh, BurstReplicas)
manager.podControl = fakePodControl
return manager, fakePodControl, informers
@@ -1372,7 +1446,7 @@ func TestReadyReplicas(t *testing.T) {
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
stopCh := make(chan struct{})
defer close(stopCh)
manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0)
manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas)
// Status.Replica should update to match number of pods in system, 1 new pod should be created.
labelMap := map[string]string{"foo": "bar"}
@@ -1414,7 +1488,7 @@ func TestAvailableReplicas(t *testing.T) {
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
stopCh := make(chan struct{})
defer close(stopCh)
manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0)
manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas)
// Status.Replica should update to match number of pods in system, 1 new pod should be created.
labelMap := map[string]string{"foo": "bar"}