StatefulSet refactoring and semantics fix

1. pcb and the pcb controller are removed; their functionality is encapsulated in StatefulPodControlInterface (a minimal sketch of the idea follows this message).
2. IdentityMappers has been removed to clarify which properties of a Pod the controller mutates. All mutations are performed in the UpdateStatefulPod method of StatefulPodControlInterface.
3. The statefulSetIterator and petQueue classes are removed. They sorted Pods by CreationTimestamp, which is brittle and not resilient to clock skew. The control loop that implements the same logic now lives in stateful_set_control.go, and Pods are sorted and considered by their ordinal indices, as outlined in the documentation (see the ordering sketch below).
4. StatefulSetController now checks that the Pods matching a StatefulSet's Selector also match the Name of the StatefulSet. This makes the controller resilient to overlapping StatefulSets and will be further strengthened by the addition of ControllerRefs.
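Points 1 and 2 funnel every Pod mutation through a single object. The Go fragment below is a minimal sketch of that shape, assuming stand-in types for the real API objects; the message only names UpdateStatefulPod, so the Create and Delete methods here are assumptions added to round out the sketch, not the signatures actually introduced by the commit.

package main

import "fmt"

// pod and statefulSet stand in for the real API types; they are assumptions
// made only so this sketch is self-contained.
type pod struct{ Name string }
type statefulSet struct{ Name string }

// statefulPodControl sketches the idea behind StatefulPodControlInterface:
// the controller never mutates Pods directly, it always goes through one
// object, which makes the set of possible mutations explicit and easy to
// fake in tests.
type statefulPodControl interface {
	CreateStatefulPod(set *statefulSet, pod *pod) error
	UpdateStatefulPod(set *statefulSet, pod *pod) error
	DeleteStatefulPod(set *statefulSet, pod *pod) error
}

// loggingPodControl is a trivial fake, the kind of substitution the
// abstraction is meant to enable.
type loggingPodControl struct{}

func (loggingPodControl) CreateStatefulPod(set *statefulSet, p *pod) error {
	fmt.Printf("create %s for %s\n", p.Name, set.Name)
	return nil
}

func (loggingPodControl) UpdateStatefulPod(set *statefulSet, p *pod) error {
	fmt.Printf("update %s for %s\n", p.Name, set.Name)
	return nil
}

func (loggingPodControl) DeleteStatefulPod(set *statefulSet, p *pod) error {
	fmt.Printf("delete %s for %s\n", p.Name, set.Name)
	return nil
}

func main() {
	var control statefulPodControl = loggingPodControl{}
	_ = control.UpdateStatefulPod(&statefulSet{Name: "web"}, &pod{Name: "web-0"})
}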
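Points 3 and 4 both rely on the <set name>-<ordinal> naming contract: a Pod's parent set and its position are derived from the Pod's name rather than from CreationTimestamp. The standalone sketch below illustrates that idea; parseParentAndOrdinal and its regular expression are hypothetical helpers used only for illustration, while the diff itself relies on its own helpers such as isMemberOf and overlappingStatefulSets.

package main

import (
	"fmt"
	"regexp"
	"sort"
	"strconv"
)

// Pods owned by a StatefulSet are expected to be named <set name>-<ordinal>.
var statefulPodNameRE = regexp.MustCompile(`(.*)-([0-9]+)$`)

// parseParentAndOrdinal splits a pod name into its parent set name and ordinal,
// returning an ordinal of -1 when the name does not follow the contract.
func parseParentAndOrdinal(podName string) (string, int) {
	m := statefulPodNameRE.FindStringSubmatch(podName)
	if len(m) < 3 {
		return "", -1
	}
	ordinal, err := strconv.Atoi(m[2])
	if err != nil {
		return "", -1
	}
	return m[1], ordinal
}

func main() {
	setName := "web"
	podNames := []string{"web-2", "db-1", "web-0", "web-1"}

	// Membership is decided by name, not by label selector alone:
	// keep only pods whose parent name matches the set.
	var members []string
	for _, name := range podNames {
		if parent, ordinal := parseParentAndOrdinal(name); parent == setName && ordinal >= 0 {
			members = append(members, name)
		}
	}

	// Order by ordinal index rather than by CreationTimestamp.
	sort.Slice(members, func(i, j int) bool {
		_, oi := parseParentAndOrdinal(members[i])
		_, oj := parseParentAndOrdinal(members[j])
		return oi < oj
	})

	fmt.Println(members) // [web-0 web-1 web-2]
}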
@@ -27,6 +27,7 @@ import (
    clientv1 "k8s.io/client-go/pkg/api/v1"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/record"

    "k8s.io/kubernetes/pkg/api"
    "k8s.io/kubernetes/pkg/api/v1"
    apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
@@ -34,7 +35,6 @@ import (
    "k8s.io/kubernetes/pkg/client/legacylisters"

    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/util/errors"
    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/apimachinery/pkg/watch"
@@ -47,42 +47,27 @@ import (
const (
    // Time to sleep before polling to see if the pod cache has synced.
    PodStoreSyncedPollPeriod = 100 * time.Millisecond
    // number of retries for a status update.
    statusUpdateRetries = 2
    // period to relist statefulsets and verify pets
    statefulSetResyncPeriod = 30 * time.Second
)

// StatefulSetController controls statefulsets.
type StatefulSetController struct {
    // client interface
    kubeClient clientset.Interface

    // newSyncer returns an interface capable of syncing a single pet.
    // Abstracted out for testing.
    newSyncer func(*pcb) *petSyncer

    control StatefulSetControlInterface
    // podStore is a cache of watched pods.
    podStore listers.StoreToPodLister

    // podStoreSynced returns true if the pod store has synced at least once.
    podStoreSynced func() bool
    // Watches changes to all pods.
    podController cache.Controller

    podStoreSynced cache.InformerSynced
    // A store of StatefulSets, populated by the psController.
    psStore listers.StoreToStatefulSetLister
    setStore listers.StoreToStatefulSetLister
    // Watches changes to all StatefulSets.
    psController cache.Controller

    // A store of the 1 unhealthy pet blocking progress for a given ps
    blockingPetStore *unhealthyPetTracker

    setController cache.Controller
    // Controllers that need to be synced.
    queue workqueue.RateLimitingInterface

    // syncHandler handles sync events for statefulsets.
    // Abstracted as a func to allow injection for testing.
    syncHandler func(psKey string) error
}

// NewStatefulSetController creates a new statefulset controller.
@@ -91,86 +76,83 @@ func NewStatefulSetController(podInformer cache.SharedIndexInformer, kubeClient
    eventBroadcaster.StartLogging(glog.Infof)
    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.Core().RESTClient()).Events("")})
    recorder := eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "statefulset"})
    pc := &apiServerPetClient{kubeClient, recorder, &defaultPetHealthChecker{}}

    psc := &StatefulSetController{
        kubeClient: kubeClient,
        blockingPetStore: newUnHealthyPetTracker(pc),
        newSyncer: func(blockingPet *pcb) *petSyncer {
            return &petSyncer{pc, blockingPet}
        },
        queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "statefulset"),
    ssc := &StatefulSetController{
        kubeClient: kubeClient,
        control: NewDefaultStatefulSetControl(NewRealStatefulPodControl(kubeClient, recorder)),
        queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "statefulset"),
    }

    podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        // lookup the statefulset and enqueue
        AddFunc: psc.addPod,
        AddFunc: ssc.addPod,
        // lookup current and old statefulset if labels changed
        UpdateFunc: psc.updatePod,
        UpdateFunc: ssc.updatePod,
        // lookup statefulset accounting for deletion tombstones
        DeleteFunc: psc.deletePod,
        DeleteFunc: ssc.deletePod,
    })
    psc.podStore.Indexer = podInformer.GetIndexer()
    psc.podController = podInformer.GetController()
    ssc.podStore.Indexer = podInformer.GetIndexer()

    psc.psStore.Store, psc.psController = cache.NewInformer(
    ssc.setStore.Store, ssc.setController = cache.NewInformer(
        &cache.ListWatch{
            ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
                return psc.kubeClient.Apps().StatefulSets(metav1.NamespaceAll).List(options)
                return ssc.kubeClient.Apps().StatefulSets(v1.NamespaceAll).List(options)
            },
            WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                return psc.kubeClient.Apps().StatefulSets(metav1.NamespaceAll).Watch(options)
                return ssc.kubeClient.Apps().StatefulSets(v1.NamespaceAll).Watch(options)
            },
        },
        &apps.StatefulSet{},
        statefulSetResyncPeriod,
        cache.ResourceEventHandlerFuncs{
            AddFunc: psc.enqueueStatefulSet,
            AddFunc: ssc.enqueueStatefulSet,
            UpdateFunc: func(old, cur interface{}) {
                oldPS := old.(*apps.StatefulSet)
                curPS := cur.(*apps.StatefulSet)
                if oldPS.Status.Replicas != curPS.Status.Replicas {
                    glog.V(4).Infof("Observed updated replica count for StatefulSet: %v, %d->%d", curPS.Name, oldPS.Status.Replicas, curPS.Status.Replicas)
                }
                psc.enqueueStatefulSet(cur)
                ssc.enqueueStatefulSet(cur)
            },
            DeleteFunc: psc.enqueueStatefulSet,
            DeleteFunc: ssc.enqueueStatefulSet,
        },
    )
    // TODO: Watch volumes
    psc.podStoreSynced = psc.podController.HasSynced
    psc.syncHandler = psc.Sync
    return psc
    ssc.podStoreSynced = podInformer.GetController().HasSynced
    return ssc
}

// Run runs the statefulset controller.
func (psc *StatefulSetController) Run(workers int, stopCh <-chan struct{}) {
func (ssc *StatefulSetController) Run(workers int, stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    defer ssc.queue.ShutDown()
    glog.Infof("Starting statefulset controller")
    go psc.podController.Run(stopCh)
    go psc.psController.Run(stopCh)
    if !cache.WaitForCacheSync(stopCh, ssc.podStoreSynced) {
        return
    }
    go ssc.setController.Run(stopCh)
    for i := 0; i < workers; i++ {
        go wait.Until(psc.worker, time.Second, stopCh)
        go wait.Until(ssc.worker, time.Second, stopCh)
    }
    <-stopCh
    glog.Infof("Shutting down statefulset controller")
    psc.queue.ShutDown()

}

// addPod adds the statefulset for the pod to the sync queue
func (psc *StatefulSetController) addPod(obj interface{}) {
func (ssc *StatefulSetController) addPod(obj interface{}) {
    pod := obj.(*v1.Pod)
    glog.V(4).Infof("Pod %s created, labels: %+v", pod.Name, pod.Labels)
    ps := psc.getStatefulSetForPod(pod)
    if ps == nil {
    set := ssc.getStatefulSetForPod(pod)
    if set == nil {
        return
    }
    psc.enqueueStatefulSet(ps)
    ssc.enqueueStatefulSet(set)
}

// updatePod adds the statefulset for the current and old pods to the sync queue.
// If the labels of the pod didn't change, this method enqueues a single statefulset.
func (psc *StatefulSetController) updatePod(old, cur interface{}) {
func (ssc *StatefulSetController) updatePod(old, cur interface{}) {
    curPod := cur.(*v1.Pod)
    oldPod := old.(*v1.Pod)
    if curPod.ResourceVersion == oldPod.ResourceVersion {
@@ -178,20 +160,21 @@ func (psc *StatefulSetController) updatePod(old, cur interface{}) {
        // Two different versions of the same pod will always have different RVs.
        return
    }
    ps := psc.getStatefulSetForPod(curPod)
    if ps == nil {
    set := ssc.getStatefulSetForPod(curPod)
    if set == nil {
        return
    }
    psc.enqueueStatefulSet(ps)
    ssc.enqueueStatefulSet(set)
    // TODO will we need this going forward with controller ref impl?
    if !reflect.DeepEqual(curPod.Labels, oldPod.Labels) {
        if oldPS := psc.getStatefulSetForPod(oldPod); oldPS != nil {
            psc.enqueueStatefulSet(oldPS)
        if oldSet := ssc.getStatefulSetForPod(oldPod); oldSet != nil {
            ssc.enqueueStatefulSet(oldSet)
        }
    }
}

// deletePod enqueues the statefulset for the pod accounting for deletion tombstones.
func (psc *StatefulSetController) deletePod(obj interface{}) {
func (ssc *StatefulSetController) deletePod(obj interface{}) {
    pod, ok := obj.(*v1.Pod)

    // When a delete is dropped, the relist will notice a pod in the store not
@@ -201,173 +184,126 @@ func (psc *StatefulSetController) deletePod(obj interface{}) {
    if !ok {
        tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
        if !ok {
            glog.Errorf("couldn't get object from tombstone %+v", obj)
            utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj))
            return
        }
        pod, ok = tombstone.Obj.(*v1.Pod)
        if !ok {
            glog.Errorf("tombstone contained object that is not a pod %+v", obj)
            utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %+v", obj))
            return
        }
    }
    glog.V(4).Infof("Pod %s/%s deleted through %v.", pod.Namespace, pod.Name, utilruntime.GetCaller())
    if ps := psc.getStatefulSetForPod(pod); ps != nil {
        psc.enqueueStatefulSet(ps)
    if set := ssc.getStatefulSetForPod(pod); set != nil {
        ssc.enqueueStatefulSet(set)
    }
}

// getPodsForStatefulSets returns the pods that match the selectors of the given statefulset.
func (psc *StatefulSetController) getPodsForStatefulSet(ps *apps.StatefulSet) ([]*v1.Pod, error) {
    // TODO: Do we want the statefulset to fight with RCs? check parent statefulset annotation, or name prefix?
    sel, err := metav1.LabelSelectorAsSelector(ps.Spec.Selector)
func (ssc *StatefulSetController) getPodsForStatefulSet(set *apps.StatefulSet) ([]*v1.Pod, error) {
    sel, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
    if err != nil {
        return []*v1.Pod{}, err
    }
    pods, err := psc.podStore.Pods(ps.Namespace).List(sel)
    if err != nil {
        return []*v1.Pod{}, err
    }
    // TODO: Do we need to copy?
    result := make([]*v1.Pod, 0, len(pods))
    for i := range pods {
        result = append(result, &(*pods[i]))
    }
    return result, nil
    return ssc.podStore.Pods(set.Namespace).List(sel)
}

// getStatefulSetForPod returns the pet set managing the given pod.
func (psc *StatefulSetController) getStatefulSetForPod(pod *v1.Pod) *apps.StatefulSet {
    ps, err := psc.psStore.GetPodStatefulSets(pod)
// getStatefulSetForPod returns the StatefulSet managing the given pod.
func (ssc *StatefulSetController) getStatefulSetForPod(pod *v1.Pod) *apps.StatefulSet {
    sets, err := ssc.setStore.GetPodStatefulSets(pod)
    if err != nil {
        glog.V(4).Infof("No StatefulSets found for pod %v, StatefulSet controller will avoid syncing", pod.Name)
        return nil
    }
    // Resolve a overlapping statefulset tie by creation timestamp.
    // Let's hope users don't create overlapping statefulsets.
    if len(ps) > 1 {
        glog.Errorf("user error! more than one StatefulSet is selecting pods with labels: %+v", pod.Labels)
        sort.Sort(overlappingStatefulSets(ps))
    // More than one set is selecting the same Pod
    if len(sets) > 1 {
        utilruntime.HandleError(
            fmt.Errorf(
                "user error: more than one StatefulSet is selecting pods with labels: %+v",
                pod.Labels))
        // The timestamp sort should not be necessary because we will enforce the CreatedBy requirement by
        // name
        sort.Sort(overlappingStatefulSets(sets))
        // return the first created set for which pod is a member
        for i := range sets {
            if isMemberOf(&sets[i], pod) {
                return &sets[i]
            }
        }
        glog.V(4).Infof("No StatefulSets found for pod %v, StatefulSet controller will avoid syncing", pod.Name)
        return nil
    }
    return &ps[0]
    return &sets[0]

}

// enqueueStatefulSet enqueues the given statefulset in the work queue.
func (psc *StatefulSetController) enqueueStatefulSet(obj interface{}) {
func (ssc *StatefulSetController) enqueueStatefulSet(obj interface{}) {
    key, err := controller.KeyFunc(obj)
    if err != nil {
        glog.Errorf("Cound't get key for object %+v: %v", obj, err)
        utilruntime.HandleError(fmt.Errorf("Cound't get key for object %+v: %v", obj, err))
        return
    }
    psc.queue.Add(key)
    ssc.queue.Add(key)
}

// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (psc *StatefulSetController) worker() {
    for {
        func() {
            key, quit := psc.queue.Get()
            if quit {
                return
            }
            defer psc.queue.Done(key)
            if err := psc.syncHandler(key.(string)); err != nil {
                glog.Errorf("Error syncing StatefulSet %v, requeuing: %v", key.(string), err)
                psc.queue.AddRateLimited(key)
            } else {
                psc.queue.Forget(key)
            }
        }()
// processNextWorkItem dequeues items, processes them, and marks them done. It enforces that the syncHandler is never
// invoked concurrently with the same key.
func (ssc *StatefulSetController) processNextWorkItem() bool {
    key, quit := ssc.queue.Get()
    if quit {
        return false
    }
    defer ssc.queue.Done(key)
    if err := ssc.sync(key.(string)); err != nil {
        utilruntime.HandleError(fmt.Errorf("Error syncing StatefulSet %v, requeuing: %v", key.(string), err))
        ssc.queue.AddRateLimited(key)
    } else {
        ssc.queue.Forget(key)
    }
    return true
}

// worker runs a worker goroutine that invokes processNextWorkItem until the the controller's queue is closed
func (ssc *StatefulSetController) worker() {
    for ssc.processNextWorkItem() {

    }
}

// Sync syncs the given statefulset.
func (psc *StatefulSetController) Sync(key string) error {
// sync syncs the given statefulset.
func (ssc *StatefulSetController) sync(key string) error {
    startTime := time.Now()
    defer func() {
        glog.V(4).Infof("Finished syncing statefulset %q (%v)", key, time.Now().Sub(startTime))
    }()

    if !psc.podStoreSynced() {
        // Sleep so we give the pod reflector goroutine a chance to run.
        time.Sleep(PodStoreSyncedPollPeriod)
        return fmt.Errorf("waiting for pods controller to sync")
    }

    obj, exists, err := psc.psStore.Store.GetByKey(key)
    obj, exists, err := ssc.setStore.Store.GetByKey(key)
    if !exists {
        if err = psc.blockingPetStore.store.Delete(key); err != nil {
            return err
        }
        glog.Infof("StatefulSet has been deleted %v", key)
        return nil
    }
    if err != nil {
        glog.Errorf("Unable to retrieve StatefulSet %v from store: %v", key, err)
        utilruntime.HandleError(fmt.Errorf("Unable to retrieve StatefulSet %v from store: %v", key, err))
        return err
    }

    ps := *obj.(*apps.StatefulSet)
    petList, err := psc.getPodsForStatefulSet(&ps)
    set := *obj.(*apps.StatefulSet)
    pods, err := ssc.getPodsForStatefulSet(&set)
    if err != nil {
        return err
    }

    numPets, syncErr := psc.syncStatefulSet(&ps, petList)
    if updateErr := updatePetCount(psc.kubeClient.Apps(), ps, numPets); updateErr != nil {
        glog.Infof("Failed to update replica count for statefulset %v/%v; requeuing; error: %v", ps.Namespace, ps.Name, updateErr)
        return errors.NewAggregate([]error{syncErr, updateErr})
    }

    return syncErr
    return ssc.syncStatefulSet(&set, pods)
}

// syncStatefulSet syncs a tuple of (statefulset, pets).
func (psc *StatefulSetController) syncStatefulSet(ps *apps.StatefulSet, pets []*v1.Pod) (int, error) {
    glog.V(2).Infof("Syncing StatefulSet %v/%v with %d pods", ps.Namespace, ps.Name, len(pets))

    it := NewStatefulSetIterator(ps, pets)
    blockingPet, err := psc.blockingPetStore.Get(ps, pets)
    if err != nil {
        return 0, err
// syncStatefulSet syncs a tuple of (statefulset, []*v1.Pod).
func (ssc *StatefulSetController) syncStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) error {
    glog.V(2).Infof("Syncing StatefulSet %v/%v with %d pods", set.Namespace, set.Name, len(pods))
    if err := ssc.control.UpdateStatefulSet(set, pods); err != nil {
        glog.V(2).Infof("Error syncing StatefulSet %s/%s with %d pods : %s", set.Namespace, set.Name, err)
        return err
    }
    if blockingPet != nil {
        glog.Infof("StatefulSet %v blocked from scaling on pod %v", ps.Name, blockingPet.pod.Name)
    }
    petManager := psc.newSyncer(blockingPet)
    numPets := 0

    for it.Next() {
        pet := it.Value()
        if pet == nil {
            continue
        }
        switch pet.event {
        case syncPet:
            err = petManager.Sync(pet)
            if err == nil {
                numPets++
            }
        case deletePet:
            err = petManager.Delete(pet)
        }
        switch err.(type) {
        case errUnhealthyPet:
            // We are not passing this error up, but we don't increment numPets if we encounter it,
            // since numPets directly translates to statefulset.status.replicas
            continue
        case nil:
            continue
        default:
            it.errs = append(it.errs, err)
        }
    }

    if err := psc.blockingPetStore.Add(petManager.blockingPet); err != nil {
        it.errs = append(it.errs, err)
    }
    // TODO: GC pvcs. We can't delete them per pet because of grace period, and
    // in fact we *don't want to* till statefulset is stable to guarantee that bugs
    // in the controller don't corrupt user data.
    return numPets, errors.NewAggregate(it.errs)
    glog.V(2).Infof("Succesfully synced StatefulSet %s/%s successful", set.Namespace, set.Name)
    return nil
}