controller: wait for all pods to be deleted before Recreating

Michail Kargakis
2016-12-09 17:16:00 +01:00
parent 31a5b4218d
commit 7ef3e6f7c9
12 changed files with 227 additions and 100 deletions
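In short, the change below makes the deployment controller watch pod deletions: for a Deployment using the Recreate strategy, the new ReplicaSet must not be scaled up while pods of the old one are still terminating, so the controller now re-enqueues the deployment as soon as its last pod disappears instead of waiting for the periodic resync. A condensed sketch of that flow, reusing the helper names from the diff (the wrapper name handlePodGone is invented here; the committed handler is deletePod and additionally unwraps informer tombstones):

// Condensed sketch, not the committed code: what happens after the pod
// informer reports a deletion for a pod owned by a Recreate deployment.
func (dc *DeploymentController) handlePodGone(pod *v1.Pod) {
	d := dc.getDeploymentForPod(pod)
	if d == nil || d.Spec.Strategy.Type != extensions.RecreateDeploymentStrategyType {
		return // rolling updates do not wait for full termination
	}
	// Only when no pods remain is it safe to bring up the new ReplicaSet.
	if podList, err := dc.listPods(d); err == nil && len(podList.Items) == 0 {
		dc.enqueueDeployment(d)
	}
}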


@@ -117,6 +117,9 @@ func NewDeploymentController(dInformer informers.DeploymentInformer, rsInformer
 		UpdateFunc: dc.updateReplicaSet,
 		DeleteFunc: dc.deleteReplicaSet,
 	})
+	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		DeleteFunc: dc.deletePod,
+	})
 
 	dc.syncHandler = dc.syncDeployment
 	dc.dLister = dInformer.Lister()
@@ -167,12 +170,12 @@ func (dc *DeploymentController) deleteDeployment(obj interface{}) {
 	if !ok {
 		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
 		if !ok {
-			glog.Errorf("Couldn't get object from tombstone %#v", obj)
+			utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
 			return
 		}
 		d, ok = tombstone.Obj.(*extensions.Deployment)
 		if !ok {
-			glog.Errorf("Tombstone contained object that is not a Deployment %#v", obj)
+			utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a Deployment %#v", obj))
 			return
 		}
 	}
@@ -202,7 +205,8 @@ func (dc *DeploymentController) getDeploymentForReplicaSet(rs *extensions.Replic
 	// trying to clean up one of the controllers, for now we just return the older one
 	if len(deployments) > 1 {
 		sort.Sort(util.BySelectorLastUpdateTime(deployments))
-		glog.Errorf("user error! more than one deployment is selecting replica set %s/%s with labels: %#v, returning %s/%s", rs.Namespace, rs.Name, rs.Labels, deployments[0].Namespace, deployments[0].Name)
+		glog.V(4).Infof("user error! more than one deployment is selecting replica set %s/%s with labels: %#v, returning %s/%s",
+			rs.Namespace, rs.Name, rs.Labels, deployments[0].Namespace, deployments[0].Name)
 	}
 	return deployments[0]
 }
@@ -246,12 +250,12 @@ func (dc *DeploymentController) deleteReplicaSet(obj interface{}) {
 	if !ok {
 		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
 		if !ok {
-			glog.Errorf("Couldn't get object from tombstone %#v, could take up to %v before a deployment recreates/updates replicasets", obj, FullDeploymentResyncPeriod)
+			utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v, could take up to %v before a deployment recreates/updates replicasets", obj, FullDeploymentResyncPeriod))
 			return
 		}
 		rs, ok = tombstone.Obj.(*extensions.ReplicaSet)
 		if !ok {
-			glog.Errorf("Tombstone contained object that is not a ReplicaSet %#v, could take up to %v before a deployment recreates/updates replicasets", obj, FullDeploymentResyncPeriod)
+			utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a ReplicaSet %#v, could take up to %v before a deployment recreates/updates replicasets", obj, FullDeploymentResyncPeriod))
 			return
 		}
 	}
@@ -261,20 +265,48 @@ func (dc *DeploymentController) deleteReplicaSet(obj interface{}) {
 	}
 }
 
+// deletePod will enqueue a Recreate Deployment once all of its pods have stopped running.
+func (dc *DeploymentController) deletePod(obj interface{}) {
+	pod, ok := obj.(*v1.Pod)
+
+	// When a delete is dropped, the relist will notice a pod in the store not
+	// in the list, leading to the insertion of a tombstone object which contains
+	// the deleted key/value. Note that this value might be stale. If the Pod
+	// changed labels the new deployment will not be woken up till the periodic resync.
+	if !ok {
+		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
+		if !ok {
+			utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v, could take up to %v before a deployment recreates/updates pod", obj, FullDeploymentResyncPeriod))
+			return
+		}
+		pod, ok = tombstone.Obj.(*v1.Pod)
+		if !ok {
+			utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a pod %#v, could take up to %v before a deployment recreates/updates pods", obj, FullDeploymentResyncPeriod))
+			return
+		}
+	}
+	if d := dc.getDeploymentForPod(pod); d != nil && d.Spec.Strategy.Type == extensions.RecreateDeploymentStrategyType {
+		podList, err := dc.listPods(d)
+		if err == nil && len(podList.Items) == 0 {
+			dc.enqueueDeployment(d)
+		}
+	}
+}
+
 func (dc *DeploymentController) enqueueDeployment(deployment *extensions.Deployment) {
 	key, err := controller.KeyFunc(deployment)
 	if err != nil {
-		glog.Errorf("Couldn't get key for object %#v: %v", deployment, err)
+		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", deployment, err))
 		return
 	}
 
 	dc.queue.Add(key)
 }
 
-// enqueueAfter will enqueue a deployment after the provided amount of time in a secondary queue.
+// checkProgressAfter will enqueue a deployment after the provided amount of time in a secondary queue.
 // Once the deployment is popped out of the secondary queue, it is checked for progress and requeued
 // back to the main queue iff it has failed progressing.
-func (dc *DeploymentController) enqueueAfter(deployment *extensions.Deployment, after time.Duration) {
+func (dc *DeploymentController) checkProgressAfter(deployment *extensions.Deployment, after time.Duration) {
 	key, err := controller.KeyFunc(deployment)
 	if err != nil {
 		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", deployment, err))
@@ -284,6 +316,42 @@ func (dc *DeploymentController) enqueueAfter(deployment *extensions.Deployment,
 	dc.progressQueue.AddAfter(key, after)
 }
 
+// getDeploymentForPod returns the deployment managing the given Pod.
+func (dc *DeploymentController) getDeploymentForPod(pod *v1.Pod) *extensions.Deployment {
+	// Find the owning replica set
+	var rs *extensions.ReplicaSet
+	var err error
+	// Look at the owner reference
+	controllerRef := controller.GetControllerOf(pod.ObjectMeta)
+	if controllerRef != nil {
+		// Not a pod owned by a replica set.
+		if controllerRef.Kind != extensions.SchemeGroupVersion.WithKind("ReplicaSet").Kind {
+			return nil
+		}
+		rs, err = dc.rsLister.ReplicaSets(pod.Namespace).Get(controllerRef.Name)
+		if err != nil {
+			glog.V(4).Infof("Cannot get replicaset %q for pod %q: %v", controllerRef.Name, pod.Name, err)
+			return nil
+		}
+	} else {
+		// Fallback to listing replica sets.
+		rss, err := dc.rsLister.GetPodReplicaSets(pod)
+		if err != nil {
+			glog.V(4).Infof("Cannot list replica sets for pod %q: %v", pod.Name, err)
+			return nil
+		}
+		// TODO: Handle multiple replica sets gracefully
+		// For now we return the oldest replica set.
+		if len(rss) > 1 {
+			utilruntime.HandleError(fmt.Errorf("more than one ReplicaSet is selecting pod %q with labels: %+v", pod.Name, pod.Labels))
+			sort.Sort(controller.ReplicaSetsByCreationTimestamp(rss))
+		}
+		rs = rss[0]
+	}
+
+	return dc.getDeploymentForReplicaSet(rs)
+}
+
 // worker runs a worker thread that just dequeues items, processes them, and marks them done.
 // It enforces that the syncHandler is never invoked concurrently with the same key.
 func (dc *DeploymentController) worker() {
@@ -332,7 +400,7 @@ func (dc *DeploymentController) syncDeployment(key string) error {
 	obj, exists, err := dc.dLister.Indexer.GetByKey(key)
 	if err != nil {
-		glog.Errorf("Unable to retrieve deployment %v from store: %v", key, err)
+		utilruntime.HandleError(fmt.Errorf("Unable to retrieve deployment %v from store: %v", key, err))
 		return err
 	}
 	if !exists {