mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 19:58:17 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			239 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			239 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
/*
 | 
						|
Copyright 2016 The Kubernetes Authors.
 | 
						|
 | 
						|
Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
you may not use this file except in compliance with the License.
 | 
						|
You may obtain a copy of the License at
 | 
						|
 | 
						|
    http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
 | 
						|
Unless required by applicable law or agreed to in writing, software
 | 
						|
distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
See the License for the specific language governing permissions and
 | 
						|
limitations under the License.
 | 
						|
*/
 | 
						|
 | 
						|
package statefulset
 | 
						|
 | 
						|
import (
 | 
						|
	"fmt"
 | 
						|
	"strings"
 | 
						|
 | 
						|
	apierrors "k8s.io/apimachinery/pkg/api/errors"
 | 
						|
	errorutils "k8s.io/apimachinery/pkg/util/errors"
 | 
						|
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 | 
						|
	"k8s.io/client-go/pkg/api"
 | 
						|
	"k8s.io/client-go/tools/record"
 | 
						|
	"k8s.io/kubernetes/pkg/api/v1"
 | 
						|
	apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
 | 
						|
	"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
 | 
						|
	appslisters "k8s.io/kubernetes/pkg/client/listers/apps/v1beta1"
 | 
						|
	corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
 | 
						|
	"k8s.io/kubernetes/pkg/client/retry"
 | 
						|
)
 | 
						|
 | 
						|
// StatefulPodControlInterface defines the interface that StatefulSetController uses to create, update, and delete Pods,
 | 
						|
// and to update the Status of a StatefulSet. It follows the design paradigms used for PodControl, but its
 | 
						|
// implementation provides for PVC creation, ordered Pod creation, ordered Pod termination, and Pod identity enforcement.
 | 
						|
// Like controller.PodControlInterface, it is implemented as an interface to provide for testing fakes.
 | 
						|
type StatefulPodControlInterface interface {
 | 
						|
	// CreateStatefulPod create a Pod in a StatefulSet. Any PVCs necessary for the Pod are created prior to creating
 | 
						|
	// the Pod. If the returned error is nil the Pod and its PVCs have been created.
 | 
						|
	CreateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error
 | 
						|
	// UpdateStatefulPod Updates a Pod in a StatefulSet. If the Pod already has the correct identity and stable
 | 
						|
	// storage this method is a no-op. If the Pod must be mutated to conform to the Set, it is mutated and updated.
 | 
						|
	// pod is an in-out parameter, and any updates made to the pod are reflected as mutations to this parameter. If
 | 
						|
	// the create is successful, the returned error is nil.
 | 
						|
	UpdateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error
 | 
						|
	// DeleteStatefulPod deletes a Pod in a StatefulSet. The pods PVCs are not deleted. If the delete is successful,
 | 
						|
	// the returned error is nil.
 | 
						|
	DeleteStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error
 | 
						|
	// UpdateStatefulSetStatus updates the status of a StatefulSet. set is an in-out parameter, and any
 | 
						|
	// updates made to the set are made visible as mutations to the parameter. If the method is successful, the
 | 
						|
	// returned error is nil, and set has its status updated.
 | 
						|
	UpdateStatefulSetStatus(set *apps.StatefulSet, replicas int32, generation int64) error
 | 
						|
}
 | 
						|
 | 
						|
func NewRealStatefulPodControl(
 | 
						|
	client clientset.Interface,
 | 
						|
	setLister appslisters.StatefulSetLister,
 | 
						|
	podLister corelisters.PodLister,
 | 
						|
	pvcLister corelisters.PersistentVolumeClaimLister,
 | 
						|
	recorder record.EventRecorder,
 | 
						|
) StatefulPodControlInterface {
 | 
						|
	return &realStatefulPodControl{client, setLister, podLister, pvcLister, recorder}
 | 
						|
}
 | 
						|
 | 
						|
// realStatefulPodControl implements StatefulPodControlInterface using a clientset.Interface to communicate with the
 | 
						|
// API server. The struct is package private as the internal details are irrelevant to importing packages.
 | 
						|
type realStatefulPodControl struct {
 | 
						|
	client    clientset.Interface
 | 
						|
	setLister appslisters.StatefulSetLister
 | 
						|
	podLister corelisters.PodLister
 | 
						|
	pvcLister corelisters.PersistentVolumeClaimLister
 | 
						|
	recorder  record.EventRecorder
 | 
						|
}
 | 
						|
 | 
						|
func (spc *realStatefulPodControl) CreateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error {
 | 
						|
	// Create the Pod's PVCs prior to creating the Pod
 | 
						|
	if err := spc.createPersistentVolumeClaims(set, pod); err != nil {
 | 
						|
		spc.recordPodEvent("create", set, pod, err)
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	// If we created the PVCs attempt to create the Pod
 | 
						|
	_, err := spc.client.Core().Pods(set.Namespace).Create(pod)
 | 
						|
	// sink already exists errors
 | 
						|
	if apierrors.IsAlreadyExists(err) {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	spc.recordPodEvent("create", set, pod, err)
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
func (spc *realStatefulPodControl) UpdateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error {
 | 
						|
	attemptedUpdate := false
 | 
						|
 | 
						|
	err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
 | 
						|
		// assume the Pod is consistent
 | 
						|
		consistent := true
 | 
						|
		// if the Pod does not conform to its identity, update the identity and dirty the Pod
 | 
						|
		if !identityMatches(set, pod) {
 | 
						|
			updateIdentity(set, pod)
 | 
						|
			consistent = false
 | 
						|
		}
 | 
						|
		// if the Pod does not conform to the StatefulSet's storage requirements, update the Pod's PVC's,
 | 
						|
		// dirty the Pod, and create any missing PVCs
 | 
						|
		if !storageMatches(set, pod) {
 | 
						|
			updateStorage(set, pod)
 | 
						|
			consistent = false
 | 
						|
			if err := spc.createPersistentVolumeClaims(set, pod); err != nil {
 | 
						|
				spc.recordPodEvent("update", set, pod, err)
 | 
						|
				return err
 | 
						|
			}
 | 
						|
		}
 | 
						|
		// if the Pod is not dirty do nothing
 | 
						|
		if consistent {
 | 
						|
			return nil
 | 
						|
		}
 | 
						|
 | 
						|
		attemptedUpdate = true
 | 
						|
		// commit the update, retrying on conflicts
 | 
						|
		_, err := spc.client.Core().Pods(set.Namespace).Update(pod)
 | 
						|
		if err == nil {
 | 
						|
			return nil
 | 
						|
		}
 | 
						|
		updateErr := err
 | 
						|
 | 
						|
		if updated, err := spc.podLister.Pods(set.Namespace).Get(pod.Name); err == nil {
 | 
						|
			// make a copy so we don't mutate the shared cache
 | 
						|
			if copy, err := api.Scheme.DeepCopy(updated); err == nil {
 | 
						|
				pod = copy.(*v1.Pod)
 | 
						|
			} else {
 | 
						|
				utilruntime.HandleError(fmt.Errorf("error copying updated Pod: %v", err))
 | 
						|
			}
 | 
						|
		} else {
 | 
						|
			utilruntime.HandleError(fmt.Errorf("error getting updated Pod %s/%s from lister: %v", set.Namespace, pod.Name, err))
 | 
						|
		}
 | 
						|
 | 
						|
		return updateErr
 | 
						|
	})
 | 
						|
	if attemptedUpdate {
 | 
						|
		spc.recordPodEvent("update", set, pod, err)
 | 
						|
	}
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
func (spc *realStatefulPodControl) DeleteStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error {
 | 
						|
	err := spc.client.Core().Pods(set.Namespace).Delete(pod.Name, nil)
 | 
						|
	spc.recordPodEvent("delete", set, pod, err)
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
func (spc *realStatefulPodControl) UpdateStatefulSetStatus(set *apps.StatefulSet, replicas int32, generation int64) error {
 | 
						|
	return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
 | 
						|
		set.Status.Replicas = replicas
 | 
						|
		set.Status.ObservedGeneration = &generation
 | 
						|
		_, err := spc.client.Apps().StatefulSets(set.Namespace).UpdateStatus(set)
 | 
						|
		if err == nil {
 | 
						|
			return nil
 | 
						|
		}
 | 
						|
 | 
						|
		updateErr := err
 | 
						|
 | 
						|
		if updated, err := spc.setLister.StatefulSets(set.Namespace).Get(set.Name); err == nil {
 | 
						|
			// make a copy so we don't mutate the shared cache
 | 
						|
			if copy, err := api.Scheme.DeepCopy(updated); err == nil {
 | 
						|
				set = copy.(*apps.StatefulSet)
 | 
						|
			} else {
 | 
						|
				utilruntime.HandleError(fmt.Errorf("error copying updated StatefulSet: %v", err))
 | 
						|
			}
 | 
						|
		} else {
 | 
						|
			utilruntime.HandleError(fmt.Errorf("error getting updated StatefulSet %s/%s from lister: %v", set.Namespace, set.Name, err))
 | 
						|
		}
 | 
						|
 | 
						|
		return updateErr
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
// recordPodEvent records an event for verb applied to a Pod in a StatefulSet. If err is nil the generated event will
 | 
						|
// have a reason of v1.EventTypeNormal. If err is not nil the generated event will have a reason of v1.EventTypeWarning.
 | 
						|
func (spc *realStatefulPodControl) recordPodEvent(verb string, set *apps.StatefulSet, pod *v1.Pod, err error) {
 | 
						|
	if err == nil {
 | 
						|
		reason := fmt.Sprintf("Successful%s", strings.Title(verb))
 | 
						|
		message := fmt.Sprintf("%s Pod %s in StatefulSet %s successful",
 | 
						|
			strings.ToLower(verb), pod.Name, set.Name)
 | 
						|
		spc.recorder.Event(set, v1.EventTypeNormal, reason, message)
 | 
						|
	} else {
 | 
						|
		reason := fmt.Sprintf("Failed%s", strings.Title(verb))
 | 
						|
		message := fmt.Sprintf("%s Pod %s in StatefulSet %s failed error: %s",
 | 
						|
			strings.ToLower(verb), pod.Name, set.Name, err)
 | 
						|
		spc.recorder.Event(set, v1.EventTypeWarning, reason, message)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// recordClaimEvent records an event for verb applied to the PersistentVolumeClaim of a Pod in a StatefulSet. If err is
 | 
						|
// nil the generated event will have a reason of v1.EventTypeNormal. If err is not nil the generated event will have a
 | 
						|
// reason of v1.EventTypeWarning.
 | 
						|
func (spc *realStatefulPodControl) recordClaimEvent(verb string, set *apps.StatefulSet, pod *v1.Pod, claim *v1.PersistentVolumeClaim, err error) {
 | 
						|
	if err == nil {
 | 
						|
		reason := fmt.Sprintf("Successful%s", strings.Title(verb))
 | 
						|
		message := fmt.Sprintf("%s Claim %s Pod %s in StatefulSet %s success",
 | 
						|
			strings.ToLower(verb), claim.Name, pod.Name, set.Name)
 | 
						|
		spc.recorder.Event(set, v1.EventTypeNormal, reason, message)
 | 
						|
	} else {
 | 
						|
		reason := fmt.Sprintf("Failed%s", strings.Title(verb))
 | 
						|
		message := fmt.Sprintf("%s Claim %s for Pod %s in StatefulSet %s failed error: %s",
 | 
						|
			strings.ToLower(verb), claim.Name, pod.Name, set.Name, err)
 | 
						|
		spc.recorder.Event(set, v1.EventTypeWarning, reason, message)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// createPersistentVolumeClaims creates all of the required PersistentVolumeClaims for pod, which mush be a member of
 | 
						|
// set. If all of the claims for Pod are successfully created, the returned error is nil. If creation fails, this method
 | 
						|
// may be called again until no error is returned, indicating the PersistentVolumeClaims for pod are consistent with
 | 
						|
// set's Spec.
 | 
						|
func (spc *realStatefulPodControl) createPersistentVolumeClaims(set *apps.StatefulSet, pod *v1.Pod) error {
 | 
						|
	var errs []error
 | 
						|
	for _, claim := range getPersistentVolumeClaims(set, pod) {
 | 
						|
		_, err := spc.pvcLister.PersistentVolumeClaims(claim.Namespace).Get(claim.Name)
 | 
						|
		switch {
 | 
						|
		case apierrors.IsNotFound(err):
 | 
						|
			_, err := spc.client.Core().PersistentVolumeClaims(claim.Namespace).Create(&claim)
 | 
						|
			if err != nil {
 | 
						|
				errs = append(errs, fmt.Errorf("Failed to create PVC %s: %s", claim.Name, err))
 | 
						|
			}
 | 
						|
			if err == nil || !apierrors.IsAlreadyExists(err) {
 | 
						|
				spc.recordClaimEvent("create", set, pod, &claim, err)
 | 
						|
			}
 | 
						|
		case err != nil:
 | 
						|
			errs = append(errs, fmt.Errorf("Failed to retrieve PVC %s: %s", claim.Name, err))
 | 
						|
			spc.recordClaimEvent("create", set, pod, &claim, err)
 | 
						|
		}
 | 
						|
		// TODO: Check resource requirements and accessmodes, update if necessary
 | 
						|
	}
 | 
						|
	return errorutils.NewAggregate(errs)
 | 
						|
}
 | 
						|
 | 
						|
var _ StatefulPodControlInterface = &realStatefulPodControl{}
 |