Switch statefulset controller to shared informers

This commit is contained in:
Andy Goldstein
2017-02-14 14:03:00 -05:00
parent eef16cf141
commit f6a186b1e1
13 changed files with 520 additions and 313 deletions

View File

@@ -22,23 +22,23 @@ import (
"sort"
"time"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
clientv1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/client/legacylisters"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/util/workqueue"
appsinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/apps/v1beta1"
coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1"
appslisters "k8s.io/kubernetes/pkg/client/listers/apps/v1beta1"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
"k8s.io/kubernetes/pkg/controller"
"github.com/golang/glog"
@@ -56,20 +56,24 @@ type StatefulSetController struct {
// control returns an interface capable of syncing a stateful set.
// Abstracted out for testing.
control StatefulSetControlInterface
// podStore is a cache of watched pods.
podStore listers.StoreToPodLister
// podStoreSynced returns true if the pod store has synced at least once.
podStoreSynced cache.InformerSynced
// A store of StatefulSets, populated by setController.
setStore listers.StoreToStatefulSetLister
// Watches changes to all StatefulSets.
setController cache.Controller
// podLister is able to list/get pods from a shared informer's store
podLister corelisters.PodLister
// podListerSynced returns true if the pod shared informer has synced at least once
podListerSynced cache.InformerSynced
// setLister is able to list/get stateful sets from a shared informer's store
setLister appslisters.StatefulSetLister
// setListerSynced returns true if the stateful set shared informer has synced at least once
setListerSynced cache.InformerSynced
// StatefulSets that need to be synced.
queue workqueue.RateLimitingInterface
}
// NewStatefulSetController creates a new statefulset controller.
func NewStatefulSetController(podInformer cache.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod time.Duration) *StatefulSetController {
func NewStatefulSetController(
podInformer coreinformers.PodInformer,
setInformer appsinformers.StatefulSetInformer,
kubeClient clientset.Interface,
) *StatefulSetController {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.Core().RESTClient()).Events("")})
@@ -77,11 +81,11 @@ func NewStatefulSetController(podInformer cache.SharedIndexInformer, kubeClient
ssc := &StatefulSetController{
kubeClient: kubeClient,
control: NewDefaultStatefulSetControl(NewRealStatefulPodControl(kubeClient, recorder)),
control: NewDefaultStatefulSetControl(NewRealStatefulPodControl(kubeClient, setInformer.Lister(), podInformer.Lister(), recorder)),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "statefulset"),
}
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
// lookup the statefulset and enqueue
AddFunc: ssc.addPod,
// lookup current and old statefulset if labels changed
@@ -89,19 +93,10 @@ func NewStatefulSetController(podInformer cache.SharedIndexInformer, kubeClient
// lookup statefulset accounting for deletion tombstones
DeleteFunc: ssc.deletePod,
})
ssc.podStore.Indexer = podInformer.GetIndexer()
ssc.podLister = podInformer.Lister()
ssc.podListerSynced = podInformer.Informer().HasSynced
ssc.setStore.Store, ssc.setController = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return ssc.kubeClient.Apps().StatefulSets(v1.NamespaceAll).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return ssc.kubeClient.Apps().StatefulSets(v1.NamespaceAll).Watch(options)
},
},
&apps.StatefulSet{},
statefulSetResyncPeriod,
setInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: ssc.enqueueStatefulSet,
UpdateFunc: func(old, cur interface{}) {
@@ -114,9 +109,12 @@ func NewStatefulSetController(podInformer cache.SharedIndexInformer, kubeClient
},
DeleteFunc: ssc.enqueueStatefulSet,
},
statefulSetResyncPeriod,
)
ssc.setLister = setInformer.Lister()
ssc.setListerSynced = setInformer.Informer().HasSynced
// TODO: Watch volumes
ssc.podStoreSynced = podInformer.GetController().HasSynced
return ssc
}
@@ -124,18 +122,20 @@ func NewStatefulSetController(podInformer cache.SharedIndexInformer, kubeClient
func (ssc *StatefulSetController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer ssc.queue.ShutDown()
glog.Infof("Starting statefulset controller")
if !cache.WaitForCacheSync(stopCh, ssc.podStoreSynced) {
if !cache.WaitForCacheSync(stopCh, ssc.podListerSynced, ssc.setListerSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
return
}
go ssc.setController.Run(stopCh)
for i := 0; i < workers; i++ {
go wait.Until(ssc.worker, time.Second, stopCh)
}
<-stopCh
glog.Infof("Shutting down statefulset controller")
}
// addPod adds the statefulset for the pod to the sync queue
@@ -204,12 +204,12 @@ func (ssc *StatefulSetController) getPodsForStatefulSet(set *apps.StatefulSet) (
if err != nil {
return []*v1.Pod{}, err
}
return ssc.podStore.Pods(set.Namespace).List(sel)
return ssc.podLister.Pods(set.Namespace).List(sel)
}
// getStatefulSetForPod returns the StatefulSet managing the given pod.
func (ssc *StatefulSetController) getStatefulSetForPod(pod *v1.Pod) *apps.StatefulSet {
sets, err := ssc.setStore.GetPodStatefulSets(pod)
sets, err := ssc.setLister.GetPodStatefulSets(pod)
if err != nil {
glog.V(4).Infof("No StatefulSets found for pod %v, StatefulSet controller will avoid syncing", pod.Name)
return nil
@@ -225,14 +225,14 @@ func (ssc *StatefulSetController) getStatefulSetForPod(pod *v1.Pod) *apps.Statef
sort.Sort(overlappingStatefulSets(sets))
// return the first created set for which pod is a member
for i := range sets {
if isMemberOf(&sets[i], pod) {
return &sets[i]
if isMemberOf(sets[i], pod) {
return sets[i]
}
}
glog.V(4).Infof("No StatefulSets found for pod %v, StatefulSet controller will avoid syncing", pod.Name)
return nil
}
return &sets[0]
return sets[0]
}
@@ -276,8 +276,12 @@ func (ssc *StatefulSetController) sync(key string) error {
glog.V(4).Infof("Finished syncing statefulset %q (%v)", key, time.Now().Sub(startTime))
}()
obj, exists, err := ssc.setStore.Store.GetByKey(key)
if !exists {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
set, err := ssc.setLister.StatefulSets(namespace).Get(name)
if errors.IsNotFound(err) {
glog.Infof("StatefulSet has been deleted %v", key)
return nil
}
@@ -286,13 +290,12 @@ func (ssc *StatefulSetController) sync(key string) error {
return err
}
set := *obj.(*apps.StatefulSet)
pods, err := ssc.getPodsForStatefulSet(&set)
pods, err := ssc.getPodsForStatefulSet(set)
if err != nil {
return err
}
return ssc.syncStatefulSet(&set, pods)
return ssc.syncStatefulSet(set, pods)
}
// syncStatefulSet syncs a tuple of (statefulset, []*v1.Pod).