StatefulSet: Use ControllerRef to route watch events.

This is part of the completion of ControllerRef, as described here:

https://github.com/kubernetes/community/blob/master/contributors/design-proposals/controller-ref.md#watches
This commit is contained in:
Anthony Yeh
2017-02-24 12:56:10 -08:00
parent e4f67c8170
commit ea85a201c7
4 changed files with 248 additions and 111 deletions

View File

@@ -19,7 +19,6 @@ package statefulset
import (
"fmt"
"reflect"
"sort"
"time"
"k8s.io/apimachinery/pkg/api/errors"
@@ -50,6 +49,9 @@ const (
statefulSetResyncPeriod = 30 * time.Second
)
// controllerKind contains the schema.GroupVersionKind for this controller type.
var controllerKind = apps.SchemeGroupVersion.WithKind("StatefulSet")
// StatefulSetController controls statefulsets.
type StatefulSetController struct {
// client interface
@@ -158,15 +160,36 @@ func (ssc *StatefulSetController) Run(workers int, stopCh <-chan struct{}) {
func (ssc *StatefulSetController) addPod(obj interface{}) {
pod := obj.(*v1.Pod)
glog.V(4).Infof("Pod %s created, labels: %+v", pod.Name, pod.Labels)
set := ssc.getStatefulSetForPod(pod)
if set == nil {
if pod.DeletionTimestamp != nil {
// on a restart of the controller manager, it's possible a new pod shows up in a state that
// is already pending deletion. Prevent the pod from being a creation observation.
ssc.deletePod(pod)
return
}
ssc.enqueueStatefulSet(set)
// If it has a ControllerRef, that's all that matters.
if controllerRef := controller.GetControllerOf(pod); controllerRef != nil {
if controllerRef.Kind != controllerKind.Kind {
// It's controlled by a different type of controller.
return
}
set, err := ssc.setLister.StatefulSets(pod.Namespace).Get(controllerRef.Name)
if err != nil {
return
}
ssc.enqueueStatefulSet(set)
return
}
// Otherwise, it's an orphan. Get a list of all matching controllers and sync
// them to see if anyone wants to adopt it.
for _, set := range ssc.getStatefulSetsForPod(pod) {
ssc.enqueueStatefulSet(set)
}
}
// updatePod adds the statefulset for the current and old pods to the sync queue.
// If the labels of the pod didn't change, this method enqueues a single statefulset.
func (ssc *StatefulSetController) updatePod(old, cur interface{}) {
curPod := cur.(*v1.Pod)
oldPod := old.(*v1.Pod)
@@ -175,15 +198,41 @@ func (ssc *StatefulSetController) updatePod(old, cur interface{}) {
// Two different versions of the same pod will always have different RVs.
return
}
set := ssc.getStatefulSetForPod(curPod)
if set == nil {
glog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
curControllerRef := controller.GetControllerOf(curPod)
oldControllerRef := controller.GetControllerOf(oldPod)
controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
if controllerRefChanged &&
oldControllerRef != nil && oldControllerRef.Kind == controllerKind.Kind {
// The ControllerRef was changed. Sync the old controller, if any.
set, err := ssc.setLister.StatefulSets(oldPod.Namespace).Get(oldControllerRef.Name)
if err == nil {
ssc.enqueueStatefulSet(set)
}
}
// If it has a ControllerRef, that's all that matters.
if curControllerRef != nil {
if curControllerRef.Kind != controllerKind.Kind {
// It's controlled by a different type of controller.
return
}
set, err := ssc.setLister.StatefulSets(curPod.Namespace).Get(curControllerRef.Name)
if err != nil {
return
}
ssc.enqueueStatefulSet(set)
return
}
ssc.enqueueStatefulSet(set)
// TODO will we need this going forward with controller ref impl?
if !reflect.DeepEqual(curPod.Labels, oldPod.Labels) {
if oldSet := ssc.getStatefulSetForPod(oldPod); oldSet != nil {
ssc.enqueueStatefulSet(oldSet)
// Otherwise, it's an orphan. If anything changed, sync matching controllers
// to see if anyone wants to adopt it now.
if labelChanged || controllerRefChanged {
for _, set := range ssc.getStatefulSetsForPod(curPod) {
ssc.enqueueStatefulSet(set)
}
}
}
@@ -209,9 +258,22 @@ func (ssc *StatefulSetController) deletePod(obj interface{}) {
}
}
glog.V(4).Infof("Pod %s/%s deleted through %v.", pod.Namespace, pod.Name, utilruntime.GetCaller())
if set := ssc.getStatefulSetForPod(pod); set != nil {
ssc.enqueueStatefulSet(set)
controllerRef := controller.GetControllerOf(pod)
if controllerRef == nil {
// No controller should care about orphans being deleted.
return
}
if controllerRef.Kind != controllerKind.Kind {
// It's controlled by a different type of controller.
return
}
set, err := ssc.setLister.StatefulSets(pod.Namespace).Get(controllerRef.Name)
if err != nil {
return
}
ssc.enqueueStatefulSet(set)
}
// getPodsForStatefulSet returns the Pods that a given StatefulSet should manage.
@@ -232,12 +294,13 @@ func (ssc *StatefulSetController) getPodsForStatefulSet(set *apps.StatefulSet, s
return isMemberOf(set, pod)
}
cm := controller.NewPodControllerRefManager(ssc.podControl, set, selector, getSSKind())
cm := controller.NewPodControllerRefManager(ssc.podControl, set, selector, controllerKind)
return cm.ClaimPods(pods, filter)
}
// getStatefulSetForPod returns the StatefulSet managing the given pod.
func (ssc *StatefulSetController) getStatefulSetForPod(pod *v1.Pod) *apps.StatefulSet {
// getStatefulSetsForPod returns a list of StatefulSets that potentially match
// a given pod.
func (ssc *StatefulSetController) getStatefulSetsForPod(pod *v1.Pod) []*apps.StatefulSet {
sets, err := ssc.setLister.GetPodStatefulSets(pod)
if err != nil {
glog.V(4).Infof("No StatefulSets found for pod %v, StatefulSet controller will avoid syncing", pod.Name)
@@ -245,24 +308,14 @@ func (ssc *StatefulSetController) getStatefulSetForPod(pod *v1.Pod) *apps.Statef
}
// More than one set is selecting the same Pod
if len(sets) > 1 {
// ControllerRef will ensure we don't do anything crazy, but more than one
// item in this list nevertheless constitutes user error.
utilruntime.HandleError(
fmt.Errorf(
"user error: more than one StatefulSet is selecting pods with labels: %+v",
pod.Labels))
// The timestamp sort should not be necessary because we will enforce the CreatedBy requirement by
// name
sort.Sort(overlappingStatefulSets(sets))
// return the first created set for which pod is a member
for i := range sets {
if isMemberOf(sets[i], pod) {
return sets[i]
}
}
glog.V(4).Infof("No StatefulSets found for pod %v, StatefulSet controller will avoid syncing", pod.Name)
return nil
}
return sets[0]
return sets
}
// enqueueStatefulSet enqueues the given statefulset in the work queue.