mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 19:58:17 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			558 lines
		
	
	
		
			20 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			558 lines
		
	
	
		
			20 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
/*
 | 
						|
Copyright 2016 The Kubernetes Authors.
 | 
						|
 | 
						|
Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
you may not use this file except in compliance with the License.
 | 
						|
You may obtain a copy of the License at
 | 
						|
 | 
						|
    http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
 | 
						|
Unless required by applicable law or agreed to in writing, software
 | 
						|
distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
See the License for the specific language governing permissions and
 | 
						|
limitations under the License.
 | 
						|
*/
 | 
						|
 | 
						|
package statefulset
 | 
						|
 | 
						|
import (
 | 
						|
	"math"
 | 
						|
	"sort"
 | 
						|
 | 
						|
	"k8s.io/klog"
 | 
						|
 | 
						|
	apps "k8s.io/api/apps/v1"
 | 
						|
	"k8s.io/api/core/v1"
 | 
						|
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
						|
	"k8s.io/client-go/tools/record"
 | 
						|
	"k8s.io/kubernetes/pkg/controller/history"
 | 
						|
)
 | 
						|
 | 
						|
// StatefulSetControl implements the control logic for updating StatefulSets and their children Pods. It is implemented
 | 
						|
// as an interface to allow for extensions that provide different semantics. Currently, there is only one implementation.
 | 
						|
type StatefulSetControlInterface interface {
 | 
						|
	// UpdateStatefulSet implements the control logic for Pod creation, update, and deletion, and
 | 
						|
	// persistent volume creation, update, and deletion.
 | 
						|
	// If an implementation returns a non-nil error, the invocation will be retried using a rate-limited strategy.
 | 
						|
	// Implementors should sink any errors that they do not wish to trigger a retry, and they may feel free to
 | 
						|
	// exit exceptionally at any point provided they wish the update to be re-run at a later point in time.
 | 
						|
	UpdateStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) error
 | 
						|
	// ListRevisions returns a array of the ControllerRevisions that represent the revisions of set. If the returned
 | 
						|
	// error is nil, the returns slice of ControllerRevisions is valid.
 | 
						|
	ListRevisions(set *apps.StatefulSet) ([]*apps.ControllerRevision, error)
 | 
						|
	// AdoptOrphanRevisions adopts any orphaned ControllerRevisions that match set's Selector. If all adoptions are
 | 
						|
	// successful the returned error is nil.
 | 
						|
	AdoptOrphanRevisions(set *apps.StatefulSet, revisions []*apps.ControllerRevision) error
 | 
						|
}
 | 
						|
 | 
						|
// NewDefaultStatefulSetControl returns a new instance of the default implementation StatefulSetControlInterface that
 | 
						|
// implements the documented semantics for StatefulSets. podControl is the PodControlInterface used to create, update,
 | 
						|
// and delete Pods and to create PersistentVolumeClaims. statusUpdater is the StatefulSetStatusUpdaterInterface used
 | 
						|
// to update the status of StatefulSets. You should use an instance returned from NewRealStatefulPodControl() for any
 | 
						|
// scenario other than testing.
 | 
						|
func NewDefaultStatefulSetControl(
 | 
						|
	podControl StatefulPodControlInterface,
 | 
						|
	statusUpdater StatefulSetStatusUpdaterInterface,
 | 
						|
	controllerHistory history.Interface,
 | 
						|
	recorder record.EventRecorder) StatefulSetControlInterface {
 | 
						|
	return &defaultStatefulSetControl{podControl, statusUpdater, controllerHistory, recorder}
 | 
						|
}
 | 
						|
 | 
						|
type defaultStatefulSetControl struct {
 | 
						|
	podControl        StatefulPodControlInterface
 | 
						|
	statusUpdater     StatefulSetStatusUpdaterInterface
 | 
						|
	controllerHistory history.Interface
 | 
						|
	recorder          record.EventRecorder
 | 
						|
}
 | 
						|
 | 
						|
// UpdateStatefulSet executes the core logic loop for a stateful set, applying the predictable and
 | 
						|
// consistent monotonic update strategy by default - scale up proceeds in ordinal order, no new pod
 | 
						|
// is created while any pod is unhealthy, and pods are terminated in descending order. The burst
 | 
						|
// strategy allows these constraints to be relaxed - pods will be created and deleted eagerly and
 | 
						|
// in no particular order. Clients using the burst strategy should be careful to ensure they
 | 
						|
// understand the consistency implications of having unpredictable numbers of pods available.
 | 
						|
func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) error {
 | 
						|
 | 
						|
	// list all revisions and sort them
 | 
						|
	revisions, err := ssc.ListRevisions(set)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	history.SortControllerRevisions(revisions)
 | 
						|
 | 
						|
	// get the current, and update revisions
 | 
						|
	currentRevision, updateRevision, collisionCount, err := ssc.getStatefulSetRevisions(set, revisions)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	// perform the main update function and get the status
 | 
						|
	status, err := ssc.updateStatefulSet(set, currentRevision, updateRevision, collisionCount, pods)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	// update the set's status
 | 
						|
	err = ssc.updateStatefulSetStatus(set, status)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	klog.V(4).Infof("StatefulSet %s/%s pod status replicas=%d ready=%d current=%d updated=%d",
 | 
						|
		set.Namespace,
 | 
						|
		set.Name,
 | 
						|
		status.Replicas,
 | 
						|
		status.ReadyReplicas,
 | 
						|
		status.CurrentReplicas,
 | 
						|
		status.UpdatedReplicas)
 | 
						|
 | 
						|
	klog.V(4).Infof("StatefulSet %s/%s revisions current=%s update=%s",
 | 
						|
		set.Namespace,
 | 
						|
		set.Name,
 | 
						|
		status.CurrentRevision,
 | 
						|
		status.UpdateRevision)
 | 
						|
 | 
						|
	// maintain the set's revision history limit
 | 
						|
	return ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)
 | 
						|
}
 | 
						|
 | 
						|
func (ssc *defaultStatefulSetControl) ListRevisions(set *apps.StatefulSet) ([]*apps.ControllerRevision, error) {
 | 
						|
	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	return ssc.controllerHistory.ListControllerRevisions(set, selector)
 | 
						|
}
 | 
						|
 | 
						|
func (ssc *defaultStatefulSetControl) AdoptOrphanRevisions(
 | 
						|
	set *apps.StatefulSet,
 | 
						|
	revisions []*apps.ControllerRevision) error {
 | 
						|
	for i := range revisions {
 | 
						|
		adopted, err := ssc.controllerHistory.AdoptControllerRevision(set, controllerKind, revisions[i])
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
		revisions[i] = adopted
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// truncateHistory truncates any non-live ControllerRevisions in revisions from set's history. The UpdateRevision and
 | 
						|
// CurrentRevision in set's Status are considered to be live. Any revisions associated with the Pods in pods are also
 | 
						|
// considered to be live. Non-live revisions are deleted, starting with the revision with the lowest Revision, until
 | 
						|
// only RevisionHistoryLimit revisions remain. If the returned error is nil the operation was successful. This method
 | 
						|
// expects that revisions is sorted when supplied.
 | 
						|
func (ssc *defaultStatefulSetControl) truncateHistory(
 | 
						|
	set *apps.StatefulSet,
 | 
						|
	pods []*v1.Pod,
 | 
						|
	revisions []*apps.ControllerRevision,
 | 
						|
	current *apps.ControllerRevision,
 | 
						|
	update *apps.ControllerRevision) error {
 | 
						|
	history := make([]*apps.ControllerRevision, 0, len(revisions))
 | 
						|
	// mark all live revisions
 | 
						|
	live := map[string]bool{current.Name: true, update.Name: true}
 | 
						|
	for i := range pods {
 | 
						|
		live[getPodRevision(pods[i])] = true
 | 
						|
	}
 | 
						|
	// collect live revisions and historic revisions
 | 
						|
	for i := range revisions {
 | 
						|
		if !live[revisions[i].Name] {
 | 
						|
			history = append(history, revisions[i])
 | 
						|
		}
 | 
						|
	}
 | 
						|
	historyLen := len(history)
 | 
						|
	historyLimit := int(*set.Spec.RevisionHistoryLimit)
 | 
						|
	if historyLen <= historyLimit {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	// delete any non-live history to maintain the revision limit.
 | 
						|
	history = history[:(historyLen - historyLimit)]
 | 
						|
	for i := 0; i < len(history); i++ {
 | 
						|
		if err := ssc.controllerHistory.DeleteControllerRevision(history[i]); err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// getStatefulSetRevisions returns the current and update ControllerRevisions for set. It also
 | 
						|
// returns a collision count that records the number of name collisions set saw when creating
 | 
						|
// new ControllerRevisions. This count is incremented on every name collision and is used in
 | 
						|
// building the ControllerRevision names for name collision avoidance. This method may create
 | 
						|
// a new revision, or modify the Revision of an existing revision if an update to set is detected.
 | 
						|
// This method expects that revisions is sorted when supplied.
 | 
						|
func (ssc *defaultStatefulSetControl) getStatefulSetRevisions(
 | 
						|
	set *apps.StatefulSet,
 | 
						|
	revisions []*apps.ControllerRevision) (*apps.ControllerRevision, *apps.ControllerRevision, int32, error) {
 | 
						|
	var currentRevision, updateRevision *apps.ControllerRevision
 | 
						|
 | 
						|
	revisionCount := len(revisions)
 | 
						|
	history.SortControllerRevisions(revisions)
 | 
						|
 | 
						|
	// Use a local copy of set.Status.CollisionCount to avoid modifying set.Status directly.
 | 
						|
	// This copy is returned so the value gets carried over to set.Status in updateStatefulSet.
 | 
						|
	var collisionCount int32
 | 
						|
	if set.Status.CollisionCount != nil {
 | 
						|
		collisionCount = *set.Status.CollisionCount
 | 
						|
	}
 | 
						|
 | 
						|
	// create a new revision from the current set
 | 
						|
	updateRevision, err := newRevision(set, nextRevision(revisions), &collisionCount)
 | 
						|
	if err != nil {
 | 
						|
		return nil, nil, collisionCount, err
 | 
						|
	}
 | 
						|
 | 
						|
	// find any equivalent revisions
 | 
						|
	equalRevisions := history.FindEqualRevisions(revisions, updateRevision)
 | 
						|
	equalCount := len(equalRevisions)
 | 
						|
 | 
						|
	if equalCount > 0 && history.EqualRevision(revisions[revisionCount-1], equalRevisions[equalCount-1]) {
 | 
						|
		// if the equivalent revision is immediately prior the update revision has not changed
 | 
						|
		updateRevision = revisions[revisionCount-1]
 | 
						|
	} else if equalCount > 0 {
 | 
						|
		// if the equivalent revision is not immediately prior we will roll back by incrementing the
 | 
						|
		// Revision of the equivalent revision
 | 
						|
		updateRevision, err = ssc.controllerHistory.UpdateControllerRevision(
 | 
						|
			equalRevisions[equalCount-1],
 | 
						|
			updateRevision.Revision)
 | 
						|
		if err != nil {
 | 
						|
			return nil, nil, collisionCount, err
 | 
						|
		}
 | 
						|
	} else {
 | 
						|
		//if there is no equivalent revision we create a new one
 | 
						|
		updateRevision, err = ssc.controllerHistory.CreateControllerRevision(set, updateRevision, &collisionCount)
 | 
						|
		if err != nil {
 | 
						|
			return nil, nil, collisionCount, err
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// attempt to find the revision that corresponds to the current revision
 | 
						|
	for i := range revisions {
 | 
						|
		if revisions[i].Name == set.Status.CurrentRevision {
 | 
						|
			currentRevision = revisions[i]
 | 
						|
			break
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// if the current revision is nil we initialize the history by setting it to the update revision
 | 
						|
	if currentRevision == nil {
 | 
						|
		currentRevision = updateRevision
 | 
						|
	}
 | 
						|
 | 
						|
	return currentRevision, updateRevision, collisionCount, nil
 | 
						|
}
 | 
						|
 | 
						|
// updateStatefulSet performs the update function for a StatefulSet. This method creates, updates, and deletes Pods in
 | 
						|
// the set in order to conform the system to the target state for the set. The target state always contains
 | 
						|
// set.Spec.Replicas Pods with a Ready Condition. If the UpdateStrategy.Type for the set is
 | 
						|
// RollingUpdateStatefulSetStrategyType then all Pods in the set must be at set.Status.CurrentRevision.
 | 
						|
// If the UpdateStrategy.Type for the set is OnDeleteStatefulSetStrategyType, the target state implies nothing about
 | 
						|
// the revisions of Pods in the set. If the UpdateStrategy.Type for the set is PartitionStatefulSetStrategyType, then
 | 
						|
// all Pods with ordinal less than UpdateStrategy.Partition.Ordinal must be at Status.CurrentRevision and all other
 | 
						|
// Pods must be at Status.UpdateRevision. If the returned error is nil, the returned StatefulSetStatus is valid and the
 | 
						|
// update must be recorded. If the error is not nil, the method should be retried until successful.
 | 
						|
func (ssc *defaultStatefulSetControl) updateStatefulSet(
 | 
						|
	set *apps.StatefulSet,
 | 
						|
	currentRevision *apps.ControllerRevision,
 | 
						|
	updateRevision *apps.ControllerRevision,
 | 
						|
	collisionCount int32,
 | 
						|
	pods []*v1.Pod) (*apps.StatefulSetStatus, error) {
 | 
						|
	// get the current and update revisions of the set.
 | 
						|
	currentSet, err := ApplyRevision(set, currentRevision)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	updateSet, err := ApplyRevision(set, updateRevision)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	// set the generation, and revisions in the returned status
 | 
						|
	status := apps.StatefulSetStatus{}
 | 
						|
	status.ObservedGeneration = set.Generation
 | 
						|
	status.CurrentRevision = currentRevision.Name
 | 
						|
	status.UpdateRevision = updateRevision.Name
 | 
						|
	status.CollisionCount = new(int32)
 | 
						|
	*status.CollisionCount = collisionCount
 | 
						|
 | 
						|
	replicaCount := int(*set.Spec.Replicas)
 | 
						|
	// slice that will contain all Pods such that 0 <= getOrdinal(pod) < set.Spec.Replicas
 | 
						|
	replicas := make([]*v1.Pod, replicaCount)
 | 
						|
	// slice that will contain all Pods such that set.Spec.Replicas <= getOrdinal(pod)
 | 
						|
	condemned := make([]*v1.Pod, 0, len(pods))
 | 
						|
	unhealthy := 0
 | 
						|
	firstUnhealthyOrdinal := math.MaxInt32
 | 
						|
	var firstUnhealthyPod *v1.Pod
 | 
						|
 | 
						|
	// First we partition pods into two lists valid replicas and condemned Pods
 | 
						|
	for i := range pods {
 | 
						|
		status.Replicas++
 | 
						|
 | 
						|
		// count the number of running and ready replicas
 | 
						|
		if isRunningAndReady(pods[i]) {
 | 
						|
			status.ReadyReplicas++
 | 
						|
		}
 | 
						|
 | 
						|
		// count the number of current and update replicas
 | 
						|
		if isCreated(pods[i]) && !isTerminating(pods[i]) {
 | 
						|
			if getPodRevision(pods[i]) == currentRevision.Name {
 | 
						|
				status.CurrentReplicas++
 | 
						|
			}
 | 
						|
			if getPodRevision(pods[i]) == updateRevision.Name {
 | 
						|
				status.UpdatedReplicas++
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		if ord := getOrdinal(pods[i]); 0 <= ord && ord < replicaCount {
 | 
						|
			// if the ordinal of the pod is within the range of the current number of replicas,
 | 
						|
			// insert it at the indirection of its ordinal
 | 
						|
			replicas[ord] = pods[i]
 | 
						|
 | 
						|
		} else if ord >= replicaCount {
 | 
						|
			// if the ordinal is greater than the number of replicas add it to the condemned list
 | 
						|
			condemned = append(condemned, pods[i])
 | 
						|
		}
 | 
						|
		// If the ordinal could not be parsed (ord < 0), ignore the Pod.
 | 
						|
	}
 | 
						|
 | 
						|
	// for any empty indices in the sequence [0,set.Spec.Replicas) create a new Pod at the correct revision
 | 
						|
	for ord := 0; ord < replicaCount; ord++ {
 | 
						|
		if replicas[ord] == nil {
 | 
						|
			replicas[ord] = newVersionedStatefulSetPod(
 | 
						|
				currentSet,
 | 
						|
				updateSet,
 | 
						|
				currentRevision.Name,
 | 
						|
				updateRevision.Name, ord)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// sort the condemned Pods by their ordinals
 | 
						|
	sort.Sort(ascendingOrdinal(condemned))
 | 
						|
 | 
						|
	// find the first unhealthy Pod
 | 
						|
	for i := range replicas {
 | 
						|
		if !isHealthy(replicas[i]) {
 | 
						|
			unhealthy++
 | 
						|
			if ord := getOrdinal(replicas[i]); ord < firstUnhealthyOrdinal {
 | 
						|
				firstUnhealthyOrdinal = ord
 | 
						|
				firstUnhealthyPod = replicas[i]
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	for i := range condemned {
 | 
						|
		if !isHealthy(condemned[i]) {
 | 
						|
			unhealthy++
 | 
						|
			if ord := getOrdinal(condemned[i]); ord < firstUnhealthyOrdinal {
 | 
						|
				firstUnhealthyOrdinal = ord
 | 
						|
				firstUnhealthyPod = condemned[i]
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	if unhealthy > 0 {
 | 
						|
		klog.V(4).Infof("StatefulSet %s/%s has %d unhealthy Pods starting with %s",
 | 
						|
			set.Namespace,
 | 
						|
			set.Name,
 | 
						|
			unhealthy,
 | 
						|
			firstUnhealthyPod.Name)
 | 
						|
	}
 | 
						|
 | 
						|
	// If the StatefulSet is being deleted, don't do anything other than updating
 | 
						|
	// status.
 | 
						|
	if set.DeletionTimestamp != nil {
 | 
						|
		return &status, nil
 | 
						|
	}
 | 
						|
 | 
						|
	monotonic := !allowsBurst(set)
 | 
						|
 | 
						|
	// Examine each replica with respect to its ordinal
 | 
						|
	for i := range replicas {
 | 
						|
		// delete and recreate failed pods
 | 
						|
		if isFailed(replicas[i]) {
 | 
						|
			ssc.recorder.Eventf(set, v1.EventTypeWarning, "RecreatingFailedPod",
 | 
						|
				"StatefulSet %s/%s is recreating failed Pod %s",
 | 
						|
				set.Namespace,
 | 
						|
				set.Name,
 | 
						|
				replicas[i].Name)
 | 
						|
			if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil {
 | 
						|
				return &status, err
 | 
						|
			}
 | 
						|
			if getPodRevision(replicas[i]) == currentRevision.Name {
 | 
						|
				status.CurrentReplicas--
 | 
						|
			}
 | 
						|
			if getPodRevision(replicas[i]) == updateRevision.Name {
 | 
						|
				status.UpdatedReplicas--
 | 
						|
			}
 | 
						|
			status.Replicas--
 | 
						|
			replicas[i] = newVersionedStatefulSetPod(
 | 
						|
				currentSet,
 | 
						|
				updateSet,
 | 
						|
				currentRevision.Name,
 | 
						|
				updateRevision.Name,
 | 
						|
				i)
 | 
						|
		}
 | 
						|
		// If we find a Pod that has not been created we create the Pod
 | 
						|
		if !isCreated(replicas[i]) {
 | 
						|
			if err := ssc.podControl.CreateStatefulPod(set, replicas[i]); err != nil {
 | 
						|
				return &status, err
 | 
						|
			}
 | 
						|
			status.Replicas++
 | 
						|
			if getPodRevision(replicas[i]) == currentRevision.Name {
 | 
						|
				status.CurrentReplicas++
 | 
						|
			}
 | 
						|
			if getPodRevision(replicas[i]) == updateRevision.Name {
 | 
						|
				status.UpdatedReplicas++
 | 
						|
			}
 | 
						|
 | 
						|
			// if the set does not allow bursting, return immediately
 | 
						|
			if monotonic {
 | 
						|
				return &status, nil
 | 
						|
			}
 | 
						|
			// pod created, no more work possible for this round
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		// If we find a Pod that is currently terminating, we must wait until graceful deletion
 | 
						|
		// completes before we continue to make progress.
 | 
						|
		if isTerminating(replicas[i]) && monotonic {
 | 
						|
			klog.V(4).Infof(
 | 
						|
				"StatefulSet %s/%s is waiting for Pod %s to Terminate",
 | 
						|
				set.Namespace,
 | 
						|
				set.Name,
 | 
						|
				replicas[i].Name)
 | 
						|
			return &status, nil
 | 
						|
		}
 | 
						|
		// If we have a Pod that has been created but is not running and ready we can not make progress.
 | 
						|
		// We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its
 | 
						|
		// ordinal, are Running and Ready.
 | 
						|
		if !isRunningAndReady(replicas[i]) && monotonic {
 | 
						|
			klog.V(4).Infof(
 | 
						|
				"StatefulSet %s/%s is waiting for Pod %s to be Running and Ready",
 | 
						|
				set.Namespace,
 | 
						|
				set.Name,
 | 
						|
				replicas[i].Name)
 | 
						|
			return &status, nil
 | 
						|
		}
 | 
						|
		// Enforce the StatefulSet invariants
 | 
						|
		if identityMatches(set, replicas[i]) && storageMatches(set, replicas[i]) {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		// Make a deep copy so we don't mutate the shared cache
 | 
						|
		replica := replicas[i].DeepCopy()
 | 
						|
		if err := ssc.podControl.UpdateStatefulPod(updateSet, replica); err != nil {
 | 
						|
			return &status, err
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// At this point, all of the current Replicas are Running and Ready, we can consider termination.
 | 
						|
	// We will wait for all predecessors to be Running and Ready prior to attempting a deletion.
 | 
						|
	// We will terminate Pods in a monotonically decreasing order over [len(pods),set.Spec.Replicas).
 | 
						|
	// Note that we do not resurrect Pods in this interval. Also note that scaling will take precedence over
 | 
						|
	// updates.
 | 
						|
	for target := len(condemned) - 1; target >= 0; target-- {
 | 
						|
		// wait for terminating pods to expire
 | 
						|
		if isTerminating(condemned[target]) {
 | 
						|
			klog.V(4).Infof(
 | 
						|
				"StatefulSet %s/%s is waiting for Pod %s to Terminate prior to scale down",
 | 
						|
				set.Namespace,
 | 
						|
				set.Name,
 | 
						|
				condemned[target].Name)
 | 
						|
			// block if we are in monotonic mode
 | 
						|
			if monotonic {
 | 
						|
				return &status, nil
 | 
						|
			}
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		// if we are in monotonic mode and the condemned target is not the first unhealthy Pod block
 | 
						|
		if !isRunningAndReady(condemned[target]) && monotonic && condemned[target] != firstUnhealthyPod {
 | 
						|
			klog.V(4).Infof(
 | 
						|
				"StatefulSet %s/%s is waiting for Pod %s to be Running and Ready prior to scale down",
 | 
						|
				set.Namespace,
 | 
						|
				set.Name,
 | 
						|
				firstUnhealthyPod.Name)
 | 
						|
			return &status, nil
 | 
						|
		}
 | 
						|
		klog.V(2).Infof("StatefulSet %s/%s terminating Pod %s for scale down",
 | 
						|
			set.Namespace,
 | 
						|
			set.Name,
 | 
						|
			condemned[target].Name)
 | 
						|
 | 
						|
		if err := ssc.podControl.DeleteStatefulPod(set, condemned[target]); err != nil {
 | 
						|
			return &status, err
 | 
						|
		}
 | 
						|
		if getPodRevision(condemned[target]) == currentRevision.Name {
 | 
						|
			status.CurrentReplicas--
 | 
						|
		}
 | 
						|
		if getPodRevision(condemned[target]) == updateRevision.Name {
 | 
						|
			status.UpdatedReplicas--
 | 
						|
		}
 | 
						|
		if monotonic {
 | 
						|
			return &status, nil
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// for the OnDelete strategy we short circuit. Pods will be updated when they are manually deleted.
 | 
						|
	if set.Spec.UpdateStrategy.Type == apps.OnDeleteStatefulSetStrategyType {
 | 
						|
		return &status, nil
 | 
						|
	}
 | 
						|
 | 
						|
	// we compute the minimum ordinal of the target sequence for a destructive update based on the strategy.
 | 
						|
	updateMin := 0
 | 
						|
	if set.Spec.UpdateStrategy.RollingUpdate != nil {
 | 
						|
		updateMin = int(*set.Spec.UpdateStrategy.RollingUpdate.Partition)
 | 
						|
	}
 | 
						|
	// we terminate the Pod with the largest ordinal that does not match the update revision.
 | 
						|
	for target := len(replicas) - 1; target >= updateMin; target-- {
 | 
						|
 | 
						|
		// delete the Pod if it is not already terminating and does not match the update revision.
 | 
						|
		if getPodRevision(replicas[target]) != updateRevision.Name && !isTerminating(replicas[target]) {
 | 
						|
			klog.V(2).Infof("StatefulSet %s/%s terminating Pod %s for update",
 | 
						|
				set.Namespace,
 | 
						|
				set.Name,
 | 
						|
				replicas[target].Name)
 | 
						|
			err := ssc.podControl.DeleteStatefulPod(set, replicas[target])
 | 
						|
			status.CurrentReplicas--
 | 
						|
			return &status, err
 | 
						|
		}
 | 
						|
 | 
						|
		// wait for unhealthy Pods on update
 | 
						|
		if !isHealthy(replicas[target]) {
 | 
						|
			klog.V(4).Infof(
 | 
						|
				"StatefulSet %s/%s is waiting for Pod %s to update",
 | 
						|
				set.Namespace,
 | 
						|
				set.Name,
 | 
						|
				replicas[target].Name)
 | 
						|
			return &status, nil
 | 
						|
		}
 | 
						|
 | 
						|
	}
 | 
						|
	return &status, nil
 | 
						|
}
 | 
						|
 | 
						|
// updateStatefulSetStatus updates set's Status to be equal to status. If status indicates a complete update, it is
 | 
						|
// mutated to indicate completion. If status is semantically equivalent to set's Status no update is performed. If the
 | 
						|
// returned error is nil, the update is successful.
 | 
						|
func (ssc *defaultStatefulSetControl) updateStatefulSetStatus(
 | 
						|
	set *apps.StatefulSet,
 | 
						|
	status *apps.StatefulSetStatus) error {
 | 
						|
 | 
						|
	// complete any in progress rolling update if necessary
 | 
						|
	completeRollingUpdate(set, status)
 | 
						|
 | 
						|
	// if the status is not inconsistent do not perform an update
 | 
						|
	if !inconsistentStatus(set, status) {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	// copy set and update its status
 | 
						|
	set = set.DeepCopy()
 | 
						|
	if err := ssc.statusUpdater.UpdateStatefulSetStatus(set, status); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
var _ StatefulSetControlInterface = &defaultStatefulSetControl{}
 |