move informer and controller to pkg/client/cache

Signed-off-by: Mike Danese <mikedanese@google.com>
Author: Mike Danese
Date: 2016-09-14 11:35:38 -07:00
parent 0a62dab566
commit a765d59932
78 changed files with 425 additions and 498 deletions


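The diff below is mechanical: types and constructors that previously lived under pkg/controller/framework are now referenced through the cache package (pkg/client/cache). As a rough illustration of what the rename means at a call site, here is a minimal sketch of building an informer the new way, mirroring the constructor used in this file; the pod lister/watcher, package name, and helper function are placeholders for illustration, not part of this commit.

package example // hypothetical package, for illustration only

import (
	"time"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/client/cache" // constructor formerly in pkg/controller/framework
	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
	"k8s.io/kubernetes/pkg/runtime"
	"k8s.io/kubernetes/pkg/watch"
)

// newPodInformer shows the post-move spelling: cache.NewInformer and
// cache.ResourceEventHandlerFuncs instead of framework.NewInformer and
// framework.ResourceEventHandlerFuncs. The returned *cache.Controller is
// what fields such as dsController now hold.
func newPodInformer(c clientset.Interface, resync time.Duration) (cache.Store, *cache.Controller) {
	return cache.NewInformer(
		&cache.ListWatch{
			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
				return c.Core().Pods(api.NamespaceAll).List(options)
			},
			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
				return c.Core().Pods(api.NamespaceAll).Watch(options)
			},
		},
		&api.Pod{},
		resync,
		cache.ResourceEventHandlerFuncs{
			AddFunc:    func(obj interface{}) { /* react to adds */ },
			UpdateFunc: func(old, cur interface{}) { /* react to updates */ },
			DeleteFunc: func(obj interface{}) { /* react to deletes */ },
		},
	)
}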
@@ -23,8 +23,6 @@ import (
"sync"
"time"
-"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/extensions"
@@ -34,8 +32,7 @@ import (
unversionedextensions "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/unversioned"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/controller"
-"k8s.io/kubernetes/pkg/controller/framework"
-"k8s.io/kubernetes/pkg/controller/framework/informers"
+"k8s.io/kubernetes/pkg/controller/informers"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
@@ -46,6 +43,8 @@ import (
"k8s.io/kubernetes/pkg/watch"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
+"github.com/golang/glog"
)
const (
@@ -72,7 +71,7 @@ type DaemonSetsController struct {
// we have a personal informer, we must start it ourselves. If you start
// the controller using NewDaemonSetsController(passing SharedInformer), this
// will be null
-internalPodInformer framework.SharedInformer
+internalPodInformer cache.SharedInformer
// A dsc is temporarily suspended after creating/deleting this many replicas.
// It resumes normal action after observing the watch events for them.
@@ -89,17 +88,17 @@ type DaemonSetsController struct {
// A store of nodes
nodeStore cache.StoreToNodeLister
// Watches changes to all daemon sets.
-dsController *framework.Controller
+dsController *cache.Controller
// Watches changes to all pods
-podController framework.ControllerInterface
+podController cache.ControllerInterface
// Watches changes to all nodes.
-nodeController *framework.Controller
+nodeController *cache.Controller
// podStoreSynced returns true if the pod store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
-podStoreSynced framework.InformerSynced
+podStoreSynced cache.InformerSynced
// nodeStoreSynced returns true if the node store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
-nodeStoreSynced framework.InformerSynced
+nodeStoreSynced cache.InformerSynced
lookupCache *controller.MatchingCache
@@ -107,7 +106,7 @@ type DaemonSetsController struct {
queue workqueue.RateLimitingInterface
}
-func NewDaemonSetsController(podInformer framework.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController {
+func NewDaemonSetsController(podInformer cache.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
// TODO: remove the wrapper when every clients have moved to use the clientset.
@@ -128,7 +127,7 @@ func NewDaemonSetsController(podInformer framework.SharedIndexInformer, kubeClie
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "daemonset"),
}
// Manage addition/update of daemon sets.
-dsc.dsStore.Store, dsc.dsController = framework.NewInformer(
+dsc.dsStore.Store, dsc.dsController = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return dsc.kubeClient.Extensions().DaemonSets(api.NamespaceAll).List(options)
@@ -140,7 +139,7 @@ func NewDaemonSetsController(podInformer framework.SharedIndexInformer, kubeClie
&extensions.DaemonSet{},
// TODO: Can we have much longer period here?
FullDaemonSetResyncPeriod,
-framework.ResourceEventHandlerFuncs{
+cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
ds := obj.(*extensions.DaemonSet)
glog.V(4).Infof("Adding daemon set %s", ds.Name)
@@ -173,7 +172,7 @@ func NewDaemonSetsController(podInformer framework.SharedIndexInformer, kubeClie
// Watch for creation/deletion of pods. The reason we watch is that we don't want a daemon set to create/delete
// more pods until all the effects (expectations) of a daemon set's create/delete have been observed.
-podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
+podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dsc.addPod,
UpdateFunc: dsc.updatePod,
DeleteFunc: dsc.deletePod,
@@ -183,7 +182,7 @@ func NewDaemonSetsController(podInformer framework.SharedIndexInformer, kubeClie
dsc.podStoreSynced = podInformer.HasSynced
// Watch for new nodes or updates to nodes - daemon pods are launched on new nodes, and possibly when labels on nodes change,
-dsc.nodeStore.Store, dsc.nodeController = framework.NewInformer(
+dsc.nodeStore.Store, dsc.nodeController = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return dsc.kubeClient.Core().Nodes().List(options)
@@ -194,7 +193,7 @@ func NewDaemonSetsController(podInformer framework.SharedIndexInformer, kubeClie
},
&api.Node{},
resyncPeriod(),
-framework.ResourceEventHandlerFuncs{
+cache.ResourceEventHandlerFuncs{
AddFunc: dsc.addNode,
UpdateFunc: dsc.updateNode,
},
@@ -242,7 +241,7 @@ func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
go dsc.podController.Run(stopCh)
go dsc.nodeController.Run(stopCh)
-if !framework.WaitForCacheSync(stopCh, dsc.podStoreSynced, dsc.nodeStoreSynced) {
+if !cache.WaitForCacheSync(stopCh, dsc.podStoreSynced, dsc.nodeStoreSynced) {
return
}
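
The last hunk above keeps the usual startup ordering under the new names: the informer loops are started, then cache.WaitForCacheSync gates further work on the HasSynced functions stored in the controller. A condensed sketch of that pattern follows; runAndWait is a hypothetical helper introduced only for illustration and is not part of this commit.

package example // hypothetical package, for illustration only

import "k8s.io/kubernetes/pkg/client/cache"

// runAndWait starts a controller loop and blocks until its caches report
// synced (or stopCh closes), mirroring the Run sequence in the hunk above.
func runAndWait(stopCh <-chan struct{}, ctrl *cache.Controller, synced ...cache.InformerSynced) {
	go ctrl.Run(stopCh)
	if !cache.WaitForCacheSync(stopCh, synced...) {
		return // caches never synced; the process is shutting down
	}
	// workers that read the now-populated caches would start here
	<-stopCh
}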