/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package deployment

import (
	"fmt"
	"reflect"
	"sort"
	"strconv"
	"time"

	"github.com/golang/glog"
	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/api/annotations"
	"k8s.io/kubernetes/pkg/api/errors"
	"k8s.io/kubernetes/pkg/api/unversioned"
	"k8s.io/kubernetes/pkg/apis/extensions"
	"k8s.io/kubernetes/pkg/client/cache"
	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
	unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
	"k8s.io/kubernetes/pkg/client/record"
	"k8s.io/kubernetes/pkg/controller"
	"k8s.io/kubernetes/pkg/controller/framework"
	"k8s.io/kubernetes/pkg/runtime"
	deploymentutil "k8s.io/kubernetes/pkg/util/deployment"
	utilerrors "k8s.io/kubernetes/pkg/util/errors"
	"k8s.io/kubernetes/pkg/util/integer"
	labelsutil "k8s.io/kubernetes/pkg/util/labels"
	"k8s.io/kubernetes/pkg/util/metrics"
	podutil "k8s.io/kubernetes/pkg/util/pod"
	rsutil "k8s.io/kubernetes/pkg/util/replicaset"
	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
	"k8s.io/kubernetes/pkg/util/wait"
	"k8s.io/kubernetes/pkg/util/workqueue"
	"k8s.io/kubernetes/pkg/watch"
)

const (
	// FullDeploymentResyncPeriod means we'll attempt to recompute the required replicas
	// of all deployments.
	// This recomputation happens based on contents in the local caches.
	FullDeploymentResyncPeriod = 30 * time.Second
	// We must avoid creating new replica set / counting pods until the replica set / pods store has synced.
	// If it hasn't synced, to avoid a hot loop, we'll wait this long between checks.
	StoreSyncedPollPeriod = 100 * time.Millisecond
)

// DeploymentController is responsible for synchronizing Deployment objects stored
// in the system with actual running replica sets and pods.
type DeploymentController struct {
	client        clientset.Interface
	eventRecorder record.EventRecorder

	// To allow injection of syncDeployment for testing.
	syncHandler func(dKey string) error

	// A store of deployments, populated by the dController
	dStore cache.StoreToDeploymentLister
	// Watches changes to all deployments
	dController *framework.Controller
	// A store of ReplicaSets, populated by the rsController
	rsStore cache.StoreToReplicaSetLister
	// Watches changes to all ReplicaSets
	rsController *framework.Controller
	// rsStoreSynced returns true if the ReplicaSet store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	rsStoreSynced func() bool
	// A store of pods, populated by the podController
	podStore cache.StoreToPodLister
	// Watches changes to all pods
	podController *framework.Controller
	// podStoreSynced returns true if the pod store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	podStoreSynced func() bool

	// Deployments that need to be synced
	queue *workqueue.Type
}

// NewDeploymentController creates a new DeploymentController.
func NewDeploymentController(client clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) *DeploymentController {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(glog.Infof)
	// TODO: remove the wrapper when every client has moved to use the clientset.
	eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: client.Core().Events("")})

	if client != nil && client.Core().GetRESTClient().GetRateLimiter() != nil {
		metrics.RegisterMetricAndTrackRateLimiterUsage("deployment_controller", client.Core().GetRESTClient().GetRateLimiter())
	}
	dc := &DeploymentController{
		client:        client,
		eventRecorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "deployment-controller"}),
		queue:         workqueue.New(),
	}

	dc.dStore.Store, dc.dController = framework.NewInformer(
		&cache.ListWatch{
			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
				return dc.client.Extensions().Deployments(api.NamespaceAll).List(options)
			},
			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
				return dc.client.Extensions().Deployments(api.NamespaceAll).Watch(options)
			},
		},
		&extensions.Deployment{},
		FullDeploymentResyncPeriod,
		framework.ResourceEventHandlerFuncs{
			AddFunc:    dc.addDeploymentNotification,
			UpdateFunc: dc.updateDeploymentNotification,
			// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
			DeleteFunc: dc.deleteDeploymentNotification,
		},
	)

	dc.rsStore.Store, dc.rsController = framework.NewInformer(
		&cache.ListWatch{
			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
				return dc.client.Extensions().ReplicaSets(api.NamespaceAll).List(options)
			},
			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
				return dc.client.Extensions().ReplicaSets(api.NamespaceAll).Watch(options)
			},
		},
		&extensions.ReplicaSet{},
		resyncPeriod(),
		framework.ResourceEventHandlerFuncs{
			AddFunc:    dc.addReplicaSet,
			UpdateFunc: dc.updateReplicaSet,
			DeleteFunc: dc.deleteReplicaSet,
		},
	)

	dc.podStore.Indexer, dc.podController = framework.NewIndexerInformer(
		&cache.ListWatch{
			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
				return dc.client.Core().Pods(api.NamespaceAll).List(options)
			},
			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
				return dc.client.Core().Pods(api.NamespaceAll).Watch(options)
			},
		},
		&api.Pod{},
		resyncPeriod(),
		framework.ResourceEventHandlerFuncs{
			AddFunc:    dc.addPod,
			UpdateFunc: dc.updatePod,
			DeleteFunc: dc.deletePod,
		},
		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
	)

	dc.syncHandler = dc.syncDeployment
	dc.rsStoreSynced = dc.rsController.HasSynced
	dc.podStoreSynced = dc.podController.HasSynced
	return dc
}

// Run begins watching and syncing.
func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	go dc.dController.Run(stopCh)
	go dc.rsController.Run(stopCh)
	go dc.podController.Run(stopCh)
	for i := 0; i < workers; i++ {
		go wait.Until(dc.worker, time.Second, stopCh)
	}
	<-stopCh
	glog.Infof("Shutting down deployment controller")
	dc.queue.ShutDown()
}

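// addDeploymentNotification enqueues a newly added deployment for syncing.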
func (dc *DeploymentController) addDeploymentNotification(obj interface{}) {
	d := obj.(*extensions.Deployment)
	glog.V(4).Infof("Adding deployment %s", d.Name)
	dc.enqueueDeployment(d)
}

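// updateDeploymentNotification enqueues the updated deployment so it is resynced.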
func (dc *DeploymentController) updateDeploymentNotification(old, cur interface{}) {
	oldD := old.(*extensions.Deployment)
	glog.V(4).Infof("Updating deployment %s", oldD.Name)
	// Resync on deployment object relist.
	dc.enqueueDeployment(cur.(*extensions.Deployment))
}

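// deleteDeploymentNotification enqueues a deleted deployment, handling the
// DeletedFinalStateUnknown tombstone case from the cache.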
func (dc *DeploymentController) deleteDeploymentNotification(obj interface{}) {
	d, ok := obj.(*extensions.Deployment)
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			glog.Errorf("Couldn't get object from tombstone %+v", obj)
			return
		}
		d, ok = tombstone.Obj.(*extensions.Deployment)
		if !ok {
			glog.Errorf("Tombstone contained object that is not a Deployment %+v", obj)
			return
		}
	}
	glog.V(4).Infof("Deleting deployment %s", d.Name)
	dc.enqueueDeployment(d)
}

// addReplicaSet enqueues the deployment that manages a ReplicaSet when the ReplicaSet is created.
func (dc *DeploymentController) addReplicaSet(obj interface{}) {
	rs := obj.(*extensions.ReplicaSet)
	glog.V(4).Infof("ReplicaSet %s added.", rs.Name)
	if d := dc.getDeploymentForReplicaSet(rs); d != nil {
		dc.enqueueDeployment(d)
	}
}

// getDeploymentForReplicaSet returns the deployment managing the given ReplicaSet.
// TODO: Surface that we are ignoring multiple deployments for a given ReplicaSet.
func (dc *DeploymentController) getDeploymentForReplicaSet(rs *extensions.ReplicaSet) *extensions.Deployment {
	deployments, err := dc.dStore.GetDeploymentsForReplicaSet(rs)
	if err != nil || len(deployments) == 0 {
		glog.V(4).Infof("Error: %v. No deployment found for ReplicaSet %v, deployment controller will avoid syncing.", err, rs.Name)
		return nil
	}
	// Because all ReplicaSets belonging to a deployment should have a unique label key,
	// there should never be more than one deployment returned by the above method.
	// If that happens we should probably dynamically repair the situation by ultimately
	// trying to clean up one of the controllers, for now we just return one of the two,
	// likely randomly.
	return &deployments[0]
}

// updateReplicaSet figures out what deployment(s) manage a ReplicaSet when the ReplicaSet
// is updated and wakes them up. If anything in the ReplicaSets has changed, we need to
// awaken both the old and new deployments. old and cur must be *extensions.ReplicaSet
// types.
func (dc *DeploymentController) updateReplicaSet(old, cur interface{}) {
	if api.Semantic.DeepEqual(old, cur) {
		// A periodic relist will send update events for all known controllers.
		return
	}
	// TODO: Write a unittest for this case
	curRS := cur.(*extensions.ReplicaSet)
	glog.V(4).Infof("ReplicaSet %s updated.", curRS.Name)
	if d := dc.getDeploymentForReplicaSet(curRS); d != nil {
		dc.enqueueDeployment(d)
	}
	// A number of things could affect the old deployment: labels changing,
	// pod template changing, etc.
	oldRS := old.(*extensions.ReplicaSet)
	if !api.Semantic.DeepEqual(oldRS, curRS) {
		if oldD := dc.getDeploymentForReplicaSet(oldRS); oldD != nil {
			dc.enqueueDeployment(oldD)
		}
	}
}

// deleteReplicaSet enqueues the deployment that manages a ReplicaSet when
// the ReplicaSet is deleted. obj could be an *extensions.ReplicaSet, or
// a DeletionFinalStateUnknown marker item.
func (dc *DeploymentController) deleteReplicaSet(obj interface{}) {
	rs, ok := obj.(*extensions.ReplicaSet)

	// When a delete is dropped, the relist will notice a pod in the store not
	// in the list, leading to the insertion of a tombstone object which contains
	// the deleted key/value. Note that this value might be stale. If the ReplicaSet
	// changed labels the new deployment will not be woken up till the periodic resync.
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			glog.Errorf("Couldn't get object from tombstone %+v, could take up to %v before a deployment recreates/updates replicasets", obj, FullDeploymentResyncPeriod)
			return
		}
		rs, ok = tombstone.Obj.(*extensions.ReplicaSet)
		if !ok {
			glog.Errorf("Tombstone contained object that is not a ReplicaSet %+v, could take up to %v before a deployment recreates/updates replicasets", obj, FullDeploymentResyncPeriod)
			return
		}
	}
	glog.V(4).Infof("ReplicaSet %s deleted.", rs.Name)
	if d := dc.getDeploymentForReplicaSet(rs); d != nil {
		dc.enqueueDeployment(d)
	}
}

// getDeploymentForPod returns the deployment managing the ReplicaSet that manages the given Pod.
// TODO: Surface that we are ignoring multiple deployments for a given Pod.
func (dc *DeploymentController) getDeploymentForPod(pod *api.Pod) *extensions.Deployment {
	rss, err := dc.rsStore.GetPodReplicaSets(pod)
	if err != nil {
		glog.V(4).Infof("Error: %v. No ReplicaSets found for pod %v, deployment controller will avoid syncing.", err, pod.Name)
		return nil
	}
	for _, rs := range rss {
		deployments, err := dc.dStore.GetDeploymentsForReplicaSet(&rs)
		if err == nil && len(deployments) > 0 {
			return &deployments[0]
		}
	}
	glog.V(4).Infof("No deployments found for pod %v, deployment controller will avoid syncing.", pod.Name)
	return nil
}

// When a pod is created, ensure its controller syncs
func (dc *DeploymentController) addPod(obj interface{}) {
	pod, ok := obj.(*api.Pod)
	if !ok {
		return
	}
	glog.V(4).Infof("Pod %s created: %+v.", pod.Name, pod)
	if d := dc.getDeploymentForPod(pod); d != nil {
		dc.enqueueDeployment(d)
	}
}

// updatePod figures out what deployment(s) manage the ReplicaSet that manages the Pod when the Pod
// is updated and wakes them up. If anything in the Pods has changed, we need to awaken both
// the old and new deployments. old and cur must be *api.Pod types.
func (dc *DeploymentController) updatePod(old, cur interface{}) {
	if api.Semantic.DeepEqual(old, cur) {
		return
	}
	curPod := cur.(*api.Pod)
	oldPod := old.(*api.Pod)
	glog.V(4).Infof("Pod %s updated %#v -> %#v.", curPod.Name, oldPod, curPod)
	if d := dc.getDeploymentForPod(curPod); d != nil {
		dc.enqueueDeployment(d)
	}
	if !api.Semantic.DeepEqual(oldPod, curPod) {
		if oldD := dc.getDeploymentForPod(oldPod); oldD != nil {
			dc.enqueueDeployment(oldD)
		}
	}
}

// When a pod is deleted, ensure its controller syncs.
// obj could be an *api.Pod, or a DeletionFinalStateUnknown marker item.
func (dc *DeploymentController) deletePod(obj interface{}) {
	pod, ok := obj.(*api.Pod)
	// When a delete is dropped, the relist will notice a pod in the store not
	// in the list, leading to the insertion of a tombstone object which contains
	// the deleted key/value. Note that this value might be stale. If the pod
	// changed labels the new ReplicaSet will not be woken up till the periodic
	// resync.
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			glog.Errorf("Couldn't get object from tombstone %+v", obj)
			return
		}
		pod, ok = tombstone.Obj.(*api.Pod)
		if !ok {
			glog.Errorf("Tombstone contained object that is not a pod %+v", obj)
			return
		}
	}
	glog.V(4).Infof("Pod %s deleted: %+v.", pod.Name, pod)
	if d := dc.getDeploymentForPod(pod); d != nil {
		dc.enqueueDeployment(d)
	}
}

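// enqueueDeployment computes the cache key for the given deployment and adds it to the work queue.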
func (dc *DeploymentController) enqueueDeployment(deployment *extensions.Deployment) {
	key, err := controller.KeyFunc(deployment)
	if err != nil {
		glog.Errorf("Couldn't get key for object %+v: %v", deployment, err)
		return
	}

	// TODO: Handle overlapping deployments better. Either disallow them at admission time or
	// deterministically avoid syncing deployments that fight over ReplicaSets. Currently, we
	// only ensure that the same deployment is synced for a given ReplicaSet. When we
	// periodically relist all deployments there will still be some ReplicaSet instability. One
	// way to handle this is by querying the store for all deployments that this deployment
	// overlaps, as well as all deployments that overlap this deployment, and sorting them.
	dc.queue.Add(key)
}

// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (dc *DeploymentController) worker() {
	for {
		func() {
			key, quit := dc.queue.Get()
			if quit {
				return
			}
			defer dc.queue.Done(key)
			err := dc.syncHandler(key.(string))
			if err != nil {
				glog.Errorf("Error syncing deployment %v: %v", key, err)
			}
		}()
	}
}

// syncDeployment will sync the deployment with the given key.
// This function is not meant to be invoked concurrently with the same key.
func (dc *DeploymentController) syncDeployment(key string) error {
	startTime := time.Now()
	defer func() {
		glog.V(4).Infof("Finished syncing deployment %q (%v)", key, time.Now().Sub(startTime))
	}()

	if !dc.rsStoreSynced() || !dc.podStoreSynced() {
		// Sleep so we give the replica set / pod reflector goroutine a chance to run.
		time.Sleep(StoreSyncedPollPeriod)
		glog.Infof("Waiting for replica set / pod controller to sync, requeuing deployment %s", key)
		dc.queue.Add(key)
		return nil
	}

	obj, exists, err := dc.dStore.Store.GetByKey(key)
	if err != nil {
		glog.Infof("Unable to retrieve deployment %v from store: %v", key, err)
		dc.queue.Add(key)
		return err
	}
	if !exists {
		glog.Infof("Deployment has been deleted %v", key)
		return nil
	}

	d := obj.(*extensions.Deployment)
	everything := unversioned.LabelSelector{}
	if reflect.DeepEqual(d.Spec.Selector, &everything) {
		dc.eventRecorder.Eventf(d, api.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
		return nil
	}

	if d.Spec.Paused {
		return dc.sync(d)
	}

	if d.Spec.RollbackTo != nil {
		revision := d.Spec.RollbackTo.Revision
		if _, err = dc.rollback(d, &revision); err != nil {
			return err
		}
	}

	if dc.isScalingEvent(d) {
		return dc.sync(d)
	}

	switch d.Spec.Strategy.Type {
	case extensions.RecreateDeploymentStrategyType:
		return dc.rolloutRecreate(d)
	case extensions.RollingUpdateDeploymentStrategyType:
		return dc.rolloutRolling(d)
	}
	return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}

// sync is responsible for reconciling deployments on scaling events or when they
// are paused.
func (dc *DeploymentController) sync(deployment *extensions.Deployment) error {
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, false)
	if err != nil {
		return err
	}
	if err := dc.scale(deployment, newRS, oldRSs); err != nil {
		// If we get an error while trying to scale, the deployment will be requeued
		// so we can abort this resync
		return err
	}
	dc.cleanupDeployment(oldRSs, deployment)

	allRSs := append(oldRSs, newRS)
	return dc.syncDeploymentStatus(allRSs, newRS, deployment)
}

// scale scales proportionally in order to mitigate risk. Otherwise, scaling up can increase the size
// of the new replica set and scaling down can decrease the sizes of the old ones, both of which would
// have the effect of hastening the rollout progress, which could produce a higher proportion of unavailable
// replicas in the event of a problem with the rolled out template. Should run only on scaling events or
// when a deployment is paused and not during the normal rollout process.
func (dc *DeploymentController) scale(deployment *extensions.Deployment, newRS *extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet) error {
	// If there is only one active replica set then we should scale that up to the full count of the
	// deployment. If there is no active replica set, then we should scale up the newest replica set.
	if activeOrLatest := findActiveOrLatest(newRS, oldRSs); activeOrLatest != nil {
		if activeOrLatest.Spec.Replicas == deployment.Spec.Replicas {
			return nil
		}
		_, _, err := dc.scaleReplicaSetAndRecordEvent(activeOrLatest, deployment.Spec.Replicas, deployment)
		return err
	}

	// If the new replica set is saturated, old replica sets should be fully scaled down.
	// This case handles replica set adoption during a saturated new replica set.
	if deploymentutil.IsSaturated(deployment, newRS) {
		for _, old := range controller.FilterActiveReplicaSets(oldRSs) {
			if _, _, err := dc.scaleReplicaSetAndRecordEvent(old, 0, deployment); err != nil {
				return err
			}
		}
		return nil
	}

	// There are old replica sets with pods and the new replica set is not saturated.
	// We need to proportionally scale all replica sets (new and old) in case of a
	// rolling deployment.
	if deploymentutil.IsRollingUpdate(deployment) {
		allRSs := controller.FilterActiveReplicaSets(append(oldRSs, newRS))
		allRSsReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs)

		allowedSize := int32(0)
		if deployment.Spec.Replicas > 0 {
			allowedSize = deployment.Spec.Replicas + maxSurge(*deployment)
		}

		// Number of additional replicas that can be either added or removed from the total
		// replicas count. These replicas should be distributed proportionally to the active
		// replica sets.
		deploymentReplicasToAdd := allowedSize - allRSsReplicas

		// The additional replicas should be distributed proportionally amongst the active
		// replica sets from the larger to the smaller in size replica set. Scaling direction
		// drives what happens in case we are trying to scale replica sets of the same size.
		// In such a case when scaling up, we should scale up newer replica sets first, and
		// when scaling down, we should scale down older replica sets first.
		scalingOperation := "up"
		switch {
		case deploymentReplicasToAdd > 0:
			sort.Sort(controller.ReplicaSetsBySizeNewer(allRSs))

		case deploymentReplicasToAdd < 0:
			sort.Sort(controller.ReplicaSetsBySizeOlder(allRSs))
			scalingOperation = "down"

		default: /* deploymentReplicasToAdd == 0 */
			// Nothing to add.
			return nil
		}

		// Iterate over all active replica sets and estimate proportions for each of them.
		// The absolute value of deploymentReplicasAdded should never exceed the absolute
		// value of deploymentReplicasToAdd.
		deploymentReplicasAdded := int32(0)
		for i := range allRSs {
			rs := allRSs[i]

			proportion := getProportion(rs, *deployment, deploymentReplicasToAdd, deploymentReplicasAdded)

			rs.Spec.Replicas += proportion
			deploymentReplicasAdded += proportion
		}

		// Update all replica sets
		for i := range allRSs {
			rs := allRSs[i]

			// Add/remove any leftovers to the largest replica set.
			if i == 0 {
				leftover := deploymentReplicasToAdd - deploymentReplicasAdded
				rs.Spec.Replicas += leftover
				if rs.Spec.Replicas < 0 {
					rs.Spec.Replicas = 0
				}
			}

			if _, err := dc.scaleReplicaSet(rs, rs.Spec.Replicas, deployment, scalingOperation); err != nil {
				// Return as soon as we fail, the deployment is requeued
				return err
			}
		}
	}
	return nil
}

// Rolling back to a revision; no-op if the toRevision is deployment's current revision
func (dc *DeploymentController) rollback(deployment *extensions.Deployment, toRevision *int64) (*extensions.Deployment, error) {
	newRS, allOldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, true)
	if err != nil {
		return nil, err
	}
	allRSs := append(allOldRSs, newRS)
	// If rollback revision is 0, rollback to the last revision
	if *toRevision == 0 {
		if *toRevision = lastRevision(allRSs); *toRevision == 0 {
			// If we still can't find the last revision, give up rollback
			dc.emitRollbackWarningEvent(deployment, deploymentutil.RollbackRevisionNotFound, "Unable to find last revision.")
			// Give up rollback
			return dc.updateDeploymentAndClearRollbackTo(deployment)
		}
	}
	for _, rs := range allRSs {
		v, err := deploymentutil.Revision(rs)
		if err != nil {
			glog.V(4).Infof("Unable to extract revision from deployment's replica set %q: %v", rs.Name, err)
			continue
		}
		if v == *toRevision {
			glog.V(4).Infof("Found replica set %q with desired revision %d", rs.Name, v)
			// rollback by copying podTemplate.Spec from the replica set, and increment revision number by 1
			// no-op if the spec matches current deployment's podTemplate.Spec
			deployment, performedRollback, err := dc.rollbackToTemplate(deployment, rs)
			if performedRollback && err == nil {
				dc.emitRollbackNormalEvent(deployment, fmt.Sprintf("Rolled back deployment %q to revision %d", deployment.Name, *toRevision))
			}
			return deployment, err
		}
	}
	dc.emitRollbackWarningEvent(deployment, deploymentutil.RollbackRevisionNotFound, "Unable to find the revision to rollback to.")
	// Give up rollback
	return dc.updateDeploymentAndClearRollbackTo(deployment)
}

func (dc *DeploymentController) emitRollbackWarningEvent(deployment *extensions.Deployment, reason, message string) {
	dc.eventRecorder.Eventf(deployment, api.EventTypeWarning, reason, message)
}

func (dc *DeploymentController) emitRollbackNormalEvent(deployment *extensions.Deployment, message string) {
	dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, deploymentutil.RollbackDone, message)
}

// updateDeploymentAndClearRollbackTo sets .spec.rollbackTo to nil and updates the input deployment
func (dc *DeploymentController) updateDeploymentAndClearRollbackTo(deployment *extensions.Deployment) (*extensions.Deployment, error) {
	glog.V(4).Infof("Cleans up rollbackTo of deployment %s", deployment.Name)
	deployment.Spec.RollbackTo = nil
	return dc.updateDeployment(deployment)
}

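// rolloutRecreate implements the Recreate deployment strategy: scale down all old
// replica sets first, then create (if needed) and scale up the new replica set.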
func (dc *DeploymentController) rolloutRecreate(deployment *extensions.Deployment) error {
	// Don't create a new RS if one doesn't already exist, so that we avoid scaling up before scaling down
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, false)
	if err != nil {
		return err
	}
	allRSs := append(oldRSs, newRS)

	// scale down old replica sets
	scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(controller.FilterActiveReplicaSets(oldRSs), deployment)
	if err != nil {
		return err
	}
	if scaledDown {
		// Update DeploymentStatus
		return dc.updateDeploymentStatus(allRSs, newRS, deployment)
	}

	// If we need to create a new RS, create it now
	// TODO: Create a new RS without re-listing all RSs.
	if newRS == nil {
		newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(deployment, true)
		if err != nil {
			return err
		}
		allRSs = append(oldRSs, newRS)
	}

	// scale up new replica set
	scaledUp, err := dc.scaleUpNewReplicaSetForRecreate(newRS, deployment)
	if err != nil {
		return err
	}
	if scaledUp {
		// Update DeploymentStatus
		return dc.updateDeploymentStatus(allRSs, newRS, deployment)
	}

	dc.cleanupDeployment(oldRSs, deployment)

	// Sync deployment status
	return dc.syncDeploymentStatus(allRSs, newRS, deployment)
}

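// rolloutRolling implements the RollingUpdate deployment strategy: scale up the new
// replica set and scale down the old ones, respecting the rolling update limits.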
func (dc *DeploymentController) rolloutRolling(deployment *extensions.Deployment) error {
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, true)
	if err != nil {
		return err
	}
	allRSs := append(oldRSs, newRS)

	// Scale up, if we can.
	scaledUp, err := dc.reconcileNewReplicaSet(allRSs, newRS, deployment)
	if err != nil {
		return err
	}
	if scaledUp {
		// Update DeploymentStatus
		return dc.updateDeploymentStatus(allRSs, newRS, deployment)
	}

	// Scale down, if we can.
	scaledDown, err := dc.reconcileOldReplicaSets(allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, deployment)
	if err != nil {
		return err
	}
	if scaledDown {
		// Update DeploymentStatus
		return dc.updateDeploymentStatus(allRSs, newRS, deployment)
	}

	dc.cleanupDeployment(oldRSs, deployment)

	// Sync deployment status
	return dc.syncDeploymentStatus(allRSs, newRS, deployment)
}

// syncDeploymentStatus checks if the status is up-to-date and syncs it if necessary
func (dc *DeploymentController) syncDeploymentStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, d *extensions.Deployment) error {
	newStatus, err := dc.calculateStatus(allRSs, newRS, d)
	if err != nil {
		return err
	}
	if !reflect.DeepEqual(d.Status, newStatus) {
		return dc.updateDeploymentStatus(allRSs, newRS, d)
	}
	return nil
}

// getAllReplicaSetsAndSyncRevision returns all the replica sets for the provided deployment (new and all old), with new RS's and deployment's revision updated.
// 1. Get all old RSes this deployment targets, and calculate the max revision number among them (maxOldV).
// 2. Get new RS this deployment targets (whose pod template matches deployment's), and update new RS's revision number to (maxOldV + 1),
//    only if its revision number is smaller than (maxOldV + 1). If this step failed, we'll update it in the next deployment sync loop.
// 3. Copy new RS's revision number to deployment (update deployment's revision). If this step failed, we'll update it in the next deployment sync loop.
// Note that currently the deployment controller is using caches to avoid querying the server for reads.
// This may lead to stale reads of replica sets, thus incorrect deployment status.
func (dc *DeploymentController) getAllReplicaSetsAndSyncRevision(deployment *extensions.Deployment, createIfNotExisted bool) (*extensions.ReplicaSet, []*extensions.ReplicaSet, error) {
	// List the deployment's RSes & Pods and apply pod-template-hash info to deployment's adopted RSes/Pods
	rsList, podList, err := dc.rsAndPodsWithHashKeySynced(deployment)
	if err != nil {
		return nil, nil, fmt.Errorf("error labeling replica sets and pods with pod-template-hash: %v", err)
	}
	_, allOldRSs, err := deploymentutil.FindOldReplicaSets(deployment, rsList, podList)
	if err != nil {
		return nil, nil, err
	}

	// Calculate the max revision number among all old RSes
	maxOldV := maxRevision(allOldRSs)

	// Get new replica set with the updated revision number
	newRS, err := dc.getNewReplicaSet(deployment, rsList, maxOldV, allOldRSs, createIfNotExisted)
	if err != nil {
		return nil, nil, err
	}

	// Sync deployment's revision number with new replica set
	if newRS != nil && newRS.Annotations != nil && len(newRS.Annotations[deploymentutil.RevisionAnnotation]) > 0 &&
		(deployment.Annotations == nil || deployment.Annotations[deploymentutil.RevisionAnnotation] != newRS.Annotations[deploymentutil.RevisionAnnotation]) {
		if err = dc.updateDeploymentRevision(deployment, newRS.Annotations[deploymentutil.RevisionAnnotation]); err != nil {
			glog.V(4).Infof("Error: %v. Unable to update deployment revision, will retry later.", err)
		}
	}

	return newRS, allOldRSs, nil
}

func maxRevision(allRSs []*extensions.ReplicaSet) int64 {
	max := int64(0)
	for _, rs := range allRSs {
		if v, err := deploymentutil.Revision(rs); err != nil {
			// Skip the replica sets whose revision information couldn't be parsed
			glog.V(4).Infof("Error: %v. Couldn't parse revision for replica set %#v, deployment controller will skip it when reconciling revisions.", err, rs)
		} else if v > max {
			max = v
		}
	}
	return max
}

// lastRevision finds the second max revision number in all replica sets (the last revision)
func lastRevision(allRSs []*extensions.ReplicaSet) int64 {
	max, secMax := int64(0), int64(0)
	for _, rs := range allRSs {
		if v, err := deploymentutil.Revision(rs); err != nil {
			// Skip the replica sets whose revision information couldn't be parsed
			glog.V(4).Infof("Error: %v. Couldn't parse revision for replica set %#v, deployment controller will skip it when reconciling revisions.", err, rs)
		} else if v >= max {
			secMax = max
			max = v
		} else if v > secMax {
			secMax = v
		}
	}
	return secMax
}

// Returns a replica set that matches the intent of the given deployment. Returns nil if the new replica set doesn't exist yet.
// 1. Get existing new RS (the RS that the given deployment targets, whose pod template is the same as deployment's).
// 2. If there's existing new RS, update its revision number if it's smaller than (maxOldRevision + 1), where maxOldRevision is the max revision number among all old RSes.
// 3. If there's no existing new RS and createIfNotExisted is true, create one with appropriate revision number (maxOldRevision + 1) and replicas.
// Note that the pod-template-hash will be added to adopted RSes and pods.
func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployment, rsList []extensions.ReplicaSet, maxOldRevision int64, oldRSs []*extensions.ReplicaSet, createIfNotExisted bool) (*extensions.ReplicaSet, error) {
	// Calculate revision number for this new replica set
	newRevision := strconv.FormatInt(maxOldRevision+1, 10)

	existingNewRS, err := deploymentutil.FindNewReplicaSet(deployment, rsList)
	if err != nil {
		return nil, err
	} else if existingNewRS != nil {
		// Set existing new replica set's annotation
		if setNewReplicaSetAnnotations(deployment, existingNewRS, newRevision, true) {
			return dc.client.Extensions().ReplicaSets(deployment.ObjectMeta.Namespace).Update(existingNewRS)
		}
		return existingNewRS, nil
	}

	if !createIfNotExisted {
		return nil, nil
	}

	// new ReplicaSet does not exist, create one.
	namespace := deployment.ObjectMeta.Namespace
	podTemplateSpecHash := podutil.GetPodTemplateSpecHash(deployment.Spec.Template)
	newRSTemplate := deploymentutil.GetNewReplicaSetTemplate(deployment)
	// Add podTemplateHash label to selector.
	newRSSelector := labelsutil.CloneSelectorAndAddLabel(deployment.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, podTemplateSpecHash)

	// Create new ReplicaSet
	newRS := extensions.ReplicaSet{
		ObjectMeta: api.ObjectMeta{
			// Make the name deterministic, to ensure idempotence
			Name:      deployment.Name + "-" + fmt.Sprintf("%d", podTemplateSpecHash),
			Namespace: namespace,
		},
		Spec: extensions.ReplicaSetSpec{
			Replicas: 0,
			Selector: newRSSelector,
			Template: newRSTemplate,
		},
	}
	allRSs := append(oldRSs, &newRS)
	newReplicasCount, err := deploymentutil.NewRSNewReplicas(deployment, allRSs, &newRS)
	if err != nil {
		return nil, err
	}

	newRS.Spec.Replicas = newReplicasCount
	// Set new replica set's annotation
	setNewReplicaSetAnnotations(deployment, &newRS, newRevision, false)
	createdRS, err := dc.client.Extensions().ReplicaSets(namespace).Create(&newRS)
	if err != nil {
		dc.enqueueDeployment(deployment)
		return nil, fmt.Errorf("error creating replica set %v: %v", deployment.Name, err)
	}
	if newReplicasCount > 0 {
		dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", "up", createdRS.Name, newReplicasCount)
	}

	return createdRS, dc.updateDeploymentRevision(deployment, newRevision)
}

// rsAndPodsWithHashKeySynced returns the RSes and pods the given deployment targets, with pod-template-hash information synced.
func (dc *DeploymentController) rsAndPodsWithHashKeySynced(deployment *extensions.Deployment) ([]extensions.ReplicaSet, *api.PodList, error) {
	rsList, err := deploymentutil.ListReplicaSets(deployment,
		func(namespace string, options api.ListOptions) ([]extensions.ReplicaSet, error) {
			return dc.rsStore.ReplicaSets(namespace).List(options.LabelSelector)
		})
	if err != nil {
		return nil, nil, fmt.Errorf("error listing ReplicaSets: %v", err)
	}
	syncedRSList := []extensions.ReplicaSet{}
	for _, rs := range rsList {
		// Add pod-template-hash information if it's not in the RS.
		// Otherwise, new RS produced by Deployment will overlap with pre-existing ones
		// that aren't constrained by the pod-template-hash.
		syncedRS, err := dc.addHashKeyToRSAndPods(rs)
		if err != nil {
			return nil, nil, err
		}
		syncedRSList = append(syncedRSList, *syncedRS)
	}
	syncedPodList, err := dc.listPods(deployment)
	if err != nil {
		return nil, nil, err
	}
	return syncedRSList, syncedPodList, nil
}

func (dc *DeploymentController) listPods(deployment *extensions.Deployment) (*api.PodList, error) {
	return deploymentutil.ListPods(deployment,
		func(namespace string, options api.ListOptions) (*api.PodList, error) {
			podList, err := dc.podStore.Pods(namespace).List(options.LabelSelector)
			return &podList, err
		})
}

// addHashKeyToRSAndPods adds pod-template-hash information to the given rs, if it's not already there, with the following steps:
// 1. Add hash label to the rs's pod template, and make sure the controller sees this update so that no orphaned pods will be created
// 2. Add hash label to all pods this rs owns, wait until replicaset controller reports rs.Status.FullyLabeledReplicas equal to the desired number of replicas
// 3. Add hash label to the rs's label and selector
func (dc *DeploymentController) addHashKeyToRSAndPods(rs extensions.ReplicaSet) (updatedRS *extensions.ReplicaSet, err error) {
	updatedRS = &rs
	// If the rs already has the new hash label in its selector, it's done syncing
	if labelsutil.SelectorHasLabel(rs.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey) {
		return
	}
	namespace := rs.Namespace
	hash := rsutil.GetPodTemplateSpecHash(rs)
	rsUpdated := false
	// 1. Add hash template label to the rs. This ensures that any newly created pods will have the new label.
	updatedRS, rsUpdated, err = rsutil.UpdateRSWithRetries(dc.client.Extensions().ReplicaSets(namespace), updatedRS,
		func(updated *extensions.ReplicaSet) error {
			// Precondition: the RS doesn't contain the new hash in its pod template label.
			if updated.Spec.Template.Labels[extensions.DefaultDeploymentUniqueLabelKey] == hash {
				return utilerrors.ErrPreconditionViolated
			}
			updated.Spec.Template.Labels = labelsutil.AddLabel(updated.Spec.Template.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash)
			return nil
		})
	if err != nil {
		return nil, fmt.Errorf("error updating %s %s/%s pod template label with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err)
	}
	if !rsUpdated {
		// If RS wasn't updated but didn't return error in step 1, we've hit a RS not found error.
		// Return here and retry in the next sync loop.
		return &rs, nil
	}
	// Make sure rs pod template is updated so that it won't create pods without the new label (orphaned pods).
	if updatedRS.Generation > updatedRS.Status.ObservedGeneration {
		if err = deploymentutil.WaitForReplicaSetUpdated(dc.client, updatedRS.Generation, namespace, updatedRS.Name); err != nil {
			return nil, fmt.Errorf("error waiting for %s %s/%s generation %d observed by controller: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, updatedRS.Generation, err)
		}
	}
	glog.V(4).Infof("Observed the update of %s %s/%s's pod template with hash %s.", rs.Kind, rs.Namespace, rs.Name, hash)

	// 2. Update all pods managed by the rs to have the new hash label, so they will be correctly adopted.
	selector, err := unversioned.LabelSelectorAsSelector(updatedRS.Spec.Selector)
	if err != nil {
		return nil, fmt.Errorf("error in converting selector to label selector for replica set %s: %s", updatedRS.Name, err)
	}
	options := api.ListOptions{LabelSelector: selector}
	podList, err := dc.podStore.Pods(namespace).List(options.LabelSelector)
	if err != nil {
		return nil, fmt.Errorf("error in getting pod list for namespace %s and list options %+v: %s", namespace, options, err)
	}
	allPodsLabeled := false
	if allPodsLabeled, err = deploymentutil.LabelPodsWithHash(&podList, updatedRS, dc.client, namespace, hash); err != nil {
		return nil, fmt.Errorf("error in adding template hash label %s to pods %+v: %s", hash, podList, err)
	}
	// If not all pods are labeled but didn't return error in step 2, we've hit at least one pod not found error.
	// Return here and retry in the next sync loop.
	if !allPodsLabeled {
		return updatedRS, nil
	}

	// We need to wait for the replicaset controller to observe the pods being
	// labeled with pod template hash. Because previously we've called
	// WaitForReplicaSetUpdated, the replicaset controller should have dropped
	// FullyLabeledReplicas to 0 already, we only need to wait it to increase
	// back to the number of replicas in the spec.
	if err = deploymentutil.WaitForPodsHashPopulated(dc.client, updatedRS.Generation, namespace, updatedRS.Name); err != nil {
		return nil, fmt.Errorf("%s %s/%s: error waiting for replicaset controller to observe pods being labeled with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err)
	}

	// 3. Update rs label and selector to include the new hash label
	// Copy the old selector, so that we can scrub out any orphaned pods
	if updatedRS, rsUpdated, err = rsutil.UpdateRSWithRetries(dc.client.Extensions().ReplicaSets(namespace), updatedRS,
		func(updated *extensions.ReplicaSet) error {
			// Precondition: the RS doesn't contain the new hash in its label or selector.
			if updated.Labels[extensions.DefaultDeploymentUniqueLabelKey] == hash && updated.Spec.Selector.MatchLabels[extensions.DefaultDeploymentUniqueLabelKey] == hash {
				return utilerrors.ErrPreconditionViolated
			}
			updated.Labels = labelsutil.AddLabel(updated.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash)
			updated.Spec.Selector = labelsutil.AddLabelToSelector(updated.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, hash)
			return nil
		}); err != nil {
		return nil, fmt.Errorf("error updating %s %s/%s label and selector with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err)
	}
	if rsUpdated {
		glog.V(4).Infof("Updated %s %s/%s's selector and label with hash %s.", rs.Kind, rs.Namespace, rs.Name, hash)
	}
	// If the RS isn't actually updated in step 3, that's okay, we'll retry in the next sync loop since its selector isn't updated yet.

	// TODO: look for orphaned pods and label them in the background somewhere else periodically

	return updatedRS, nil
}

// setNewReplicaSetAnnotations sets new replica set's annotations appropriately by updating its revision and
// copying required deployment annotations to it; it returns true if replica set's annotation is changed.
func setNewReplicaSetAnnotations(deployment *extensions.Deployment, newRS *extensions.ReplicaSet, newRevision string, exists bool) bool {
	// First, copy deployment's annotations (except for apply and revision annotations)
	annotationChanged := copyDeploymentAnnotationsToReplicaSet(deployment, newRS)
	// Then, update replica set's revision annotation
	if newRS.Annotations == nil {
		newRS.Annotations = make(map[string]string)
	}
	// The newRS's revision should be the greatest among all RSes. Usually, its revision number is newRevision (the max revision number
	// of all old RSes + 1). However, it's possible that some of the old RSes are deleted after the newRS revision being updated, and
	// newRevision becomes smaller than newRS's revision. We should only update newRS revision when it's smaller than newRevision.
	if newRS.Annotations[deploymentutil.RevisionAnnotation] < newRevision {
		newRS.Annotations[deploymentutil.RevisionAnnotation] = newRevision
		annotationChanged = true
		glog.V(4).Infof("Updating replica set %q revision to %s", newRS.Name, newRevision)
	}
	if !exists && setReplicasAnnotations(newRS, deployment.Spec.Replicas, deployment.Spec.Replicas+maxSurge(*deployment)) {
		annotationChanged = true
	}
	return annotationChanged
}

var annotationsToSkip = map[string]bool{
	annotations.LastAppliedConfigAnnotation:  true,
	deploymentutil.RevisionAnnotation:        true,
	deploymentutil.DesiredReplicasAnnotation: true,
	deploymentutil.MaxReplicasAnnotation:     true,
}

// skipCopyAnnotation returns true if we should skip copying the annotation with the given annotation key
// TODO: How to decide which annotations should / should not be copied?
//       See https://github.com/kubernetes/kubernetes/pull/20035#issuecomment-179558615
func skipCopyAnnotation(key string) bool {
	return annotationsToSkip[key]
}

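// getSkippedAnnotations returns only the annotations that are excluded from copying
// between deployments and replica sets (see annotationsToSkip).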
func getSkippedAnnotations(annotations map[string]string) map[string]string {
	skippedAnnotations := make(map[string]string)
	for k, v := range annotations {
		if skipCopyAnnotation(k) {
			skippedAnnotations[k] = v
		}
	}
	return skippedAnnotations
}

// copyDeploymentAnnotationsToReplicaSet copies deployment's annotations to replica set's annotations,
// and returns true if replica set's annotation is changed.
// Note that apply and revision annotations are not copied.
func copyDeploymentAnnotationsToReplicaSet(deployment *extensions.Deployment, rs *extensions.ReplicaSet) bool {
	rsAnnotationsChanged := false
	if rs.Annotations == nil {
		rs.Annotations = make(map[string]string)
	}
	for k, v := range deployment.Annotations {
		// newRS revision is updated automatically in getNewReplicaSet, and the deployment's revision number is then updated
		// by copying its newRS revision number. We should not copy deployment's revision to its newRS, since the update of
		// deployment revision number may fail (revision becomes stale) and the revision number in newRS is more reliable.
		if skipCopyAnnotation(k) || rs.Annotations[k] == v {
			continue
		}
		rs.Annotations[k] = v
		rsAnnotationsChanged = true
	}
	return rsAnnotationsChanged
}

// setDeploymentAnnotationsTo sets deployment's annotations as given RS's annotations.
// This action should be done if and only if the deployment is rolling back to this rs.
// Note that apply and revision annotations are not changed.
func setDeploymentAnnotationsTo(deployment *extensions.Deployment, rollbackToRS *extensions.ReplicaSet) {
	deployment.Annotations = getSkippedAnnotations(deployment.Annotations)
	for k, v := range rollbackToRS.Annotations {
		if !skipCopyAnnotation(k) {
			deployment.Annotations[k] = v
		}
	}
}

// updateDeploymentRevision sets the deployment's revision annotation to the given revision and
// persists the change if the annotation is missing or differs from the current value.
func (dc *DeploymentController) updateDeploymentRevision(deployment *extensions.Deployment, revision string) error {
	if deployment.Annotations == nil {
		deployment.Annotations = make(map[string]string)
	}
	if deployment.Annotations[deploymentutil.RevisionAnnotation] != revision {
		deployment.Annotations[deploymentutil.RevisionAnnotation] = revision
		_, err := dc.updateDeployment(deployment)
		return err
	}
	return nil
}

// reconcileNewReplicaSet scales the new replica set towards the deployment's desired replica count:
// it scales down directly when the new RS is too large, and otherwise scales up to the count
// computed by deploymentutil.NewRSNewReplicas. It returns whether a scaling operation happened.
func (dc *DeploymentController) reconcileNewReplicaSet(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (bool, error) {
	if newRS.Spec.Replicas == deployment.Spec.Replicas {
		// Scaling not required.
		return false, nil
	}
	if newRS.Spec.Replicas > deployment.Spec.Replicas {
		// Scale down.
		scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, deployment.Spec.Replicas, deployment)
		return scaled, err
	}
	newReplicasCount, err := deploymentutil.NewRSNewReplicas(deployment, allRSs, newRS)
	if err != nil {
		return false, err
	}
	scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, newReplicasCount, deployment)
	return scaled, err
}

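// Illustrative sketch, not part of the controller: for a RollingUpdate deployment,
// deploymentutil.NewRSNewReplicas roughly bounds the new replica set by two limits, shown in the
// hypothetical helper below: the total pod count may not exceed desired+maxSurge, and the new RS
// itself never exceeds the desired replica count. This is a simplification under those assumptions,
// not the actual implementation.
func exampleNewRSReplicas(desired, maxSurge, currentTotal, newRSReplicas int32) int32 {
	// Room left under the surge ceiling (desired + maxSurge total pods).
	scaleUp := desired + maxSurge - currentTotal
	// Never scale the new RS past the desired replica count.
	if scaleUp > desired-newRSReplicas {
		scaleUp = desired - newRSReplicas
	}
	if scaleUp < 0 {
		scaleUp = 0
	}
	return newRSReplicas + scaleUp
}
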
func (dc *DeploymentController) getAvailablePodsForReplicaSets(deployment *extensions.Deployment, rss []*extensions.ReplicaSet) (int32, error) {
	podList, err := dc.listPods(deployment)
	if err != nil {
		return 0, err
	}
	return deploymentutil.CountAvailablePodsForReplicaSets(podList, rss, deployment.Spec.MinReadySeconds)
}

func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (bool, error) {
	oldPodsCount := deploymentutil.GetReplicaCountForReplicaSets(oldRSs)
	if oldPodsCount == 0 {
		// Can't scale down further
		return false, nil
	}

	minReadySeconds := deployment.Spec.MinReadySeconds
	allPodsCount := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
	// TODO: use dc.getAvailablePodsForReplicaSets instead
	newRSAvailablePodCount, err := deploymentutil.GetAvailablePodsForReplicaSets(dc.client, deployment, []*extensions.ReplicaSet{newRS}, minReadySeconds)
	if err != nil {
		return false, fmt.Errorf("could not find available pods: %v", err)
	}
	maxUnavailable := maxUnavailable(*deployment)

	// Check if we can scale down. We can scale down in the following 2 cases:
	// * Some old replica sets have unhealthy replicas; we can safely scale down those unhealthy replicas since that won't further
	//   increase unavailability.
	// * The new replica set has scaled up and its replicas have become ready; then we can scale down old replica sets in a further step.
	//
	// maxScaledDown := allPodsCount - minAvailable - newReplicaSetPodsUnavailable
	// takes into account not only maxUnavailable and any surge pods that have been created, but also unavailable pods from
	// the newRS, so that the unavailable pods from the newRS don't make us scale down old replica sets in a further
	// step (which would increase unavailability).
	//
	// Concrete example (a compact numeric sketch of this arithmetic also follows this function):
	//
	// * 10 replicas
	// * 2 maxUnavailable (absolute number, not percent)
	// * 3 maxSurge (absolute number, not percent)
	//
	// case 1:
	// * Deployment is updated, newRS is created with 3 replicas, oldRS is scaled down to 8, and newRS is scaled up to 5.
	// * The new replica set pods crashloop and never become available.
	// * allPodsCount is 13. minAvailable is 8. newRSPodsUnavailable is 5.
	// * A node fails and causes one of the oldRS pods to become unavailable. However, 13 - 8 - 5 = 0, so the oldRS won't be scaled down.
	// * The user notices the crashloop and does kubectl rollout undo to roll back.
	// * newRSPodsUnavailable is 1, since we rolled back to the good replica set, so maxScaledDown = 13 - 8 - 1 = 4. 4 of the crashlooping pods will be scaled down.
	// * The total number of pods will then be 9 and the newRS can be scaled up to 10.
	//
	// case 2:
	// Same example, but pushing a new pod template instead of rolling back (aka "roll over"):
	// * The new replica set created must start with 0 replicas because allPodsCount is already at 13.
	// * However, newRSPodsUnavailable would also be 0, so the 2 old replica sets could be scaled down by 5 (13 - 8 - 0), which would then
	//   allow the new replica set to be scaled up by 5.
	minAvailable := deployment.Spec.Replicas - maxUnavailable
	newRSUnavailablePodCount := newRS.Spec.Replicas - newRSAvailablePodCount
	maxScaledDown := allPodsCount - minAvailable - newRSUnavailablePodCount
	if maxScaledDown <= 0 {
		return false, nil
	}

	// Clean up unhealthy replicas first, otherwise unhealthy replicas will block the deployment
	// and cause a timeout. See https://github.com/kubernetes/kubernetes/issues/16737
	oldRSs, cleanupCount, err := dc.cleanupUnhealthyReplicas(oldRSs, deployment, deployment.Spec.MinReadySeconds, maxScaledDown)
	if err != nil {
		return false, err
	}
	glog.V(4).Infof("Cleaned up unhealthy replicas from old RSes by %d", cleanupCount)

	// Scale down old replica sets; we need to check maxUnavailable to ensure we can scale down.
	allRSs = append(oldRSs, newRS)
	scaledDownCount, err := dc.scaleDownOldReplicaSetsForRollingUpdate(allRSs, oldRSs, deployment)
	if err != nil {
		return false, err
	}
	glog.V(4).Infof("Scaled down old RSes of deployment %s by %d", deployment.Name, scaledDownCount)

	totalScaledDown := cleanupCount + scaledDownCount
	return totalScaledDown > 0, nil
}

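// Illustrative sketch, not part of the controller: the maxScaledDown arithmetic from the comment in
// reconcileOldReplicaSets, using case 1's numbers (13 total pods, 10 desired, 2 maxUnavailable,
// 5 unavailable new-RS pods -> 13 - 8 - 5 = 0, so no old pods may be scaled down yet).
// Function name is hypothetical.
func exampleMaxScaledDown(allPodsCount, desired, maxUnavailable, newRSUnavailable int32) int32 {
	minAvailable := desired - maxUnavailable
	return allPodsCount - minAvailable - newRSUnavailable
}
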
// cleanupUnhealthyReplicas will scale down old replica sets with unhealthy replicas, so that all unhealthy replicas will be deleted.
func (dc *DeploymentController) cleanupUnhealthyReplicas(oldRSs []*extensions.ReplicaSet, deployment *extensions.Deployment, minReadySeconds, maxCleanupCount int32) ([]*extensions.ReplicaSet, int32, error) {
	sort.Sort(controller.ReplicaSetsByCreationTimestamp(oldRSs))
	// Safely scale down all old replica sets with unhealthy replicas. The replica set will sort the pods in the order
	// such that not-ready < ready, unscheduled < scheduled, and pending < running. This ensures that unhealthy replicas will
	// be deleted first and won't increase unavailability.
	totalScaledDown := int32(0)
	for i, targetRS := range oldRSs {
		if totalScaledDown >= maxCleanupCount {
			break
		}
		if targetRS.Spec.Replicas == 0 {
			// cannot scale down this replica set.
			continue
		}
		// TODO: use dc.getAvailablePodsForReplicaSets instead
		availablePodCount, err := deploymentutil.GetAvailablePodsForReplicaSets(dc.client, deployment, []*extensions.ReplicaSet{targetRS}, minReadySeconds)
		if err != nil {
			return nil, totalScaledDown, fmt.Errorf("could not find available pods: %v", err)
		}
		if targetRS.Spec.Replicas == availablePodCount {
			// no unhealthy replicas found, no scaling required.
			continue
		}

		scaledDownCount := int32(integer.IntMin(int(maxCleanupCount-totalScaledDown), int(targetRS.Spec.Replicas-availablePodCount)))
		newReplicasCount := targetRS.Spec.Replicas - scaledDownCount
		if newReplicasCount > targetRS.Spec.Replicas {
			return nil, 0, fmt.Errorf("when cleaning up unhealthy replicas, got invalid request to scale down %s/%s %d -> %d", targetRS.Namespace, targetRS.Name, targetRS.Spec.Replicas, newReplicasCount)
		}
		_, updatedOldRS, err := dc.scaleReplicaSetAndRecordEvent(targetRS, newReplicasCount, deployment)
		if err != nil {
			return nil, totalScaledDown, err
		}
		totalScaledDown += scaledDownCount
		oldRSs[i] = updatedOldRS
	}
	return oldRSs, totalScaledDown, nil
}

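// Illustrative sketch, not part of the controller: the per-replica-set clamp used above. Each old
// RS is scaled down by its unhealthy replica count (spec minus available), but never by more than
// the remaining cleanup budget. Function name is hypothetical.
func exampleUnhealthyScaleDown(specReplicas, availableReplicas, remainingBudget int32) int32 {
	return int32(integer.IntMin(int(remainingBudget), int(specReplicas-availableReplicas)))
}
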
// scaleDownOldReplicaSetsForRollingUpdate scales down old replica sets when the deployment strategy is "RollingUpdate".
// We need to check maxUnavailable to ensure availability.
func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(allRSs []*extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet, deployment *extensions.Deployment) (int32, error) {
	maxUnavailable := maxUnavailable(*deployment)

	// Check if we can scale down.
	minAvailable := deployment.Spec.Replicas - maxUnavailable
	minReadySeconds := deployment.Spec.MinReadySeconds
	// Find the number of available pods.
	// TODO: use dc.getAvailablePodsForReplicaSets instead
	availablePodCount, err := deploymentutil.GetAvailablePodsForReplicaSets(dc.client, deployment, allRSs, minReadySeconds)
	if err != nil {
		return 0, fmt.Errorf("could not find available pods: %v", err)
	}
	if availablePodCount <= minAvailable {
		// Cannot scale down.
		return 0, nil
	}
	glog.V(4).Infof("Found %d available pods in deployment %s, scaling down old RSes", availablePodCount, deployment.Name)

	sort.Sort(controller.ReplicaSetsByCreationTimestamp(oldRSs))

	totalScaledDown := int32(0)
	totalScaleDownCount := availablePodCount - minAvailable
	for _, targetRS := range oldRSs {
		if totalScaledDown >= totalScaleDownCount {
			// No further scaling required.
			break
		}
		if targetRS.Spec.Replicas == 0 {
			// cannot scale down this ReplicaSet.
			continue
		}
		// Scale down.
		scaleDownCount := int32(integer.IntMin(int(targetRS.Spec.Replicas), int(totalScaleDownCount-totalScaledDown)))
		newReplicasCount := targetRS.Spec.Replicas - scaleDownCount
		if newReplicasCount > targetRS.Spec.Replicas {
			return 0, fmt.Errorf("when scaling down old RS, got invalid request to scale down %s/%s %d -> %d", targetRS.Namespace, targetRS.Name, targetRS.Spec.Replicas, newReplicasCount)
		}
		_, _, err = dc.scaleReplicaSetAndRecordEvent(targetRS, newReplicasCount, deployment)
		if err != nil {
			return totalScaledDown, err
		}

		totalScaledDown += scaleDownCount
	}

	return totalScaledDown, nil
}

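// Illustrative sketch, not part of the controller: the scale-down budget used above. Old replica
// sets may only be scaled down by the number of available pods in excess of the minimum required
// (desired - maxUnavailable), so availability never drops below the rolling-update floor.
// Function name is hypothetical.
func exampleScaleDownBudget(desired, maxUnavailable, availablePods int32) int32 {
	minAvailable := desired - maxUnavailable
	if availablePods <= minAvailable {
		return 0
	}
	return availablePods - minAvailable
}
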
// scaleDownOldReplicaSetsForRecreate scales down old replica sets when deployment strategy is "Recreate"
func (dc *DeploymentController) scaleDownOldReplicaSetsForRecreate(oldRSs []*extensions.ReplicaSet, deployment *extensions.Deployment) (bool, error) {
	scaled := false
	for _, rs := range oldRSs {
		// Scaling not required.
		if rs.Spec.Replicas == 0 {
			continue
		}
		scaledRS, _, err := dc.scaleReplicaSetAndRecordEvent(rs, 0, deployment)
		if err != nil {
			return false, err
		}
		if scaledRS {
			scaled = true
		}
	}
	return scaled, nil
}

// scaleUpNewReplicaSetForRecreate scales up new replica set when deployment strategy is "Recreate"
func (dc *DeploymentController) scaleUpNewReplicaSetForRecreate(newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (bool, error) {
	scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, deployment.Spec.Replicas, deployment)
	return scaled, err
}

// cleanupDeployment is responsible for cleaning up a deployment, i.e. it deletes all but the latest N old replica sets,
// where N=d.Spec.RevisionHistoryLimit. Old replica sets are older versions of the pod template of a deployment kept
// around by default 1) for historical reasons and 2) for the ability to roll back a deployment.
func (dc *DeploymentController) cleanupDeployment(oldRSs []*extensions.ReplicaSet, deployment *extensions.Deployment) error {
	if deployment.Spec.RevisionHistoryLimit == nil {
		return nil
	}
	diff := int32(len(oldRSs)) - *deployment.Spec.RevisionHistoryLimit
	if diff <= 0 {
		return nil
	}

	sort.Sort(controller.ReplicaSetsByCreationTimestamp(oldRSs))

	var errList []error
	// TODO: This should be parallelized.
	for i := int32(0); i < diff; i++ {
		rs := oldRSs[i]
		// Avoid deleting replica sets with non-zero replica counts
		if rs.Status.Replicas != 0 || rs.Spec.Replicas != 0 || rs.Generation > rs.Status.ObservedGeneration {
			continue
		}
		if err := dc.client.Extensions().ReplicaSets(rs.Namespace).Delete(rs.Name, nil); err != nil && !errors.IsNotFound(err) {
			glog.V(2).Infof("Failed deleting old replica set %v for deployment %v: %v", rs.Name, deployment.Name, err)
			errList = append(errList, err)
		}
	}

	return utilerrors.NewAggregate(errList)
}

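// Illustrative sketch, not part of the controller: the history-limit arithmetic used above. With
// RevisionHistoryLimit=2 and 5 old replica sets, diff is 3, so the 3 oldest (by creation timestamp)
// replica sets with zero replicas become deletion candidates. Function name is hypothetical.
func exampleHistoryCleanupCount(oldRSCount, revisionHistoryLimit int32) int32 {
	diff := oldRSCount - revisionHistoryLimit
	if diff < 0 {
		return 0
	}
	return diff
}
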
func (dc *DeploymentController) updateDeploymentStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) error {
	newStatus, err := dc.calculateStatus(allRSs, newRS, deployment)
	if err != nil {
		return err
	}
	newDeployment := deployment
	newDeployment.Status = newStatus
	_, err = dc.client.Extensions().Deployments(deployment.Namespace).UpdateStatus(newDeployment)
	return err
}

func (dc *DeploymentController) calculateStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (extensions.DeploymentStatus, error) {
	availableReplicas, err := dc.getAvailablePodsForReplicaSets(deployment, allRSs)
	if err != nil {
		return deployment.Status, fmt.Errorf("failed to count available pods: %v", err)
	}
	totalReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs)

	return extensions.DeploymentStatus{
		// TODO: Ensure that if we start retrying status updates, we won't pick up a new Generation value.
		ObservedGeneration:  deployment.Generation,
		Replicas:            deploymentutil.GetActualReplicaCountForReplicaSets(allRSs),
		UpdatedReplicas:     deploymentutil.GetActualReplicaCountForReplicaSets([]*extensions.ReplicaSet{newRS}),
		AvailableReplicas:   availableReplicas,
		UnavailableReplicas: totalReplicas - availableReplicas,
	}, nil
}

func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(rs *extensions.ReplicaSet, newScale int32, deployment *extensions.Deployment) (bool, *extensions.ReplicaSet, error) {
	// No need to scale
	if rs.Spec.Replicas == newScale {
		return false, rs, nil
	}
	var scalingOperation string
	if rs.Spec.Replicas < newScale {
		scalingOperation = "up"
	} else {
		scalingOperation = "down"
	}
	newRS, err := dc.scaleReplicaSet(rs, newScale, deployment, scalingOperation)
	return true, newRS, err
}

func (dc *DeploymentController) scaleReplicaSet(rs *extensions.ReplicaSet, newScale int32, deployment *extensions.Deployment, scalingOperation string) (*extensions.ReplicaSet, error) {
	// NOTE: This mutates the ReplicaSet passed in. Not sure if that's a good idea.
	rs.Spec.Replicas = newScale
	setReplicasAnnotations(rs, deployment.Spec.Replicas, deployment.Spec.Replicas+maxSurge(*deployment))
	rs, err := dc.client.Extensions().ReplicaSets(rs.ObjectMeta.Namespace).Update(rs)
	if err == nil {
		dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", scalingOperation, rs.Name, newScale)
	} else {
		glog.Warningf("Cannot update replica set %q: %v", rs.Name, err)
		dc.enqueueDeployment(deployment)
	}
	return rs, err
}

func (dc *DeploymentController) updateDeployment(deployment *extensions.Deployment) (*extensions.Deployment, error) {
	return dc.client.Extensions().Deployments(deployment.ObjectMeta.Namespace).Update(deployment)
}

// rollbackToTemplate updates the deployment's pod template and annotations to those of the given
// replica set if they differ, and then clears the deployment's rollback spec.
func (dc *DeploymentController) rollbackToTemplate(deployment *extensions.Deployment, rs *extensions.ReplicaSet) (d *extensions.Deployment, performedRollback bool, err error) {
	if !reflect.DeepEqual(deploymentutil.GetNewReplicaSetTemplate(deployment), rs.Spec.Template) {
		glog.Infof("Rolling back deployment %s to template spec %+v", deployment.Name, rs.Spec.Template.Spec)
		deploymentutil.SetFromReplicaSetTemplate(deployment, rs.Spec.Template)
		// Set the annotations of the RS (the old RS we're rolling back to) back on the deployment;
		// otherwise, the deployment's current annotations (should be the same as the current new RS) will be copied to the RS after the rollback.
		//
		// For example,
		// a Deployment has old RS1 with annotation {change-cause:create}, and new RS2 {change-cause:edit}.
		// Note that both annotations are copied from the Deployment, and the Deployment should be annotated {change-cause:edit} as well.
		// Now, when rolling back the Deployment to RS1, we should update the Deployment's pod template and also copy the annotation from RS1.
		// The Deployment is now annotated {change-cause:create}, and we have new RS1 {change-cause:create}, old RS2 {change-cause:edit}.
		//
		// If we don't copy the annotations back from the RS to the deployment on rollback, the Deployment will stay as {change-cause:edit},
		// and new RS1 becomes {change-cause:edit} (copied from the deployment after rollback), old RS2 {change-cause:edit}, which is not correct.
		setDeploymentAnnotationsTo(deployment, rs)
		performedRollback = true
	} else {
		glog.V(4).Infof("Rolling back to a revision that contains the same template as current deployment %s, skipping rollback...", deployment.Name)
		dc.emitRollbackWarningEvent(deployment, deploymentutil.RollbackTemplateUnchanged, fmt.Sprintf("The rollback revision contains the same template as current deployment %q", deployment.Name))
	}
	d, err = dc.updateDeploymentAndClearRollbackTo(deployment)
	return
}

// isScalingEvent checks whether the provided deployment has been updated with a scaling event
// by looking at the desired-replicas annotation in the active replica sets of the deployment.
func (dc *DeploymentController) isScalingEvent(d *extensions.Deployment) bool {
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, false)
	if err != nil {
		return false
	}
	// If there is no new replica set matching this deployment and the deployment isn't paused,
	// then there is a new rollout waiting to happen.
	if newRS == nil && !d.Spec.Paused {
		return false
	}
	allRSs := append(oldRSs, newRS)
	for _, rs := range controller.FilterActiveReplicaSets(allRSs) {
		desired, ok := getDesiredReplicasAnnotation(rs)
		if !ok {
			continue
		}
		if desired != d.Spec.Replicas {
			return true
		}
	}
	return false
}