update deployment lister

This commit is contained in:
deads2k
2016-10-04 13:11:07 -04:00
parent 617fa91264
commit 358a57d74a
10 changed files with 244 additions and 180 deletions

View File

@@ -67,27 +67,27 @@ type DeploymentController struct {
syncHandler func(dKey string) error
// A store of deployments, populated by the dController
dStore cache.StoreToDeploymentLister
dLister cache.StoreToDeploymentLister
// Watches changes to all deployments
dController *cache.Controller
// A store of ReplicaSets, populated by the rsController
rsStore cache.StoreToReplicaSetLister
rsLister cache.StoreToReplicaSetLister
// Watches changes to all ReplicaSets
rsController *cache.Controller
// A store of pods, populated by the podController
podStore cache.StoreToPodLister
podLister cache.StoreToPodLister
// Watches changes to all pods
podController *cache.Controller
// dStoreSynced returns true if the Deployment store has been synced at least once.
// dListerSynced returns true if the Deployment store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
dStoreSynced func() bool
// rsStoreSynced returns true if the ReplicaSet store has been synced at least once.
dListerSynced func() bool
// rsListerSynced returns true if the ReplicaSet store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
rsStoreSynced func() bool
// podStoreSynced returns true if the pod store has been synced at least once.
rsListerSynced func() bool
// podListerSynced returns true if the pod store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
podStoreSynced func() bool
podListerSynced func() bool
// Deployments that need to be synced
queue workqueue.RateLimitingInterface
@@ -109,7 +109,7 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
}
dc.dStore.Indexer, dc.dController = cache.NewIndexerInformer(
dc.dLister.Indexer, dc.dController = cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return dc.client.Extensions().Deployments(api.NamespaceAll).List(options)
@@ -129,7 +129,7 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
dc.rsStore.Store, dc.rsController = cache.NewInformer(
dc.rsLister.Store, dc.rsController = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return dc.client.Extensions().ReplicaSets(api.NamespaceAll).List(options)
@@ -147,7 +147,7 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller
},
)
dc.podStore.Indexer, dc.podController = cache.NewIndexerInformer(
dc.podLister.Indexer, dc.podController = cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return dc.client.Core().Pods(api.NamespaceAll).List(options)
@@ -167,9 +167,9 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller
)
dc.syncHandler = dc.syncDeployment
dc.dStoreSynced = dc.dController.HasSynced
dc.rsStoreSynced = dc.rsController.HasSynced
dc.podStoreSynced = dc.podController.HasSynced
dc.dListerSynced = dc.dController.HasSynced
dc.rsListerSynced = dc.rsController.HasSynced
dc.podListerSynced = dc.podController.HasSynced
return dc
}
@@ -183,7 +183,7 @@ func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
// Wait for the rc and dc stores to sync before starting any work in this controller.
ready := make(chan struct{})
go dc.waitForSyncedStores(ready, stopCh)
go dc.waitForSyncedListers(ready, stopCh)
select {
case <-ready:
case <-stopCh:
@@ -199,10 +199,10 @@ func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
dc.queue.ShutDown()
}
func (dc *DeploymentController) waitForSyncedStores(ready chan<- struct{}, stopCh <-chan struct{}) {
func (dc *DeploymentController) waitForSyncedListers(ready chan<- struct{}, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
for !dc.dStoreSynced() || !dc.rsStoreSynced() || !dc.podStoreSynced() {
for !dc.dListerSynced() || !dc.rsListerSynced() || !dc.podListerSynced() {
select {
case <-time.After(StoreSyncedPollPeriod):
case <-stopCh:
@@ -255,7 +255,7 @@ func (dc *DeploymentController) addReplicaSet(obj interface{}) {
// getDeploymentForReplicaSet returns the deployment managing the given ReplicaSet.
func (dc *DeploymentController) getDeploymentForReplicaSet(rs *extensions.ReplicaSet) *extensions.Deployment {
deployments, err := dc.dStore.GetDeploymentsForReplicaSet(rs)
deployments, err := dc.dLister.GetDeploymentsForReplicaSet(rs)
if err != nil || len(deployments) == 0 {
glog.V(4).Infof("Error: %v. No deployment found for ReplicaSet %v, deployment controller will avoid syncing.", err, rs.Name)
return nil
@@ -268,7 +268,7 @@ func (dc *DeploymentController) getDeploymentForReplicaSet(rs *extensions.Replic
sort.Sort(util.BySelectorLastUpdateTime(deployments))
glog.Errorf("user error! more than one deployment is selecting replica set %s/%s with labels: %#v, returning %s/%s", rs.Namespace, rs.Name, rs.Labels, deployments[0].Namespace, deployments[0].Name)
}
return &deployments[0]
return deployments[0]
}
// updateReplicaSet figures out what deployment(s) manage a ReplicaSet when the ReplicaSet
@@ -328,7 +328,7 @@ func (dc *DeploymentController) deleteReplicaSet(obj interface{}) {
// getDeploymentForPod returns the deployment that manages the given Pod.
// If there are multiple deployments for a given Pod, only return the oldest one.
func (dc *DeploymentController) getDeploymentForPod(pod *api.Pod) *extensions.Deployment {
deployments, err := dc.dStore.GetDeploymentsForPod(pod)
deployments, err := dc.dLister.GetDeploymentsForPod(pod)
if err != nil || len(deployments) == 0 {
glog.V(4).Infof("Error: %v. No deployment found for Pod %v, deployment controller will avoid syncing.", err, pod.Name)
return nil
@@ -338,7 +338,7 @@ func (dc *DeploymentController) getDeploymentForPod(pod *api.Pod) *extensions.De
sort.Sort(util.BySelectorLastUpdateTime(deployments))
glog.Errorf("user error! more than one deployment is selecting pod %s/%s with labels: %#v, returning %s/%s", pod.Namespace, pod.Name, pod.Labels, deployments[0].Namespace, deployments[0].Name)
}
return &deployments[0]
return deployments[0]
}
// When a pod is created, ensure its controller syncs
@@ -478,7 +478,7 @@ func (dc *DeploymentController) syncDeployment(key string) error {
glog.V(4).Infof("Finished syncing deployment %q (%v)", key, time.Now().Sub(startTime))
}()
obj, exists, err := dc.dStore.Indexer.GetByKey(key)
obj, exists, err := dc.dLister.Indexer.GetByKey(key)
if err != nil {
glog.Errorf("Unable to retrieve deployment %v from store: %v", key, err)
return err
@@ -548,24 +548,27 @@ func (dc *DeploymentController) handleOverlap(d *extensions.Deployment) error {
if err != nil {
return fmt.Errorf("deployment %s/%s has invalid label selector: %v", d.Namespace, d.Name, err)
}
deployments, err := dc.dStore.Deployments(d.Namespace).List(labels.Everything())
deployments, err := dc.dLister.Deployments(d.Namespace).List(labels.Everything())
if err != nil {
return fmt.Errorf("error listing deployments in namespace %s: %v", d.Namespace, err)
}
overlapping := false
for i := range deployments {
other := &deployments[i]
for _, other := range deployments {
if !selector.Empty() && selector.Matches(labels.Set(other.Spec.Template.Labels)) && d.UID != other.UID {
deploymentCopy, err := util.DeploymentDeepCopy(other)
if err != nil {
return err
}
overlapping = true
// We don't care if the overlapping annotation update failed or not (we don't make decision on it)
d, _ = dc.markDeploymentOverlap(d, other.Name)
other, _ = dc.markDeploymentOverlap(other, d.Name)
deploymentCopy, _ = dc.markDeploymentOverlap(deploymentCopy, d.Name)
// Skip syncing this one if older overlapping one is found
// TODO: figure out a better way to determine which deployment to skip,
// either with controller reference, or with validation.
// Using oldest active replica set to determine which deployment to skip wouldn't make much difference,
// since new replica set hasn't been created after selector update
if util.SelectorUpdatedBefore(other, d) {
if util.SelectorUpdatedBefore(deploymentCopy, d) {
return fmt.Errorf("found deployment %s/%s has overlapping selector with an older deployment %s/%s, skip syncing it", d.Namespace, d.Name, other.Namespace, other.Name)
}
}

View File

@@ -157,9 +157,9 @@ type fixture struct {
client *fake.Clientset
// Objects to put in the store.
dStore []*exp.Deployment
rsStore []*exp.ReplicaSet
podStore []*api.Pod
dLister []*exp.Deployment
rsLister []*exp.ReplicaSet
podLister []*api.Pod
// Actions expected to happen on the client. Objects from here are also
// preloaded into NewSimpleFake.
@@ -200,16 +200,16 @@ func (f *fixture) run(deploymentName string) {
f.client = fake.NewSimpleClientset(f.objects...)
c := NewDeploymentController(f.client, controller.NoResyncPeriodFunc)
c.eventRecorder = &record.FakeRecorder{}
c.rsStoreSynced = alwaysReady
c.podStoreSynced = alwaysReady
for _, d := range f.dStore {
c.dStore.Indexer.Add(d)
c.rsListerSynced = alwaysReady
c.podListerSynced = alwaysReady
for _, d := range f.dLister {
c.dLister.Indexer.Add(d)
}
for _, rs := range f.rsStore {
c.rsStore.Store.Add(rs)
for _, rs := range f.rsLister {
c.rsLister.Store.Add(rs)
}
for _, pod := range f.podStore {
c.podStore.Indexer.Add(pod)
for _, pod := range f.podLister {
c.podLister.Indexer.Add(pod)
}
err := c.syncDeployment(deploymentName)
@@ -240,7 +240,7 @@ func TestSyncDeploymentCreatesReplicaSet(t *testing.T) {
f := newFixture(t)
d := newDeployment(1, nil)
f.dStore = append(f.dStore, d)
f.dLister = append(f.dLister, d)
f.objects = append(f.objects, d)
rs := newReplicaSet(d, "deploymentrs-4186632231", 1)
@@ -258,7 +258,7 @@ func TestSyncDeploymentDontDoAnythingDuringDeletion(t *testing.T) {
d := newDeployment(1, nil)
now := unversioned.Now()
d.DeletionTimestamp = &now
f.dStore = append(f.dStore, d)
f.dLister = append(f.dLister, d)
f.run(getKey(d, t))
}
@@ -269,13 +269,13 @@ func TestDeploymentController_dontSyncDeploymentsWithEmptyPodSelector(t *testing
controller := NewDeploymentController(fake, controller.NoResyncPeriodFunc)
controller.eventRecorder = &record.FakeRecorder{}
controller.rsStoreSynced = alwaysReady
controller.podStoreSynced = alwaysReady
controller.rsListerSynced = alwaysReady
controller.podListerSynced = alwaysReady
d := newDeployment(1, nil)
empty := unversioned.LabelSelector{}
d.Spec.Selector = &empty
controller.dStore.Indexer.Add(d)
controller.dLister.Indexer.Add(d)
// We expect the deployment controller to not take action here since it's configuration
// is invalid, even though no replicasets exist that match it's selector.
controller.syncDeployment(fmt.Sprintf("%s/%s", d.ObjectMeta.Namespace, d.ObjectMeta.Name))

View File

@@ -108,7 +108,7 @@ func (dc *DeploymentController) waitForInactiveReplicaSets(oldRSs []*extensions.
statusReplicas := rs.Status.Replicas
if err := wait.ExponentialBackoff(unversionedclient.DefaultRetry, func() (bool, error) {
replicaSet, err := dc.rsStore.ReplicaSets(rs.Namespace).Get(rs.Name)
replicaSet, err := dc.rsLister.ReplicaSets(rs.Namespace).Get(rs.Name)
if err != nil {
return false, err
}

View File

@@ -106,7 +106,7 @@ func (dc *DeploymentController) getAllReplicaSetsAndSyncRevision(deployment *ext
func (dc *DeploymentController) rsAndPodsWithHashKeySynced(deployment *extensions.Deployment) ([]extensions.ReplicaSet, *api.PodList, error) {
rsList, err := deploymentutil.ListReplicaSets(deployment,
func(namespace string, options api.ListOptions) ([]extensions.ReplicaSet, error) {
return dc.rsStore.ReplicaSets(namespace).List(options.LabelSelector)
return dc.rsLister.ReplicaSets(namespace).List(options.LabelSelector)
})
if err != nil {
return nil, nil, fmt.Errorf("error listing ReplicaSets: %v", err)
@@ -174,7 +174,7 @@ func (dc *DeploymentController) addHashKeyToRSAndPods(rs extensions.ReplicaSet)
return nil, fmt.Errorf("error in converting selector to label selector for replica set %s: %s", updatedRS.Name, err)
}
options := api.ListOptions{LabelSelector: selector}
pods, err := dc.podStore.Pods(namespace).List(options.LabelSelector)
pods, err := dc.podLister.Pods(namespace).List(options.LabelSelector)
if err != nil {
return nil, fmt.Errorf("error in getting pod list for namespace %s and list options %+v: %s", namespace, options, err)
}
@@ -228,7 +228,7 @@ func (dc *DeploymentController) addHashKeyToRSAndPods(rs extensions.ReplicaSet)
func (dc *DeploymentController) listPods(deployment *extensions.Deployment) (*api.PodList, error) {
return deploymentutil.ListPods(deployment,
func(namespace string, options api.ListOptions) (*api.PodList, error) {
pods, err := dc.podStore.Pods(namespace).List(options.LabelSelector)
pods, err := dc.podLister.Pods(namespace).List(options.LabelSelector)
result := api.PodList{Items: make([]api.Pod, 0, len(pods))}
for i := range pods {
result.Items = append(result.Items, *pods[i])

View File

@@ -326,10 +326,10 @@ func TestDeploymentController_cleanupDeployment(t *testing.T) {
controller := NewDeploymentController(fake, controller.NoResyncPeriodFunc)
controller.eventRecorder = &record.FakeRecorder{}
controller.rsStoreSynced = alwaysReady
controller.podStoreSynced = alwaysReady
controller.rsListerSynced = alwaysReady
controller.podListerSynced = alwaysReady
for _, rs := range test.oldRSs {
controller.rsStore.Add(rs)
controller.rsLister.Add(rs)
}
d := newDeployment(1, &tests[i].revisionHistoryLimit)

View File

@@ -826,12 +826,12 @@ func LastSelectorUpdate(d *extensions.Deployment) unversioned.Time {
// BySelectorLastUpdateTime sorts a list of deployments by the last update time of their selector,
// first using their creation timestamp and then their names as a tie breaker.
type BySelectorLastUpdateTime []extensions.Deployment
type BySelectorLastUpdateTime []*extensions.Deployment
func (o BySelectorLastUpdateTime) Len() int { return len(o) }
func (o BySelectorLastUpdateTime) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
func (o BySelectorLastUpdateTime) Less(i, j int) bool {
ti, tj := LastSelectorUpdate(&o[i]), LastSelectorUpdate(&o[j])
ti, tj := LastSelectorUpdate(o[i]), LastSelectorUpdate(o[j])
if ti.Equal(tj) {
if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) {
return o[i].Name < o[j].Name