mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 11:48:15 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			400 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			400 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
/*
 | 
						|
Copyright 2016 The Kubernetes Authors.
 | 
						|
 | 
						|
Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
you may not use this file except in compliance with the License.
 | 
						|
You may obtain a copy of the License at
 | 
						|
 | 
						|
    http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
 | 
						|
Unless required by applicable law or agreed to in writing, software
 | 
						|
distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
See the License for the specific language governing permissions and
 | 
						|
limitations under the License.
 | 
						|
*/
 | 
						|
 | 
						|
package statefulset
 | 
						|
 | 
						|
import (
 | 
						|
	"bytes"
 | 
						|
	"encoding/json"
 | 
						|
	"fmt"
 | 
						|
	"regexp"
 | 
						|
	"strconv"
 | 
						|
 | 
						|
	apps "k8s.io/api/apps/v1beta1"
 | 
						|
	"k8s.io/api/core/v1"
 | 
						|
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
						|
	"k8s.io/apimachinery/pkg/runtime"
 | 
						|
	"k8s.io/apimachinery/pkg/util/strategicpatch"
 | 
						|
	"k8s.io/client-go/kubernetes/scheme"
 | 
						|
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
 | 
						|
	"k8s.io/kubernetes/pkg/controller"
 | 
						|
	"k8s.io/kubernetes/pkg/controller/history"
 | 
						|
)
 | 
						|
 | 
						|
// maxUpdateRetries is the maximum number of retries used for update conflict resolution prior to failure
 | 
						|
const maxUpdateRetries = 10
 | 
						|
 | 
						|
// updateConflictError is the error used to indicate that the maximum number of retries against the API server have
 | 
						|
// been attempted and we need to back off
 | 
						|
var updateConflictError = fmt.Errorf("aborting update after %d attempts", maxUpdateRetries)
 | 
						|
var patchCodec = scheme.Codecs.LegacyCodec(apps.SchemeGroupVersion)
 | 
						|
 | 
						|
// overlappingStatefulSets sorts a list of StatefulSets by creation timestamp, using their names as a tie breaker.
 | 
						|
// Generally used to tie break between StatefulSets that have overlapping selectors.
 | 
						|
type overlappingStatefulSets []*apps.StatefulSet
 | 
						|
 | 
						|
func (o overlappingStatefulSets) Len() int { return len(o) }
 | 
						|
 | 
						|
func (o overlappingStatefulSets) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
 | 
						|
 | 
						|
func (o overlappingStatefulSets) Less(i, j int) bool {
 | 
						|
	if o[i].CreationTimestamp.Equal(&o[j].CreationTimestamp) {
 | 
						|
		return o[i].Name < o[j].Name
 | 
						|
	}
 | 
						|
	return o[i].CreationTimestamp.Before(&o[j].CreationTimestamp)
 | 
						|
}
 | 
						|
 | 
						|
// statefulPodRegex splits a Pod name into two capture groups: everything before the final "-" (the
// parent StatefulSet name) and the trailing run of decimal digits (the ordinal).
var statefulPodRegex = regexp.MustCompile(`(.*)-([0-9]+)$`)
 | 
						|
 | 
						|
// getParentNameAndOrdinal gets the name of pod's parent StatefulSet and pod's ordinal as extracted from its Name. If
 | 
						|
// the Pod was not created by a StatefulSet, its parent is considered to be empty string, and its ordinal is considered
 | 
						|
// to be -1.
 | 
						|
func getParentNameAndOrdinal(pod *v1.Pod) (string, int) {
 | 
						|
	parent := ""
 | 
						|
	ordinal := -1
 | 
						|
	subMatches := statefulPodRegex.FindStringSubmatch(pod.Name)
 | 
						|
	if len(subMatches) < 3 {
 | 
						|
		return parent, ordinal
 | 
						|
	}
 | 
						|
	parent = subMatches[1]
 | 
						|
	if i, err := strconv.ParseInt(subMatches[2], 10, 32); err == nil {
 | 
						|
		ordinal = int(i)
 | 
						|
	}
 | 
						|
	return parent, ordinal
 | 
						|
}
 | 
						|
 | 
						|
// getParentName gets the name of pod's parent StatefulSet. If pod has not parent, the empty string is returned.
 | 
						|
func getParentName(pod *v1.Pod) string {
 | 
						|
	parent, _ := getParentNameAndOrdinal(pod)
 | 
						|
	return parent
 | 
						|
}
 | 
						|
 | 
						|
//  getOrdinal gets pod's ordinal. If pod has no ordinal, -1 is returned.
 | 
						|
func getOrdinal(pod *v1.Pod) int {
 | 
						|
	_, ordinal := getParentNameAndOrdinal(pod)
 | 
						|
	return ordinal
 | 
						|
}
 | 
						|
 | 
						|
// getPodName gets the name of set's child Pod with an ordinal index of ordinal
 | 
						|
func getPodName(set *apps.StatefulSet, ordinal int) string {
 | 
						|
	return fmt.Sprintf("%s-%d", set.Name, ordinal)
 | 
						|
}
 | 
						|
 | 
						|
// getPersistentVolumeClaimName gets the name of PersistentVolumeClaim for a Pod with an ordinal index of ordinal. claim
 | 
						|
// must be a PersistentVolumeClaim from set's VolumeClaims template.
 | 
						|
func getPersistentVolumeClaimName(set *apps.StatefulSet, claim *v1.PersistentVolumeClaim, ordinal int) string {
 | 
						|
	// NOTE: This name format is used by the heuristics for zone spreading in ChooseZoneForVolume
 | 
						|
	return fmt.Sprintf("%s-%s-%d", claim.Name, set.Name, ordinal)
 | 
						|
}
 | 
						|
 | 
						|
// isMemberOf tests if pod is a member of set.
 | 
						|
func isMemberOf(set *apps.StatefulSet, pod *v1.Pod) bool {
 | 
						|
	return getParentName(pod) == set.Name
 | 
						|
}
 | 
						|
 | 
						|
// identityMatches returns true if pod has a valid identity and network identity for a member of set.
 | 
						|
func identityMatches(set *apps.StatefulSet, pod *v1.Pod) bool {
 | 
						|
	parent, ordinal := getParentNameAndOrdinal(pod)
 | 
						|
	return ordinal >= 0 &&
 | 
						|
		set.Name == parent &&
 | 
						|
		pod.Name == getPodName(set, ordinal) &&
 | 
						|
		pod.Namespace == set.Namespace
 | 
						|
}
 | 
						|
 | 
						|
// storageMatches returns true if pod's Volumes cover the set of PersistentVolumeClaims
 | 
						|
func storageMatches(set *apps.StatefulSet, pod *v1.Pod) bool {
 | 
						|
	ordinal := getOrdinal(pod)
 | 
						|
	if ordinal < 0 {
 | 
						|
		return false
 | 
						|
	}
 | 
						|
	volumes := make(map[string]v1.Volume, len(pod.Spec.Volumes))
 | 
						|
	for _, volume := range pod.Spec.Volumes {
 | 
						|
		volumes[volume.Name] = volume
 | 
						|
	}
 | 
						|
	for _, claim := range set.Spec.VolumeClaimTemplates {
 | 
						|
		volume, found := volumes[claim.Name]
 | 
						|
		if !found ||
 | 
						|
			volume.VolumeSource.PersistentVolumeClaim == nil ||
 | 
						|
			volume.VolumeSource.PersistentVolumeClaim.ClaimName !=
 | 
						|
				getPersistentVolumeClaimName(set, &claim, ordinal) {
 | 
						|
			return false
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return true
 | 
						|
}
 | 
						|
 | 
						|
// getPersistentVolumeClaims gets a map of PersistentVolumeClaims to their template names, as defined in set. The
 | 
						|
// returned PersistentVolumeClaims are each constructed with a the name specific to the Pod. This name is determined
 | 
						|
// by getPersistentVolumeClaimName.
 | 
						|
func getPersistentVolumeClaims(set *apps.StatefulSet, pod *v1.Pod) map[string]v1.PersistentVolumeClaim {
 | 
						|
	ordinal := getOrdinal(pod)
 | 
						|
	templates := set.Spec.VolumeClaimTemplates
 | 
						|
	claims := make(map[string]v1.PersistentVolumeClaim, len(templates))
 | 
						|
	for i := range templates {
 | 
						|
		claim := templates[i]
 | 
						|
		claim.Name = getPersistentVolumeClaimName(set, &claim, ordinal)
 | 
						|
		claim.Namespace = set.Namespace
 | 
						|
		claim.Labels = set.Spec.Selector.MatchLabels
 | 
						|
		claims[templates[i].Name] = claim
 | 
						|
	}
 | 
						|
	return claims
 | 
						|
}
 | 
						|
 | 
						|
// updateStorage updates pod's Volumes to conform with the PersistentVolumeClaim of set's templates. If pod has
 | 
						|
// conflicting local Volumes these are replaced with Volumes that conform to the set's templates.
 | 
						|
func updateStorage(set *apps.StatefulSet, pod *v1.Pod) {
 | 
						|
	currentVolumes := pod.Spec.Volumes
 | 
						|
	claims := getPersistentVolumeClaims(set, pod)
 | 
						|
	newVolumes := make([]v1.Volume, 0, len(claims))
 | 
						|
	for name, claim := range claims {
 | 
						|
		newVolumes = append(newVolumes, v1.Volume{
 | 
						|
			Name: name,
 | 
						|
			VolumeSource: v1.VolumeSource{
 | 
						|
				PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
 | 
						|
					ClaimName: claim.Name,
 | 
						|
					// TODO: Use source definition to set this value when we have one.
 | 
						|
					ReadOnly: false,
 | 
						|
				},
 | 
						|
			},
 | 
						|
		})
 | 
						|
	}
 | 
						|
	for i := range currentVolumes {
 | 
						|
		if _, ok := claims[currentVolumes[i].Name]; !ok {
 | 
						|
			newVolumes = append(newVolumes, currentVolumes[i])
 | 
						|
		}
 | 
						|
	}
 | 
						|
	pod.Spec.Volumes = newVolumes
 | 
						|
}
 | 
						|
 | 
						|
func initIdentity(set *apps.StatefulSet, pod *v1.Pod) {
 | 
						|
	updateIdentity(set, pod)
 | 
						|
	// Set these immutable fields only on initial Pod creation, not updates.
 | 
						|
	pod.Spec.Hostname = pod.Name
 | 
						|
	pod.Spec.Subdomain = set.Spec.ServiceName
 | 
						|
}
 | 
						|
 | 
						|
// updateIdentity updates pod's name, hostname, and subdomain to conform to set's name and headless service.
 | 
						|
func updateIdentity(set *apps.StatefulSet, pod *v1.Pod) {
 | 
						|
	pod.Name = getPodName(set, getOrdinal(pod))
 | 
						|
	pod.Namespace = set.Namespace
 | 
						|
 | 
						|
}
 | 
						|
 | 
						|
// isRunningAndReady returns true if pod is in the PodRunning Phase, if it has a condition of PodReady.
 | 
						|
func isRunningAndReady(pod *v1.Pod) bool {
 | 
						|
	return pod.Status.Phase == v1.PodRunning && podutil.IsPodReady(pod)
 | 
						|
}
 | 
						|
 | 
						|
// isCreated returns true if pod has been created and is maintained by the API server
 | 
						|
func isCreated(pod *v1.Pod) bool {
 | 
						|
	return pod.Status.Phase != ""
 | 
						|
}
 | 
						|
 | 
						|
// isFailed returns true if pod has a Phase of PodFailed
 | 
						|
func isFailed(pod *v1.Pod) bool {
 | 
						|
	return pod.Status.Phase == v1.PodFailed
 | 
						|
}
 | 
						|
 | 
						|
// isTerminating returns true if pod's DeletionTimestamp has been set
 | 
						|
func isTerminating(pod *v1.Pod) bool {
 | 
						|
	return pod.DeletionTimestamp != nil
 | 
						|
}
 | 
						|
 | 
						|
// isHealthy returns true if pod is running and ready and has not been terminated
 | 
						|
func isHealthy(pod *v1.Pod) bool {
 | 
						|
	return isRunningAndReady(pod) && !isTerminating(pod)
 | 
						|
}
 | 
						|
 | 
						|
// allowsBurst is true if the alpha burst annotation is set.
 | 
						|
func allowsBurst(set *apps.StatefulSet) bool {
 | 
						|
	return set.Spec.PodManagementPolicy == apps.ParallelPodManagement
 | 
						|
}
 | 
						|
 | 
						|
// setPodRevision sets the revision of Pod to revision by adding the StatefulSetRevisionLabel
 | 
						|
func setPodRevision(pod *v1.Pod, revision string) {
 | 
						|
	if pod.Labels == nil {
 | 
						|
		pod.Labels = make(map[string]string)
 | 
						|
	}
 | 
						|
	pod.Labels[apps.StatefulSetRevisionLabel] = revision
 | 
						|
}
 | 
						|
 | 
						|
// getPodRevision gets the revision of Pod by inspecting the StatefulSetRevisionLabel. If pod has no revision the empty
 | 
						|
// string is returned.
 | 
						|
func getPodRevision(pod *v1.Pod) string {
 | 
						|
	if pod.Labels == nil {
 | 
						|
		return ""
 | 
						|
	}
 | 
						|
	return pod.Labels[apps.StatefulSetRevisionLabel]
 | 
						|
}
 | 
						|
 | 
						|
// newStatefulSetPod returns a new Pod conforming to the set's Spec with an identity generated from ordinal.
 | 
						|
func newStatefulSetPod(set *apps.StatefulSet, ordinal int) *v1.Pod {
 | 
						|
	pod, _ := controller.GetPodFromTemplate(&set.Spec.Template, set, metav1.NewControllerRef(set, controllerKind))
 | 
						|
	pod.Name = getPodName(set, ordinal)
 | 
						|
	initIdentity(set, pod)
 | 
						|
	updateStorage(set, pod)
 | 
						|
	return pod
 | 
						|
}
 | 
						|
 | 
						|
// newVersionedStatefulSetPod creates a new Pod for a StatefulSet. currentSet is the representation of the set at the
 | 
						|
// current revision. updateSet is the representation of the set at the updateRevision. currentRevision is the name of
 | 
						|
// the current revision. updateRevision is the name of the update revision. ordinal is the ordinal of the Pod. If the
 | 
						|
// returned error is nil, the returned Pod is valid.
 | 
						|
func newVersionedStatefulSetPod(currentSet, updateSet *apps.StatefulSet, currentRevision, updateRevision string, ordinal int) *v1.Pod {
 | 
						|
	if currentSet.Spec.UpdateStrategy.Type == apps.RollingUpdateStatefulSetStrategyType &&
 | 
						|
		(currentSet.Spec.UpdateStrategy.RollingUpdate == nil && ordinal < int(currentSet.Status.CurrentReplicas)) ||
 | 
						|
		(currentSet.Spec.UpdateStrategy.RollingUpdate != nil && ordinal < int(*currentSet.Spec.UpdateStrategy.RollingUpdate.Partition)) {
 | 
						|
		pod := newStatefulSetPod(currentSet, ordinal)
 | 
						|
		setPodRevision(pod, currentRevision)
 | 
						|
		return pod
 | 
						|
	}
 | 
						|
	pod := newStatefulSetPod(updateSet, ordinal)
 | 
						|
	setPodRevision(pod, updateRevision)
 | 
						|
	return pod
 | 
						|
}
 | 
						|
 | 
						|
// Match check if the given StatefulSet's template matches the template stored in the given history.
 | 
						|
func Match(ss *apps.StatefulSet, history *apps.ControllerRevision) (bool, error) {
 | 
						|
	patch, err := getPatch(ss)
 | 
						|
	if err != nil {
 | 
						|
		return false, err
 | 
						|
	}
 | 
						|
	return bytes.Equal(patch, history.Data.Raw), nil
 | 
						|
}
 | 
						|
 | 
						|
// getPatch returns a strategic merge patch that can be applied to restore a StatefulSet to a
 | 
						|
// previous version. If the returned error is nil the patch is valid. The current state that we save is just the
 | 
						|
// PodSpecTemplate. We can modify this later to encompass more state (or less) and remain compatible with previously
 | 
						|
// recorded patches.
 | 
						|
func getPatch(set *apps.StatefulSet) ([]byte, error) {
 | 
						|
	str, err := runtime.Encode(patchCodec, set)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	var raw map[string]interface{}
 | 
						|
	json.Unmarshal([]byte(str), &raw)
 | 
						|
	objCopy := make(map[string]interface{})
 | 
						|
	specCopy := make(map[string]interface{})
 | 
						|
	spec := raw["spec"].(map[string]interface{})
 | 
						|
	template := spec["template"].(map[string]interface{})
 | 
						|
	specCopy["template"] = template
 | 
						|
	template["$patch"] = "replace"
 | 
						|
	objCopy["spec"] = specCopy
 | 
						|
	patch, err := json.Marshal(objCopy)
 | 
						|
	return patch, err
 | 
						|
}
 | 
						|
 | 
						|
// newRevision creates a new ControllerRevision containing a patch that reapplies the target state of set.
 | 
						|
// The Revision of the returned ControllerRevision is set to revision. If the returned error is nil, the returned
 | 
						|
// ControllerRevision is valid. StatefulSet revisions are stored as patches that re-apply the current state of set
 | 
						|
// to a new StatefulSet using a strategic merge patch to replace the saved state of the new StatefulSet.
 | 
						|
func newRevision(set *apps.StatefulSet, revision int64, collisionCount *int32) (*apps.ControllerRevision, error) {
 | 
						|
	patch, err := getPatch(set)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	cr, err := history.NewControllerRevision(set,
 | 
						|
		controllerKind,
 | 
						|
		selector,
 | 
						|
		runtime.RawExtension{Raw: patch},
 | 
						|
		revision,
 | 
						|
		collisionCount)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	if cr.ObjectMeta.Annotations == nil {
 | 
						|
		cr.ObjectMeta.Annotations = make(map[string]string)
 | 
						|
	}
 | 
						|
	for key, value := range set.Annotations {
 | 
						|
		cr.ObjectMeta.Annotations[key] = value
 | 
						|
	}
 | 
						|
	return cr, nil
 | 
						|
}
 | 
						|
 | 
						|
// ApplyRevision returns a new StatefulSet constructed by restoring the state in revision to set. If the returned error
 | 
						|
// is nil, the returned StatefulSet is valid.
 | 
						|
func ApplyRevision(set *apps.StatefulSet, revision *apps.ControllerRevision) (*apps.StatefulSet, error) {
 | 
						|
	clone := set.DeepCopy()
 | 
						|
	patched, err := strategicpatch.StrategicMergePatch([]byte(runtime.EncodeOrDie(patchCodec, clone)), revision.Data.Raw, clone)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	err = json.Unmarshal(patched, clone)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	return clone, nil
 | 
						|
}
 | 
						|
 | 
						|
// nextRevision finds the next valid revision number based on revisions. If the length of revisions
 | 
						|
// is 0 this is 1. Otherwise, it is 1 greater than the largest revision's Revision. This method
 | 
						|
// assumes that revisions has been sorted by Revision.
 | 
						|
func nextRevision(revisions []*apps.ControllerRevision) int64 {
 | 
						|
	count := len(revisions)
 | 
						|
	if count <= 0 {
 | 
						|
		return 1
 | 
						|
	}
 | 
						|
	return revisions[count-1].Revision + 1
 | 
						|
}
 | 
						|
 | 
						|
// inconsistentStatus returns true if the ObservedGeneration of status is greater than set's
 | 
						|
// Generation or if any of the status's fields do not match those of set's status.
 | 
						|
func inconsistentStatus(set *apps.StatefulSet, status *apps.StatefulSetStatus) bool {
 | 
						|
	return set.Status.ObservedGeneration == nil ||
 | 
						|
		*status.ObservedGeneration > *set.Status.ObservedGeneration ||
 | 
						|
		status.Replicas != set.Status.Replicas ||
 | 
						|
		status.CurrentReplicas != set.Status.CurrentReplicas ||
 | 
						|
		status.ReadyReplicas != set.Status.ReadyReplicas ||
 | 
						|
		status.UpdatedReplicas != set.Status.UpdatedReplicas ||
 | 
						|
		status.CurrentRevision != set.Status.CurrentRevision ||
 | 
						|
		status.UpdateRevision != set.Status.UpdateRevision
 | 
						|
}
 | 
						|
 | 
						|
// completeRollingUpdate completes a rolling update when all of set's replica Pods have been updated
 | 
						|
// to the updateRevision. status's currentRevision is set to updateRevision and its' updateRevision
 | 
						|
// is set to the empty string. status's currentReplicas is set to updateReplicas and its updateReplicas
 | 
						|
// are set to 0.
 | 
						|
func completeRollingUpdate(set *apps.StatefulSet, status *apps.StatefulSetStatus) {
 | 
						|
	if set.Spec.UpdateStrategy.Type == apps.RollingUpdateStatefulSetStrategyType &&
 | 
						|
		status.UpdatedReplicas == status.Replicas &&
 | 
						|
		status.ReadyReplicas == status.Replicas {
 | 
						|
		status.CurrentReplicas = status.UpdatedReplicas
 | 
						|
		status.CurrentRevision = status.UpdateRevision
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// ascendingOrdinal is a sort.Interface that Sorts a list of Pods based on the ordinals extracted
 | 
						|
// from the Pod. Pod's that have not been constructed by StatefulSet's have an ordinal of -1, and are therefore pushed
 | 
						|
// to the front of the list.
 | 
						|
type ascendingOrdinal []*v1.Pod
 | 
						|
 | 
						|
func (ao ascendingOrdinal) Len() int {
 | 
						|
	return len(ao)
 | 
						|
}
 | 
						|
 | 
						|
func (ao ascendingOrdinal) Swap(i, j int) {
 | 
						|
	ao[i], ao[j] = ao[j], ao[i]
 | 
						|
}
 | 
						|
 | 
						|
func (ao ascendingOrdinal) Less(i, j int) bool {
 | 
						|
	return getOrdinal(ao[i]) < getOrdinal(ao[j])
 | 
						|
}
 |