Switch endpoints controller to shared informers

This commit is contained in:
Andy Goldstein
2017-02-07 20:25:52 -05:00
parent e538adcd00
commit 3b8cc59214
6 changed files with 94 additions and 130 deletions

View File

@@ -25,11 +25,9 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/api/v1"
@@ -37,9 +35,8 @@ import (
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
utilpod "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/client/legacylisters"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/informers"
coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/core/v1"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
"k8s.io/kubernetes/pkg/util/metrics"
"github.com/golang/glog"
@@ -51,10 +48,6 @@ const (
// shorter amount of time before a mistaken endpoint is corrected.
FullServiceResyncPeriod = 30 * time.Second
// We must avoid syncing service until the pod store has synced. If it hasn't synced, to
// avoid a hot loop, we'll wait this long between checks.
PodStoreSyncedPollPeriod = 100 * time.Millisecond
// An annotation on the Service denoting if the endpoints controller should
// go ahead and create endpoints for unready pods. This annotation is
// currently only used by StatefulSets, where we need the pod to be DNS
@@ -73,7 +66,7 @@ var (
)
// NewEndpointController returns a new *EndpointController.
func NewEndpointController(podInformer cache.SharedIndexInformer, client clientset.Interface) *EndpointController {
func NewEndpointController(podInformer coreinformers.PodInformer, serviceInformer coreinformers.ServiceInformer, client clientset.Interface) *EndpointController {
if client != nil && client.Core().RESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("endpoint_controller", client.Core().RESTClient().GetRateLimiter())
}
@@ -82,18 +75,7 @@ func NewEndpointController(podInformer cache.SharedIndexInformer, client clients
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint"),
}
e.serviceStore.Indexer, e.serviceController = cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return e.client.Core().Services(metav1.NamespaceAll).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return e.client.Core().Services(metav1.NamespaceAll).Watch(options)
},
},
&v1.Service{},
// TODO: Can we have much longer period here?
FullServiceResyncPeriod,
serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: e.enqueueService,
UpdateFunc: func(old, cur interface{}) {
@@ -101,26 +83,19 @@ func NewEndpointController(podInformer cache.SharedIndexInformer, client clients
},
DeleteFunc: e.enqueueService,
},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
// TODO: Can we have much longer period here?
FullServiceResyncPeriod,
)
e.serviceLister = serviceInformer.Lister()
e.servicesSynced = serviceInformer.Informer().HasSynced
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: e.addPod,
UpdateFunc: e.updatePod,
DeleteFunc: e.deletePod,
})
e.podStore.Indexer = podInformer.GetIndexer()
e.podController = podInformer.GetController()
e.podStoreSynced = podInformer.HasSynced
return e
}
// NewEndpointControllerFromClient returns a new *EndpointController that runs its own informer.
func NewEndpointControllerFromClient(client *clientset.Clientset, resyncPeriod controller.ResyncPeriodFunc) *EndpointController {
podInformer := informers.NewPodInformer(client, resyncPeriod())
e := NewEndpointController(podInformer, client)
e.internalPodInformer = podInformer
e.podLister = podInformer.Lister()
e.podsSynced = podInformer.Informer().HasSynced
return e
}
@@ -129,15 +104,19 @@ func NewEndpointControllerFromClient(client *clientset.Clientset, resyncPeriod c
type EndpointController struct {
client clientset.Interface
serviceStore listers.StoreToServiceLister
podStore listers.StoreToPodLister
// serviceLister is able to list/get services and is populated by the shared informer passed to
// NewEndpointController.
serviceLister corelisters.ServiceLister
// servicesSynced returns true if the service shared informer has been synced at least once.
// Added as a member to the struct to allow injection for testing.
servicesSynced cache.InformerSynced
// internalPodInformer is used to hold a personal informer. If we're using
// a normal shared informer, then the informer will be started for us. If
// we have a personal informer, we must start it ourselves. If you start
// the controller using NewEndpointController(passing SharedInformer), this
// will be null
internalPodInformer cache.SharedIndexInformer
// podLister is able to list/get pods and is populated by the shared informer passed to
// NewEndpointController.
podLister corelisters.PodLister
// podsSynced returns true if the pod shared informer has been synced at least once.
// Added as a member to the struct to allow injection for testing.
podsSynced cache.InformerSynced
// Services that need to be updated. A channel is inappropriate here,
// because it allows services with lots of pods to be serviced much
@@ -145,14 +124,6 @@ type EndpointController struct {
// service that's inserted multiple times to be processed more than
// necessary.
queue workqueue.RateLimitingInterface
// Since we join two objects, we'll watch both of them with
// controllers.
serviceController cache.Controller
podController cache.Controller
// podStoreSynced returns true if the pod store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
podStoreSynced func() bool
}
// Runs e; will not return until stopCh is closed. workers determines how many
@@ -161,10 +132,8 @@ func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer e.queue.ShutDown()
go e.serviceController.Run(stopCh)
go e.podController.Run(stopCh)
if !cache.WaitForCacheSync(stopCh, e.podStoreSynced) {
if !cache.WaitForCacheSync(stopCh, e.podsSynced, e.servicesSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
return
}
@@ -177,16 +146,12 @@ func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) {
e.checkLeftoverEndpoints()
}()
if e.internalPodInformer != nil {
go e.internalPodInformer.Run(stopCh)
}
<-stopCh
}
func (e *EndpointController) getPodServiceMemberships(pod *v1.Pod) (sets.String, error) {
set := sets.String{}
services, err := e.serviceStore.GetPodServices(pod)
services, err := e.serviceLister.GetPodServices(pod)
if err != nil {
// don't log this error because this function makes pointless
// errors when no services match.
@@ -338,8 +303,12 @@ func (e *EndpointController) syncService(key string) error {
glog.V(4).Infof("Finished syncing service %q endpoints. (%v)", key, time.Now().Sub(startTime))
}()
obj, exists, err := e.serviceStore.Indexer.GetByKey(key)
if err != nil || !exists {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
service, err := e.serviceLister.Services(namespace).Get(name)
if err != nil {
// Delete the corresponding endpoint, as the service has been deleted.
// TODO: Please note that this will delete an endpoint when a
// service is deleted. However, if we're down at the time when
@@ -358,7 +327,6 @@ func (e *EndpointController) syncService(key string) error {
return nil
}
service := obj.(*v1.Service)
if service.Spec.Selector == nil {
// services without a selector receive no endpoints from this controller;
// these services will receive the endpoints that are created out-of-band via the REST API.
@@ -366,7 +334,7 @@ func (e *EndpointController) syncService(key string) error {
}
glog.V(5).Infof("About to update endpoints for service %q", key)
pods, err := e.podStore.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelectorPreValidated())
pods, err := e.podLister.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelectorPreValidated())
if err != nil {
// Since we're getting stuff from a local cache, it is
// basically impossible to get this error.