mirror of
https://github.com/optim-enterprises-bv/kubernetes.git
synced 2025-10-30 17:58:14 +00:00
Merge pull request #41261 from ncdc/shared-informers-07-resourcequota
Automatic merge from submit-queue Switch resourcequota controller to shared informers Originally part of #40097 I have had some issues with this change in the past, when I updated `pkg/quota` to use the new informers while `pkg/controller/resourcequota` remained on the old informers. In this PR, both are switched to using the new informers. The issues in the past were lots of flakey test failures in the ResourceQuota e2es, where it would randomly fail to see deletions and handle replenishment. I am hoping that now that everything here is consistently using the new informers, there won't be any more of these flakes, but it's something to keep an eye out for. I also think `pkg/controller/resourcequota` could be cleaned up. I don't think there's really any need for `replenishment_controller.go` any more since it's no longer running individual controllers per kind to replenish. It instead just uses the shared informer and adds event handlers to it. But maybe we do that in a follow up. cc @derekwaynecarr @smarterclayton @wojtek-t @deads2k @sttts @liggitt @timothysc @kubernetes/sig-scalability-pr-reviews
This commit is contained in:
@@ -76,7 +76,7 @@ func startPodGCController(ctx ControllerContext) (bool, error) {
|
|||||||
|
|
||||||
func startResourceQuotaController(ctx ControllerContext) (bool, error) {
|
func startResourceQuotaController(ctx ControllerContext) (bool, error) {
|
||||||
resourceQuotaControllerClient := ctx.ClientBuilder.ClientOrDie("resourcequota-controller")
|
resourceQuotaControllerClient := ctx.ClientBuilder.ClientOrDie("resourcequota-controller")
|
||||||
resourceQuotaRegistry := quotainstall.NewRegistry(resourceQuotaControllerClient, ctx.InformerFactory)
|
resourceQuotaRegistry := quotainstall.NewRegistry(resourceQuotaControllerClient, ctx.NewInformerFactory)
|
||||||
groupKindsToReplenish := []schema.GroupKind{
|
groupKindsToReplenish := []schema.GroupKind{
|
||||||
api.Kind("Pod"),
|
api.Kind("Pod"),
|
||||||
api.Kind("Service"),
|
api.Kind("Service"),
|
||||||
@@ -87,9 +87,10 @@ func startResourceQuotaController(ctx ControllerContext) (bool, error) {
|
|||||||
}
|
}
|
||||||
resourceQuotaControllerOptions := &resourcequotacontroller.ResourceQuotaControllerOptions{
|
resourceQuotaControllerOptions := &resourcequotacontroller.ResourceQuotaControllerOptions{
|
||||||
KubeClient: resourceQuotaControllerClient,
|
KubeClient: resourceQuotaControllerClient,
|
||||||
|
ResourceQuotaInformer: ctx.NewInformerFactory.Core().V1().ResourceQuotas(),
|
||||||
ResyncPeriod: controller.StaticResyncPeriodFunc(ctx.Options.ResourceQuotaSyncPeriod.Duration),
|
ResyncPeriod: controller.StaticResyncPeriodFunc(ctx.Options.ResourceQuotaSyncPeriod.Duration),
|
||||||
Registry: resourceQuotaRegistry,
|
Registry: resourceQuotaRegistry,
|
||||||
ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(ctx.InformerFactory, resourceQuotaControllerClient),
|
ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(ctx.NewInformerFactory),
|
||||||
ReplenishmentResyncPeriod: ResyncPeriod(&ctx.Options),
|
ReplenishmentResyncPeriod: ResyncPeriod(&ctx.Options),
|
||||||
GroupKindsToReplenish: groupKindsToReplenish,
|
GroupKindsToReplenish: groupKindsToReplenish,
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -14,6 +14,7 @@ go_library(
|
|||||||
deps = [
|
deps = [
|
||||||
"//pkg/api:go_default_library",
|
"//pkg/api:go_default_library",
|
||||||
"//pkg/apis/componentconfig:go_default_library",
|
"//pkg/apis/componentconfig:go_default_library",
|
||||||
|
"//pkg/apis/componentconfig/install:go_default_library",
|
||||||
"//pkg/apis/componentconfig/v1alpha1:go_default_library",
|
"//pkg/apis/componentconfig/v1alpha1:go_default_library",
|
||||||
"//pkg/util/taints:go_default_library",
|
"//pkg/util/taints:go_default_library",
|
||||||
"//vendor:github.com/spf13/pflag",
|
"//vendor:github.com/spf13/pflag",
|
||||||
|
|||||||
@@ -26,6 +26,8 @@ import (
|
|||||||
utilflag "k8s.io/apiserver/pkg/util/flag"
|
utilflag "k8s.io/apiserver/pkg/util/flag"
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/apis/componentconfig"
|
"k8s.io/kubernetes/pkg/apis/componentconfig"
|
||||||
|
// Need to make sure the componentconfig api is installed so defaulting funcs work
|
||||||
|
_ "k8s.io/kubernetes/pkg/apis/componentconfig/install"
|
||||||
"k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1"
|
"k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1"
|
||||||
utiltaints "k8s.io/kubernetes/pkg/util/taints"
|
utiltaints "k8s.io/kubernetes/pkg/util/taints"
|
||||||
|
|
||||||
|
|||||||
@@ -20,20 +20,23 @@ go_library(
|
|||||||
"//pkg/api:go_default_library",
|
"//pkg/api:go_default_library",
|
||||||
"//pkg/api/v1:go_default_library",
|
"//pkg/api/v1:go_default_library",
|
||||||
"//pkg/client/clientset_generated/clientset:go_default_library",
|
"//pkg/client/clientset_generated/clientset:go_default_library",
|
||||||
|
"//pkg/client/informers/informers_generated/externalversions:go_default_library",
|
||||||
|
"//pkg/client/informers/informers_generated/externalversions/core/v1:go_default_library",
|
||||||
|
"//pkg/client/listers/core/v1:go_default_library",
|
||||||
"//pkg/controller:go_default_library",
|
"//pkg/controller:go_default_library",
|
||||||
"//pkg/controller/informers:go_default_library",
|
|
||||||
"//pkg/quota:go_default_library",
|
"//pkg/quota:go_default_library",
|
||||||
"//pkg/quota/evaluator/core:go_default_library",
|
"//pkg/quota/evaluator/core:go_default_library",
|
||||||
"//pkg/util/metrics:go_default_library",
|
"//pkg/util/metrics:go_default_library",
|
||||||
"//vendor:github.com/golang/glog",
|
"//vendor:github.com/golang/glog",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/api/equality",
|
"//vendor:k8s.io/apimachinery/pkg/api/equality",
|
||||||
|
"//vendor:k8s.io/apimachinery/pkg/api/errors",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/api/meta",
|
"//vendor:k8s.io/apimachinery/pkg/api/meta",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
|
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
|
||||||
|
"//vendor:k8s.io/apimachinery/pkg/labels",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/runtime",
|
"//vendor:k8s.io/apimachinery/pkg/runtime",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/runtime/schema",
|
"//vendor:k8s.io/apimachinery/pkg/runtime/schema",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
|
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/util/wait",
|
"//vendor:k8s.io/apimachinery/pkg/util/wait",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/watch",
|
|
||||||
"//vendor:k8s.io/client-go/tools/cache",
|
"//vendor:k8s.io/client-go/tools/cache",
|
||||||
"//vendor:k8s.io/client-go/util/workqueue",
|
"//vendor:k8s.io/client-go/util/workqueue",
|
||||||
],
|
],
|
||||||
@@ -51,6 +54,7 @@ go_test(
|
|||||||
"//pkg/api:go_default_library",
|
"//pkg/api:go_default_library",
|
||||||
"//pkg/api/v1:go_default_library",
|
"//pkg/api/v1:go_default_library",
|
||||||
"//pkg/client/clientset_generated/clientset/fake:go_default_library",
|
"//pkg/client/clientset_generated/clientset/fake:go_default_library",
|
||||||
|
"//pkg/client/informers/informers_generated/externalversions:go_default_library",
|
||||||
"//pkg/controller:go_default_library",
|
"//pkg/controller:go_default_library",
|
||||||
"//pkg/quota/generic:go_default_library",
|
"//pkg/quota/generic:go_default_library",
|
||||||
"//pkg/quota/install:go_default_library",
|
"//pkg/quota/install:go_default_library",
|
||||||
|
|||||||
@@ -22,19 +22,15 @@ import (
|
|||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/api/meta"
|
"k8s.io/apimachinery/pkg/api/meta"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
|
||||||
"k8s.io/client-go/tools/cache"
|
"k8s.io/client-go/tools/cache"
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/api/v1"
|
"k8s.io/kubernetes/pkg/api/v1"
|
||||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
|
||||||
"k8s.io/kubernetes/pkg/controller"
|
"k8s.io/kubernetes/pkg/controller"
|
||||||
"k8s.io/kubernetes/pkg/controller/informers"
|
|
||||||
"k8s.io/kubernetes/pkg/quota/evaluator/core"
|
"k8s.io/kubernetes/pkg/quota/evaluator/core"
|
||||||
"k8s.io/kubernetes/pkg/util/metrics"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// ReplenishmentFunc is a function that is invoked when controller sees a change
|
// ReplenishmentFunc is a function that is invoked when controller sees a change
|
||||||
@@ -96,151 +92,96 @@ type ReplenishmentControllerFactory interface {
|
|||||||
|
|
||||||
// replenishmentControllerFactory implements ReplenishmentControllerFactory
|
// replenishmentControllerFactory implements ReplenishmentControllerFactory
|
||||||
type replenishmentControllerFactory struct {
|
type replenishmentControllerFactory struct {
|
||||||
kubeClient clientset.Interface
|
|
||||||
sharedInformerFactory informers.SharedInformerFactory
|
sharedInformerFactory informers.SharedInformerFactory
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewReplenishmentControllerFactory returns a factory that knows how to build controllers
|
// NewReplenishmentControllerFactory returns a factory that knows how to build controllers
|
||||||
// to replenish resources when updated or deleted
|
// to replenish resources when updated or deleted
|
||||||
func NewReplenishmentControllerFactory(f informers.SharedInformerFactory, kubeClient clientset.Interface) ReplenishmentControllerFactory {
|
func NewReplenishmentControllerFactory(f informers.SharedInformerFactory) ReplenishmentControllerFactory {
|
||||||
return &replenishmentControllerFactory{
|
return &replenishmentControllerFactory{
|
||||||
kubeClient: kubeClient,
|
|
||||||
sharedInformerFactory: f,
|
sharedInformerFactory: f,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewReplenishmentControllerFactoryFromClient returns a factory that knows how to build controllers to replenish resources
|
func (r *replenishmentControllerFactory) NewController(options *ReplenishmentControllerOptions) (cache.Controller, error) {
|
||||||
// when updated or deleted using the specified client.
|
var (
|
||||||
func NewReplenishmentControllerFactoryFromClient(kubeClient clientset.Interface) ReplenishmentControllerFactory {
|
informer informers.GenericInformer
|
||||||
return NewReplenishmentControllerFactory(nil, kubeClient)
|
err error
|
||||||
}
|
)
|
||||||
|
|
||||||
// controllerFor returns a replenishment controller for the specified group resource.
|
|
||||||
func controllerFor(
|
|
||||||
groupResource schema.GroupResource,
|
|
||||||
f informers.SharedInformerFactory,
|
|
||||||
handlerFuncs cache.ResourceEventHandlerFuncs,
|
|
||||||
) (cache.Controller, error) {
|
|
||||||
genericInformer, err := f.ForResource(groupResource)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
informer := genericInformer.Informer()
|
|
||||||
informer.AddEventHandler(handlerFuncs)
|
|
||||||
return informer.GetController(), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *replenishmentControllerFactory) NewController(options *ReplenishmentControllerOptions) (result cache.Controller, err error) {
|
|
||||||
if r.kubeClient != nil && r.kubeClient.Core().RESTClient().GetRateLimiter() != nil {
|
|
||||||
metrics.RegisterMetricAndTrackRateLimiterUsage("replenishment_controller", r.kubeClient.Core().RESTClient().GetRateLimiter())
|
|
||||||
}
|
|
||||||
|
|
||||||
switch options.GroupKind {
|
switch options.GroupKind {
|
||||||
case api.Kind("Pod"):
|
case api.Kind("Pod"):
|
||||||
if r.sharedInformerFactory != nil {
|
informer, err = r.sharedInformerFactory.ForResource(v1.SchemeGroupVersion.WithResource("pods"))
|
||||||
result, err = controllerFor(api.Resource("pods"), r.sharedInformerFactory, cache.ResourceEventHandlerFuncs{
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
informer.Informer().AddEventHandlerWithResyncPeriod(
|
||||||
|
cache.ResourceEventHandlerFuncs{
|
||||||
UpdateFunc: PodReplenishmentUpdateFunc(options),
|
UpdateFunc: PodReplenishmentUpdateFunc(options),
|
||||||
DeleteFunc: ObjectReplenishmentDeleteFunc(options),
|
DeleteFunc: ObjectReplenishmentDeleteFunc(options),
|
||||||
})
|
|
||||||
break
|
|
||||||
}
|
|
||||||
result = informers.NewPodInformer(r.kubeClient, options.ResyncPeriod())
|
|
||||||
case api.Kind("Service"):
|
|
||||||
// TODO move to informer when defined
|
|
||||||
_, result = cache.NewInformer(
|
|
||||||
&cache.ListWatch{
|
|
||||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
|
||||||
return r.kubeClient.Core().Services(metav1.NamespaceAll).List(options)
|
|
||||||
},
|
|
||||||
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
|
||||||
return r.kubeClient.Core().Services(metav1.NamespaceAll).Watch(options)
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
&v1.Service{},
|
|
||||||
options.ResyncPeriod(),
|
options.ResyncPeriod(),
|
||||||
|
)
|
||||||
|
case api.Kind("Service"):
|
||||||
|
informer, err = r.sharedInformerFactory.ForResource(v1.SchemeGroupVersion.WithResource("services"))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
informer.Informer().AddEventHandlerWithResyncPeriod(
|
||||||
cache.ResourceEventHandlerFuncs{
|
cache.ResourceEventHandlerFuncs{
|
||||||
UpdateFunc: ServiceReplenishmentUpdateFunc(options),
|
UpdateFunc: ServiceReplenishmentUpdateFunc(options),
|
||||||
DeleteFunc: ObjectReplenishmentDeleteFunc(options),
|
DeleteFunc: ObjectReplenishmentDeleteFunc(options),
|
||||||
},
|
},
|
||||||
|
options.ResyncPeriod(),
|
||||||
)
|
)
|
||||||
case api.Kind("ReplicationController"):
|
case api.Kind("ReplicationController"):
|
||||||
// TODO move to informer when defined
|
informer, err = r.sharedInformerFactory.ForResource(v1.SchemeGroupVersion.WithResource("replicationcontrollers"))
|
||||||
_, result = cache.NewInformer(
|
if err != nil {
|
||||||
&cache.ListWatch{
|
return nil, err
|
||||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
}
|
||||||
return r.kubeClient.Core().ReplicationControllers(metav1.NamespaceAll).List(options)
|
informer.Informer().AddEventHandlerWithResyncPeriod(
|
||||||
},
|
|
||||||
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
|
||||||
return r.kubeClient.Core().ReplicationControllers(metav1.NamespaceAll).Watch(options)
|
|
||||||
},
|
|
||||||
},
|
|
||||||
&v1.ReplicationController{},
|
|
||||||
options.ResyncPeriod(),
|
|
||||||
cache.ResourceEventHandlerFuncs{
|
cache.ResourceEventHandlerFuncs{
|
||||||
DeleteFunc: ObjectReplenishmentDeleteFunc(options),
|
DeleteFunc: ObjectReplenishmentDeleteFunc(options),
|
||||||
},
|
},
|
||||||
|
options.ResyncPeriod(),
|
||||||
)
|
)
|
||||||
case api.Kind("PersistentVolumeClaim"):
|
case api.Kind("PersistentVolumeClaim"):
|
||||||
if r.sharedInformerFactory != nil {
|
informer, err = r.sharedInformerFactory.ForResource(v1.SchemeGroupVersion.WithResource("persistentvolumeclaims"))
|
||||||
result, err = controllerFor(api.Resource("persistentvolumeclaims"), r.sharedInformerFactory, cache.ResourceEventHandlerFuncs{
|
if err != nil {
|
||||||
DeleteFunc: ObjectReplenishmentDeleteFunc(options),
|
return nil, err
|
||||||
})
|
|
||||||
break
|
|
||||||
}
|
}
|
||||||
// TODO (derekwaynecarr) remove me when we can require a sharedInformerFactory in all code paths...
|
informer.Informer().AddEventHandlerWithResyncPeriod(
|
||||||
_, result = cache.NewInformer(
|
|
||||||
&cache.ListWatch{
|
|
||||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
|
||||||
return r.kubeClient.Core().PersistentVolumeClaims(metav1.NamespaceAll).List(options)
|
|
||||||
},
|
|
||||||
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
|
||||||
return r.kubeClient.Core().PersistentVolumeClaims(metav1.NamespaceAll).Watch(options)
|
|
||||||
},
|
|
||||||
},
|
|
||||||
&v1.PersistentVolumeClaim{},
|
|
||||||
options.ResyncPeriod(),
|
|
||||||
cache.ResourceEventHandlerFuncs{
|
cache.ResourceEventHandlerFuncs{
|
||||||
DeleteFunc: ObjectReplenishmentDeleteFunc(options),
|
DeleteFunc: ObjectReplenishmentDeleteFunc(options),
|
||||||
},
|
},
|
||||||
|
options.ResyncPeriod(),
|
||||||
)
|
)
|
||||||
case api.Kind("Secret"):
|
case api.Kind("Secret"):
|
||||||
// TODO move to informer when defined
|
informer, err = r.sharedInformerFactory.ForResource(v1.SchemeGroupVersion.WithResource("secrets"))
|
||||||
_, result = cache.NewInformer(
|
if err != nil {
|
||||||
&cache.ListWatch{
|
return nil, err
|
||||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
}
|
||||||
return r.kubeClient.Core().Secrets(metav1.NamespaceAll).List(options)
|
informer.Informer().AddEventHandlerWithResyncPeriod(
|
||||||
},
|
|
||||||
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
|
||||||
return r.kubeClient.Core().Secrets(metav1.NamespaceAll).Watch(options)
|
|
||||||
},
|
|
||||||
},
|
|
||||||
&v1.Secret{},
|
|
||||||
options.ResyncPeriod(),
|
|
||||||
cache.ResourceEventHandlerFuncs{
|
cache.ResourceEventHandlerFuncs{
|
||||||
DeleteFunc: ObjectReplenishmentDeleteFunc(options),
|
DeleteFunc: ObjectReplenishmentDeleteFunc(options),
|
||||||
},
|
},
|
||||||
|
options.ResyncPeriod(),
|
||||||
)
|
)
|
||||||
case api.Kind("ConfigMap"):
|
case api.Kind("ConfigMap"):
|
||||||
// TODO move to informer when defined
|
informer, err = r.sharedInformerFactory.ForResource(v1.SchemeGroupVersion.WithResource("configmaps"))
|
||||||
_, result = cache.NewInformer(
|
if err != nil {
|
||||||
&cache.ListWatch{
|
return nil, err
|
||||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
}
|
||||||
return r.kubeClient.Core().ConfigMaps(metav1.NamespaceAll).List(options)
|
informer.Informer().AddEventHandlerWithResyncPeriod(
|
||||||
},
|
|
||||||
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
|
||||||
return r.kubeClient.Core().ConfigMaps(metav1.NamespaceAll).Watch(options)
|
|
||||||
},
|
|
||||||
},
|
|
||||||
&v1.ConfigMap{},
|
|
||||||
options.ResyncPeriod(),
|
|
||||||
cache.ResourceEventHandlerFuncs{
|
cache.ResourceEventHandlerFuncs{
|
||||||
DeleteFunc: ObjectReplenishmentDeleteFunc(options),
|
DeleteFunc: ObjectReplenishmentDeleteFunc(options),
|
||||||
},
|
},
|
||||||
|
options.ResyncPeriod(),
|
||||||
)
|
)
|
||||||
default:
|
default:
|
||||||
return nil, NewUnhandledGroupKindError(options.GroupKind)
|
return nil, NewUnhandledGroupKindError(options.GroupKind)
|
||||||
}
|
}
|
||||||
return result, err
|
return informer.Informer().GetController(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ServiceReplenishmentUpdateFunc will replenish if the service was quota tracked has changed service type
|
// ServiceReplenishmentUpdateFunc will replenish if the service was quota tracked has changed service type
|
||||||
|
|||||||
@@ -17,22 +17,26 @@ limitations under the License.
|
|||||||
package resourcequota
|
package resourcequota
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
|
|
||||||
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
||||||
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
|
||||||
"k8s.io/client-go/tools/cache"
|
"k8s.io/client-go/tools/cache"
|
||||||
"k8s.io/client-go/util/workqueue"
|
"k8s.io/client-go/util/workqueue"
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/api/v1"
|
"k8s.io/kubernetes/pkg/api/v1"
|
||||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
||||||
|
coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1"
|
||||||
|
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
|
||||||
"k8s.io/kubernetes/pkg/controller"
|
"k8s.io/kubernetes/pkg/controller"
|
||||||
"k8s.io/kubernetes/pkg/quota"
|
"k8s.io/kubernetes/pkg/quota"
|
||||||
"k8s.io/kubernetes/pkg/util/metrics"
|
"k8s.io/kubernetes/pkg/util/metrics"
|
||||||
@@ -42,6 +46,8 @@ import (
|
|||||||
type ResourceQuotaControllerOptions struct {
|
type ResourceQuotaControllerOptions struct {
|
||||||
// Must have authority to list all quotas, and update quota status
|
// Must have authority to list all quotas, and update quota status
|
||||||
KubeClient clientset.Interface
|
KubeClient clientset.Interface
|
||||||
|
// Shared informer for resource quotas
|
||||||
|
ResourceQuotaInformer coreinformers.ResourceQuotaInformer
|
||||||
// Controls full recalculation of quota usage
|
// Controls full recalculation of quota usage
|
||||||
ResyncPeriod controller.ResyncPeriodFunc
|
ResyncPeriod controller.ResyncPeriodFunc
|
||||||
// Knows how to calculate usage
|
// Knows how to calculate usage
|
||||||
@@ -59,10 +65,10 @@ type ResourceQuotaControllerOptions struct {
|
|||||||
type ResourceQuotaController struct {
|
type ResourceQuotaController struct {
|
||||||
// Must have authority to list all resources in the system, and update quota status
|
// Must have authority to list all resources in the system, and update quota status
|
||||||
kubeClient clientset.Interface
|
kubeClient clientset.Interface
|
||||||
// An index of resource quota objects by namespace
|
// A lister/getter of resource quota objects
|
||||||
rqIndexer cache.Indexer
|
rqLister corelisters.ResourceQuotaLister
|
||||||
// Watches changes to all resource quota
|
// A list of functions that return true when their caches have synced
|
||||||
rqController cache.Controller
|
informerSyncedFuncs []cache.InformerSynced
|
||||||
// ResourceQuota objects that need to be synchronized
|
// ResourceQuota objects that need to be synchronized
|
||||||
queue workqueue.RateLimitingInterface
|
queue workqueue.RateLimitingInterface
|
||||||
// missingUsageQueue holds objects that are missing the initial usage information
|
// missingUsageQueue holds objects that are missing the initial usage information
|
||||||
@@ -81,6 +87,8 @@ func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *Resour
|
|||||||
// build the resource quota controller
|
// build the resource quota controller
|
||||||
rq := &ResourceQuotaController{
|
rq := &ResourceQuotaController{
|
||||||
kubeClient: options.KubeClient,
|
kubeClient: options.KubeClient,
|
||||||
|
rqLister: options.ResourceQuotaInformer.Lister(),
|
||||||
|
informerSyncedFuncs: []cache.InformerSynced{options.ResourceQuotaInformer.Informer().HasSynced},
|
||||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resourcequota_primary"),
|
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resourcequota_primary"),
|
||||||
missingUsageQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resourcequota_priority"),
|
missingUsageQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resourcequota_priority"),
|
||||||
resyncPeriod: options.ResyncPeriod,
|
resyncPeriod: options.ResyncPeriod,
|
||||||
@@ -93,18 +101,7 @@ func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *Resour
|
|||||||
// set the synchronization handler
|
// set the synchronization handler
|
||||||
rq.syncHandler = rq.syncResourceQuotaFromKey
|
rq.syncHandler = rq.syncResourceQuotaFromKey
|
||||||
|
|
||||||
// build the controller that observes quota
|
options.ResourceQuotaInformer.Informer().AddEventHandlerWithResyncPeriod(
|
||||||
rq.rqIndexer, rq.rqController = cache.NewIndexerInformer(
|
|
||||||
&cache.ListWatch{
|
|
||||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
|
||||||
return rq.kubeClient.Core().ResourceQuotas(metav1.NamespaceAll).List(options)
|
|
||||||
},
|
|
||||||
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
|
||||||
return rq.kubeClient.Core().ResourceQuotas(metav1.NamespaceAll).Watch(options)
|
|
||||||
},
|
|
||||||
},
|
|
||||||
&v1.ResourceQuota{},
|
|
||||||
rq.resyncPeriod(),
|
|
||||||
cache.ResourceEventHandlerFuncs{
|
cache.ResourceEventHandlerFuncs{
|
||||||
AddFunc: rq.addQuota,
|
AddFunc: rq.addQuota,
|
||||||
UpdateFunc: func(old, cur interface{}) {
|
UpdateFunc: func(old, cur interface{}) {
|
||||||
@@ -128,7 +125,7 @@ func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *Resour
|
|||||||
// way of achieving this is by performing a `stop` operation on the controller.
|
// way of achieving this is by performing a `stop` operation on the controller.
|
||||||
DeleteFunc: rq.enqueueResourceQuota,
|
DeleteFunc: rq.enqueueResourceQuota,
|
||||||
},
|
},
|
||||||
cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc},
|
rq.resyncPeriod(),
|
||||||
)
|
)
|
||||||
|
|
||||||
for _, groupKindToReplenish := range options.GroupKindsToReplenish {
|
for _, groupKindToReplenish := range options.GroupKindsToReplenish {
|
||||||
@@ -141,7 +138,8 @@ func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *Resour
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Warningf("quota controller unable to replenish %s due to %v, changes only accounted during full resync", groupKindToReplenish, err)
|
glog.Warningf("quota controller unable to replenish %s due to %v, changes only accounted during full resync", groupKindToReplenish, err)
|
||||||
} else {
|
} else {
|
||||||
rq.replenishmentControllers = append(rq.replenishmentControllers, replenishmentController)
|
// make sure we wait for each shared informer's cache to sync
|
||||||
|
rq.informerSyncedFuncs = append(rq.informerSyncedFuncs, replenishmentController.HasSynced)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return rq
|
return rq
|
||||||
@@ -150,8 +148,18 @@ func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *Resour
|
|||||||
// enqueueAll is called at the fullResyncPeriod interval to force a full recalculation of quota usage statistics
|
// enqueueAll is called at the fullResyncPeriod interval to force a full recalculation of quota usage statistics
|
||||||
func (rq *ResourceQuotaController) enqueueAll() {
|
func (rq *ResourceQuotaController) enqueueAll() {
|
||||||
defer glog.V(4).Infof("Resource quota controller queued all resource quota for full calculation of usage")
|
defer glog.V(4).Infof("Resource quota controller queued all resource quota for full calculation of usage")
|
||||||
for _, k := range rq.rqIndexer.ListKeys() {
|
rqs, err := rq.rqLister.List(labels.Everything())
|
||||||
rq.queue.Add(k)
|
if err != nil {
|
||||||
|
utilruntime.HandleError(fmt.Errorf("unable to enqueue all - error listing resource quotas: %v", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for i := range rqs {
|
||||||
|
key, err := controller.KeyFunc(rqs[i])
|
||||||
|
if err != nil {
|
||||||
|
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", rqs[i], err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
rq.queue.Add(key)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -228,18 +236,24 @@ func (rq *ResourceQuotaController) worker(queue workqueue.RateLimitingInterface)
|
|||||||
// Run begins quota controller using the specified number of workers
|
// Run begins quota controller using the specified number of workers
|
||||||
func (rq *ResourceQuotaController) Run(workers int, stopCh <-chan struct{}) {
|
func (rq *ResourceQuotaController) Run(workers int, stopCh <-chan struct{}) {
|
||||||
defer utilruntime.HandleCrash()
|
defer utilruntime.HandleCrash()
|
||||||
go rq.rqController.Run(stopCh)
|
|
||||||
|
glog.Infof("Starting resource quota controller")
|
||||||
|
|
||||||
// the controllers that replenish other resources to respond rapidly to state changes
|
// the controllers that replenish other resources to respond rapidly to state changes
|
||||||
for _, replenishmentController := range rq.replenishmentControllers {
|
for _, replenishmentController := range rq.replenishmentControllers {
|
||||||
go replenishmentController.Run(stopCh)
|
go replenishmentController.Run(stopCh)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !cache.WaitForCacheSync(stopCh, rq.informerSyncedFuncs...) {
|
||||||
|
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// the workers that chug through the quota calculation backlog
|
// the workers that chug through the quota calculation backlog
|
||||||
for i := 0; i < workers; i++ {
|
for i := 0; i < workers; i++ {
|
||||||
go wait.Until(rq.worker(rq.queue), time.Second, stopCh)
|
go wait.Until(rq.worker(rq.queue), time.Second, stopCh)
|
||||||
go wait.Until(rq.worker(rq.missingUsageQueue), time.Second, stopCh)
|
go wait.Until(rq.worker(rq.missingUsageQueue), time.Second, stopCh)
|
||||||
}
|
}
|
||||||
// the timer for how often we do a full recalculation across all quotas
|
|
||||||
go wait.Until(func() { rq.enqueueAll() }, rq.resyncPeriod(), stopCh)
|
|
||||||
<-stopCh
|
<-stopCh
|
||||||
glog.Infof("Shutting down ResourceQuotaController")
|
glog.Infof("Shutting down ResourceQuotaController")
|
||||||
rq.queue.ShutDown()
|
rq.queue.ShutDown()
|
||||||
@@ -252,8 +266,12 @@ func (rq *ResourceQuotaController) syncResourceQuotaFromKey(key string) (err err
|
|||||||
glog.V(4).Infof("Finished syncing resource quota %q (%v)", key, time.Now().Sub(startTime))
|
glog.V(4).Infof("Finished syncing resource quota %q (%v)", key, time.Now().Sub(startTime))
|
||||||
}()
|
}()
|
||||||
|
|
||||||
obj, exists, err := rq.rqIndexer.GetByKey(key)
|
namespace, name, err := cache.SplitMetaNamespaceKey(key)
|
||||||
if !exists {
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
quota, err := rq.rqLister.ResourceQuotas(namespace).Get(name)
|
||||||
|
if errors.IsNotFound(err) {
|
||||||
glog.Infof("Resource quota has been deleted %v", key)
|
glog.Infof("Resource quota has been deleted %v", key)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -262,17 +280,16 @@ func (rq *ResourceQuotaController) syncResourceQuotaFromKey(key string) (err err
|
|||||||
rq.queue.Add(key)
|
rq.queue.Add(key)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
quota := *obj.(*v1.ResourceQuota)
|
|
||||||
return rq.syncResourceQuota(quota)
|
return rq.syncResourceQuota(quota)
|
||||||
}
|
}
|
||||||
|
|
||||||
// syncResourceQuota runs a complete sync of resource quota status across all known kinds
|
// syncResourceQuota runs a complete sync of resource quota status across all known kinds
|
||||||
func (rq *ResourceQuotaController) syncResourceQuota(v1ResourceQuota v1.ResourceQuota) (err error) {
|
func (rq *ResourceQuotaController) syncResourceQuota(v1ResourceQuota *v1.ResourceQuota) (err error) {
|
||||||
// quota is dirty if any part of spec hard limits differs from the status hard limits
|
// quota is dirty if any part of spec hard limits differs from the status hard limits
|
||||||
dirty := !apiequality.Semantic.DeepEqual(v1ResourceQuota.Spec.Hard, v1ResourceQuota.Status.Hard)
|
dirty := !apiequality.Semantic.DeepEqual(v1ResourceQuota.Spec.Hard, v1ResourceQuota.Status.Hard)
|
||||||
|
|
||||||
resourceQuota := api.ResourceQuota{}
|
resourceQuota := api.ResourceQuota{}
|
||||||
if err := v1.Convert_v1_ResourceQuota_To_api_ResourceQuota(&v1ResourceQuota, &resourceQuota, nil); err != nil {
|
if err := v1.Convert_v1_ResourceQuota_To_api_ResourceQuota(v1ResourceQuota, &resourceQuota, nil); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -338,11 +355,14 @@ func (rq *ResourceQuotaController) replenishQuota(groupKind schema.GroupKind, na
|
|||||||
}
|
}
|
||||||
|
|
||||||
// check if this namespace even has a quota...
|
// check if this namespace even has a quota...
|
||||||
indexKey := &v1.ResourceQuota{}
|
resourceQuotas, err := rq.rqLister.ResourceQuotas(namespace).List(labels.Everything())
|
||||||
indexKey.Namespace = namespace
|
if errors.IsNotFound(err) {
|
||||||
resourceQuotas, err := rq.rqIndexer.Index("namespace", indexKey)
|
utilruntime.HandleError(fmt.Errorf("quota controller could not find ResourceQuota associated with namespace: %s, could take up to %v before a quota replenishes", namespace, rq.resyncPeriod()))
|
||||||
|
return
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("quota controller could not find ResourceQuota associated with namespace: %s, could take up to %v before a quota replenishes", namespace, rq.resyncPeriod())
|
utilruntime.HandleError(fmt.Errorf("error checking to see if namespace %s has any ResourceQuota associated with it: %v", namespace, err))
|
||||||
|
return
|
||||||
}
|
}
|
||||||
if len(resourceQuotas) == 0 {
|
if len(resourceQuotas) == 0 {
|
||||||
return
|
return
|
||||||
@@ -350,7 +370,7 @@ func (rq *ResourceQuotaController) replenishQuota(groupKind schema.GroupKind, na
|
|||||||
|
|
||||||
// only queue those quotas that are tracking a resource associated with this kind.
|
// only queue those quotas that are tracking a resource associated with this kind.
|
||||||
for i := range resourceQuotas {
|
for i := range resourceQuotas {
|
||||||
resourceQuota := resourceQuotas[i].(*v1.ResourceQuota)
|
resourceQuota := resourceQuotas[i]
|
||||||
internalResourceQuota := &api.ResourceQuota{}
|
internalResourceQuota := &api.ResourceQuota{}
|
||||||
if err := v1.Convert_v1_ResourceQuota_To_api_ResourceQuota(resourceQuota, internalResourceQuota, nil); err != nil {
|
if err := v1.Convert_v1_ResourceQuota_To_api_ResourceQuota(resourceQuota, internalResourceQuota, nil); err != nil {
|
||||||
glog.Error(err)
|
glog.Error(err)
|
||||||
|
|||||||
@@ -28,6 +28,7 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/api/v1"
|
"k8s.io/kubernetes/pkg/api/v1"
|
||||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
|
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
|
||||||
|
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
|
||||||
"k8s.io/kubernetes/pkg/controller"
|
"k8s.io/kubernetes/pkg/controller"
|
||||||
"k8s.io/kubernetes/pkg/quota/generic"
|
"k8s.io/kubernetes/pkg/quota/generic"
|
||||||
"k8s.io/kubernetes/pkg/quota/install"
|
"k8s.io/kubernetes/pkg/quota/install"
|
||||||
@@ -106,21 +107,23 @@ func TestSyncResourceQuota(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
kubeClient := fake.NewSimpleClientset(&podList, &resourceQuota)
|
kubeClient := fake.NewSimpleClientset(&podList, &resourceQuota)
|
||||||
|
informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc())
|
||||||
resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{
|
resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{
|
||||||
KubeClient: kubeClient,
|
KubeClient: kubeClient,
|
||||||
ResyncPeriod: controller.NoResyncPeriodFunc,
|
ResourceQuotaInformer: informerFactory.Core().V1().ResourceQuotas(),
|
||||||
Registry: install.NewRegistry(kubeClient, nil),
|
ResyncPeriod: controller.NoResyncPeriodFunc,
|
||||||
|
Registry: install.NewRegistry(kubeClient, nil),
|
||||||
GroupKindsToReplenish: []schema.GroupKind{
|
GroupKindsToReplenish: []schema.GroupKind{
|
||||||
api.Kind("Pod"),
|
api.Kind("Pod"),
|
||||||
api.Kind("Service"),
|
api.Kind("Service"),
|
||||||
api.Kind("ReplicationController"),
|
api.Kind("ReplicationController"),
|
||||||
api.Kind("PersistentVolumeClaim"),
|
api.Kind("PersistentVolumeClaim"),
|
||||||
},
|
},
|
||||||
ControllerFactory: NewReplenishmentControllerFactoryFromClient(kubeClient),
|
ControllerFactory: NewReplenishmentControllerFactory(informerFactory),
|
||||||
ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc,
|
ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc,
|
||||||
}
|
}
|
||||||
quotaController := NewResourceQuotaController(resourceQuotaControllerOptions)
|
quotaController := NewResourceQuotaController(resourceQuotaControllerOptions)
|
||||||
err := quotaController.syncResourceQuota(resourceQuota)
|
err := quotaController.syncResourceQuota(&resourceQuota)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error %v", err)
|
t.Fatalf("Unexpected error %v", err)
|
||||||
}
|
}
|
||||||
@@ -191,21 +194,23 @@ func TestSyncResourceQuotaSpecChange(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
kubeClient := fake.NewSimpleClientset(&resourceQuota)
|
kubeClient := fake.NewSimpleClientset(&resourceQuota)
|
||||||
|
informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc())
|
||||||
resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{
|
resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{
|
||||||
KubeClient: kubeClient,
|
KubeClient: kubeClient,
|
||||||
ResyncPeriod: controller.NoResyncPeriodFunc,
|
ResourceQuotaInformer: informerFactory.Core().V1().ResourceQuotas(),
|
||||||
Registry: install.NewRegistry(kubeClient, nil),
|
ResyncPeriod: controller.NoResyncPeriodFunc,
|
||||||
|
Registry: install.NewRegistry(kubeClient, nil),
|
||||||
GroupKindsToReplenish: []schema.GroupKind{
|
GroupKindsToReplenish: []schema.GroupKind{
|
||||||
api.Kind("Pod"),
|
api.Kind("Pod"),
|
||||||
api.Kind("Service"),
|
api.Kind("Service"),
|
||||||
api.Kind("ReplicationController"),
|
api.Kind("ReplicationController"),
|
||||||
api.Kind("PersistentVolumeClaim"),
|
api.Kind("PersistentVolumeClaim"),
|
||||||
},
|
},
|
||||||
ControllerFactory: NewReplenishmentControllerFactoryFromClient(kubeClient),
|
ControllerFactory: NewReplenishmentControllerFactory(informerFactory),
|
||||||
ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc,
|
ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc,
|
||||||
}
|
}
|
||||||
quotaController := NewResourceQuotaController(resourceQuotaControllerOptions)
|
quotaController := NewResourceQuotaController(resourceQuotaControllerOptions)
|
||||||
err := quotaController.syncResourceQuota(resourceQuota)
|
err := quotaController.syncResourceQuota(&resourceQuota)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error %v", err)
|
t.Fatalf("Unexpected error %v", err)
|
||||||
}
|
}
|
||||||
@@ -279,21 +284,23 @@ func TestSyncResourceQuotaSpecHardChange(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
kubeClient := fake.NewSimpleClientset(&resourceQuota)
|
kubeClient := fake.NewSimpleClientset(&resourceQuota)
|
||||||
|
informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc())
|
||||||
resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{
|
resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{
|
||||||
KubeClient: kubeClient,
|
KubeClient: kubeClient,
|
||||||
ResyncPeriod: controller.NoResyncPeriodFunc,
|
ResourceQuotaInformer: informerFactory.Core().V1().ResourceQuotas(),
|
||||||
Registry: install.NewRegistry(kubeClient, nil),
|
ResyncPeriod: controller.NoResyncPeriodFunc,
|
||||||
|
Registry: install.NewRegistry(kubeClient, nil),
|
||||||
GroupKindsToReplenish: []schema.GroupKind{
|
GroupKindsToReplenish: []schema.GroupKind{
|
||||||
api.Kind("Pod"),
|
api.Kind("Pod"),
|
||||||
api.Kind("Service"),
|
api.Kind("Service"),
|
||||||
api.Kind("ReplicationController"),
|
api.Kind("ReplicationController"),
|
||||||
api.Kind("PersistentVolumeClaim"),
|
api.Kind("PersistentVolumeClaim"),
|
||||||
},
|
},
|
||||||
ControllerFactory: NewReplenishmentControllerFactoryFromClient(kubeClient),
|
ControllerFactory: NewReplenishmentControllerFactory(informerFactory),
|
||||||
ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc,
|
ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc,
|
||||||
}
|
}
|
||||||
quotaController := NewResourceQuotaController(resourceQuotaControllerOptions)
|
quotaController := NewResourceQuotaController(resourceQuotaControllerOptions)
|
||||||
err := quotaController.syncResourceQuota(resourceQuota)
|
err := quotaController.syncResourceQuota(&resourceQuota)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error %v", err)
|
t.Fatalf("Unexpected error %v", err)
|
||||||
}
|
}
|
||||||
@@ -367,21 +374,23 @@ func TestSyncResourceQuotaNoChange(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
kubeClient := fake.NewSimpleClientset(&v1.PodList{}, &resourceQuota)
|
kubeClient := fake.NewSimpleClientset(&v1.PodList{}, &resourceQuota)
|
||||||
|
informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc())
|
||||||
resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{
|
resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{
|
||||||
KubeClient: kubeClient,
|
KubeClient: kubeClient,
|
||||||
ResyncPeriod: controller.NoResyncPeriodFunc,
|
ResourceQuotaInformer: informerFactory.Core().V1().ResourceQuotas(),
|
||||||
Registry: install.NewRegistry(kubeClient, nil),
|
ResyncPeriod: controller.NoResyncPeriodFunc,
|
||||||
|
Registry: install.NewRegistry(kubeClient, nil),
|
||||||
GroupKindsToReplenish: []schema.GroupKind{
|
GroupKindsToReplenish: []schema.GroupKind{
|
||||||
api.Kind("Pod"),
|
api.Kind("Pod"),
|
||||||
api.Kind("Service"),
|
api.Kind("Service"),
|
||||||
api.Kind("ReplicationController"),
|
api.Kind("ReplicationController"),
|
||||||
api.Kind("PersistentVolumeClaim"),
|
api.Kind("PersistentVolumeClaim"),
|
||||||
},
|
},
|
||||||
ControllerFactory: NewReplenishmentControllerFactoryFromClient(kubeClient),
|
ControllerFactory: NewReplenishmentControllerFactory(informerFactory),
|
||||||
ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc,
|
ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc,
|
||||||
}
|
}
|
||||||
quotaController := NewResourceQuotaController(resourceQuotaControllerOptions)
|
quotaController := NewResourceQuotaController(resourceQuotaControllerOptions)
|
||||||
err := quotaController.syncResourceQuota(resourceQuota)
|
err := quotaController.syncResourceQuota(&resourceQuota)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error %v", err)
|
t.Fatalf("Unexpected error %v", err)
|
||||||
}
|
}
|
||||||
@@ -399,16 +408,18 @@ func TestSyncResourceQuotaNoChange(t *testing.T) {
|
|||||||
|
|
||||||
func TestAddQuota(t *testing.T) {
|
func TestAddQuota(t *testing.T) {
|
||||||
kubeClient := fake.NewSimpleClientset()
|
kubeClient := fake.NewSimpleClientset()
|
||||||
|
informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc())
|
||||||
resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{
|
resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{
|
||||||
KubeClient: kubeClient,
|
KubeClient: kubeClient,
|
||||||
ResyncPeriod: controller.NoResyncPeriodFunc,
|
ResourceQuotaInformer: informerFactory.Core().V1().ResourceQuotas(),
|
||||||
Registry: install.NewRegistry(kubeClient, nil),
|
ResyncPeriod: controller.NoResyncPeriodFunc,
|
||||||
|
Registry: install.NewRegistry(kubeClient, nil),
|
||||||
GroupKindsToReplenish: []schema.GroupKind{
|
GroupKindsToReplenish: []schema.GroupKind{
|
||||||
api.Kind("Pod"),
|
api.Kind("Pod"),
|
||||||
api.Kind("ReplicationController"),
|
api.Kind("ReplicationController"),
|
||||||
api.Kind("PersistentVolumeClaim"),
|
api.Kind("PersistentVolumeClaim"),
|
||||||
},
|
},
|
||||||
ControllerFactory: NewReplenishmentControllerFactoryFromClient(kubeClient),
|
ControllerFactory: NewReplenishmentControllerFactory(informerFactory),
|
||||||
ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc,
|
ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc,
|
||||||
}
|
}
|
||||||
quotaController := NewResourceQuotaController(resourceQuotaControllerOptions)
|
quotaController := NewResourceQuotaController(resourceQuotaControllerOptions)
|
||||||
|
|||||||
@@ -28,7 +28,7 @@ go_library(
|
|||||||
"//pkg/api/validation:go_default_library",
|
"//pkg/api/validation:go_default_library",
|
||||||
"//pkg/apis/storage/util:go_default_library",
|
"//pkg/apis/storage/util:go_default_library",
|
||||||
"//pkg/client/clientset_generated/clientset:go_default_library",
|
"//pkg/client/clientset_generated/clientset:go_default_library",
|
||||||
"//pkg/controller/informers:go_default_library",
|
"//pkg/client/informers/informers_generated/externalversions:go_default_library",
|
||||||
"//pkg/kubelet/qos:go_default_library",
|
"//pkg/kubelet/qos:go_default_library",
|
||||||
"//pkg/quota:go_default_library",
|
"//pkg/quota:go_default_library",
|
||||||
"//pkg/quota/generic:go_default_library",
|
"//pkg/quota/generic:go_default_library",
|
||||||
|
|||||||
@@ -30,7 +30,7 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/api/v1"
|
"k8s.io/kubernetes/pkg/api/v1"
|
||||||
"k8s.io/kubernetes/pkg/apis/storage/util"
|
"k8s.io/kubernetes/pkg/apis/storage/util"
|
||||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
||||||
"k8s.io/kubernetes/pkg/controller/informers"
|
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
|
||||||
"k8s.io/kubernetes/pkg/quota"
|
"k8s.io/kubernetes/pkg/quota"
|
||||||
"k8s.io/kubernetes/pkg/quota/generic"
|
"k8s.io/kubernetes/pkg/quota/generic"
|
||||||
)
|
)
|
||||||
@@ -83,7 +83,7 @@ func listPersistentVolumeClaimsByNamespaceFuncUsingClient(kubeClient clientset.I
|
|||||||
func NewPersistentVolumeClaimEvaluator(kubeClient clientset.Interface, f informers.SharedInformerFactory) quota.Evaluator {
|
func NewPersistentVolumeClaimEvaluator(kubeClient clientset.Interface, f informers.SharedInformerFactory) quota.Evaluator {
|
||||||
listFuncByNamespace := listPersistentVolumeClaimsByNamespaceFuncUsingClient(kubeClient)
|
listFuncByNamespace := listPersistentVolumeClaimsByNamespaceFuncUsingClient(kubeClient)
|
||||||
if f != nil {
|
if f != nil {
|
||||||
listFuncByNamespace = generic.ListResourceUsingInformerFunc(f, schema.GroupResource{Resource: "persistentvolumeclaims"})
|
listFuncByNamespace = generic.ListResourceUsingInformerFunc(f, v1.SchemeGroupVersion.WithResource("persistentvolumeclaims"))
|
||||||
}
|
}
|
||||||
return &pvcEvaluator{
|
return &pvcEvaluator{
|
||||||
listFuncByNamespace: listFuncByNamespace,
|
listFuncByNamespace: listFuncByNamespace,
|
||||||
|
|||||||
@@ -31,7 +31,7 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/api/v1"
|
"k8s.io/kubernetes/pkg/api/v1"
|
||||||
"k8s.io/kubernetes/pkg/api/validation"
|
"k8s.io/kubernetes/pkg/api/validation"
|
||||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
||||||
"k8s.io/kubernetes/pkg/controller/informers"
|
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/qos"
|
"k8s.io/kubernetes/pkg/kubelet/qos"
|
||||||
"k8s.io/kubernetes/pkg/quota"
|
"k8s.io/kubernetes/pkg/quota"
|
||||||
"k8s.io/kubernetes/pkg/quota/generic"
|
"k8s.io/kubernetes/pkg/quota/generic"
|
||||||
@@ -71,7 +71,7 @@ func listPodsByNamespaceFuncUsingClient(kubeClient clientset.Interface) generic.
|
|||||||
func NewPodEvaluator(kubeClient clientset.Interface, f informers.SharedInformerFactory) quota.Evaluator {
|
func NewPodEvaluator(kubeClient clientset.Interface, f informers.SharedInformerFactory) quota.Evaluator {
|
||||||
listFuncByNamespace := listPodsByNamespaceFuncUsingClient(kubeClient)
|
listFuncByNamespace := listPodsByNamespaceFuncUsingClient(kubeClient)
|
||||||
if f != nil {
|
if f != nil {
|
||||||
listFuncByNamespace = generic.ListResourceUsingInformerFunc(f, schema.GroupResource{Resource: "pods"})
|
listFuncByNamespace = generic.ListResourceUsingInformerFunc(f, v1.SchemeGroupVersion.WithResource("pods"))
|
||||||
}
|
}
|
||||||
return &podEvaluator{
|
return &podEvaluator{
|
||||||
listFuncByNamespace: listFuncByNamespace,
|
listFuncByNamespace: listFuncByNamespace,
|
||||||
|
|||||||
@@ -19,7 +19,7 @@ package core
|
|||||||
import (
|
import (
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
||||||
"k8s.io/kubernetes/pkg/controller/informers"
|
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
|
||||||
"k8s.io/kubernetes/pkg/quota"
|
"k8s.io/kubernetes/pkg/quota"
|
||||||
"k8s.io/kubernetes/pkg/quota/generic"
|
"k8s.io/kubernetes/pkg/quota/generic"
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -16,7 +16,7 @@ go_library(
|
|||||||
tags = ["automanaged"],
|
tags = ["automanaged"],
|
||||||
deps = [
|
deps = [
|
||||||
"//pkg/api:go_default_library",
|
"//pkg/api:go_default_library",
|
||||||
"//pkg/controller/informers:go_default_library",
|
"//pkg/client/informers/informers_generated/externalversions:go_default_library",
|
||||||
"//pkg/quota:go_default_library",
|
"//pkg/quota:go_default_library",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/api/resource",
|
"//vendor:k8s.io/apimachinery/pkg/api/resource",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
|
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
|
||||||
|
|||||||
@@ -26,18 +26,18 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
"k8s.io/apiserver/pkg/admission"
|
"k8s.io/apiserver/pkg/admission"
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/controller/informers"
|
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
|
||||||
"k8s.io/kubernetes/pkg/quota"
|
"k8s.io/kubernetes/pkg/quota"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ListResourceUsingInformerFunc returns a listing function based on the shared informer factory for the specified resource.
|
// ListResourceUsingInformerFunc returns a listing function based on the shared informer factory for the specified resource.
|
||||||
func ListResourceUsingInformerFunc(f informers.SharedInformerFactory, groupResource schema.GroupResource) ListFuncByNamespace {
|
func ListResourceUsingInformerFunc(f informers.SharedInformerFactory, resource schema.GroupVersionResource) ListFuncByNamespace {
|
||||||
return func(namespace string, options metav1.ListOptions) ([]runtime.Object, error) {
|
return func(namespace string, options metav1.ListOptions) ([]runtime.Object, error) {
|
||||||
labelSelector, err := labels.Parse(options.LabelSelector)
|
labelSelector, err := labels.Parse(options.LabelSelector)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
informer, err := f.ForResource(groupResource)
|
informer, err := f.ForResource(resource)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ go_library(
|
|||||||
tags = ["automanaged"],
|
tags = ["automanaged"],
|
||||||
deps = [
|
deps = [
|
||||||
"//pkg/client/clientset_generated/clientset:go_default_library",
|
"//pkg/client/clientset_generated/clientset:go_default_library",
|
||||||
"//pkg/controller/informers:go_default_library",
|
"//pkg/client/informers/informers_generated/externalversions:go_default_library",
|
||||||
"//pkg/quota:go_default_library",
|
"//pkg/quota:go_default_library",
|
||||||
"//pkg/quota/evaluator/core:go_default_library",
|
"//pkg/quota/evaluator/core:go_default_library",
|
||||||
],
|
],
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ package install
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
||||||
"k8s.io/kubernetes/pkg/controller/informers"
|
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
|
||||||
"k8s.io/kubernetes/pkg/quota"
|
"k8s.io/kubernetes/pkg/quota"
|
||||||
"k8s.io/kubernetes/pkg/quota/evaluator/core"
|
"k8s.io/kubernetes/pkg/quota/evaluator/core"
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -296,15 +296,17 @@ func ClusterRoles() []rbac.ClusterRole {
|
|||||||
rbac.NewRule("update").Groups(legacyGroup).Resources("endpoints", "serviceaccounts").RuleOrDie(),
|
rbac.NewRule("update").Groups(legacyGroup).Resources("endpoints", "serviceaccounts").RuleOrDie(),
|
||||||
|
|
||||||
rbac.NewRule("list", "watch").Groups(legacyGroup).Resources(
|
rbac.NewRule("list", "watch").Groups(legacyGroup).Resources(
|
||||||
|
"configmaps",
|
||||||
"namespaces",
|
"namespaces",
|
||||||
"nodes",
|
"nodes",
|
||||||
"persistentvolumeclaims",
|
"persistentvolumeclaims",
|
||||||
"persistentvolumes",
|
"persistentvolumes",
|
||||||
"pods",
|
"pods",
|
||||||
|
"replicationcontrollers",
|
||||||
|
"resourcequotas",
|
||||||
"secrets",
|
"secrets",
|
||||||
"services",
|
"services",
|
||||||
"serviceaccounts",
|
"serviceaccounts",
|
||||||
"replicationcontrollers",
|
|
||||||
).RuleOrDie(),
|
).RuleOrDie(),
|
||||||
rbac.NewRule("list", "watch").Groups(extensionsGroup).Resources("daemonsets", "deployments", "replicasets").RuleOrDie(),
|
rbac.NewRule("list", "watch").Groups(extensionsGroup).Resources("daemonsets", "deployments", "replicasets").RuleOrDie(),
|
||||||
rbac.NewRule("list", "watch").Groups(batchGroup).Resources("jobs", "cronjobs").RuleOrDie(),
|
rbac.NewRule("list", "watch").Groups(batchGroup).Resources("jobs", "cronjobs").RuleOrDie(),
|
||||||
|
|||||||
@@ -464,12 +464,14 @@ items:
|
|||||||
- apiGroups:
|
- apiGroups:
|
||||||
- ""
|
- ""
|
||||||
resources:
|
resources:
|
||||||
|
- configmaps
|
||||||
- namespaces
|
- namespaces
|
||||||
- nodes
|
- nodes
|
||||||
- persistentvolumeclaims
|
- persistentvolumeclaims
|
||||||
- persistentvolumes
|
- persistentvolumes
|
||||||
- pods
|
- pods
|
||||||
- replicationcontrollers
|
- replicationcontrollers
|
||||||
|
- resourcequotas
|
||||||
- secrets
|
- secrets
|
||||||
- serviceaccounts
|
- serviceaccounts
|
||||||
- services
|
- services
|
||||||
|
|||||||
@@ -94,7 +94,6 @@ func TestQuota(t *testing.T) {
|
|||||||
false,
|
false,
|
||||||
)
|
)
|
||||||
rm.SetEventRecorder(&record.FakeRecorder{})
|
rm.SetEventRecorder(&record.FakeRecorder{})
|
||||||
informers.Start(controllerCh)
|
|
||||||
go rm.Run(3, controllerCh)
|
go rm.Run(3, controllerCh)
|
||||||
|
|
||||||
resourceQuotaRegistry := quotainstall.NewRegistry(clientset, nil)
|
resourceQuotaRegistry := quotainstall.NewRegistry(clientset, nil)
|
||||||
@@ -103,13 +102,15 @@ func TestQuota(t *testing.T) {
|
|||||||
}
|
}
|
||||||
resourceQuotaControllerOptions := &resourcequotacontroller.ResourceQuotaControllerOptions{
|
resourceQuotaControllerOptions := &resourcequotacontroller.ResourceQuotaControllerOptions{
|
||||||
KubeClient: clientset,
|
KubeClient: clientset,
|
||||||
|
ResourceQuotaInformer: informers.Core().V1().ResourceQuotas(),
|
||||||
ResyncPeriod: controller.NoResyncPeriodFunc,
|
ResyncPeriod: controller.NoResyncPeriodFunc,
|
||||||
Registry: resourceQuotaRegistry,
|
Registry: resourceQuotaRegistry,
|
||||||
GroupKindsToReplenish: groupKindsToReplenish,
|
GroupKindsToReplenish: groupKindsToReplenish,
|
||||||
ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc,
|
ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc,
|
||||||
ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactoryFromClient(clientset),
|
ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(informers),
|
||||||
}
|
}
|
||||||
go resourcequotacontroller.NewResourceQuotaController(resourceQuotaControllerOptions).Run(2, controllerCh)
|
go resourcequotacontroller.NewResourceQuotaController(resourceQuotaControllerOptions).Run(2, controllerCh)
|
||||||
|
informers.Start(controllerCh)
|
||||||
|
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
scale(t, ns2.Name, clientset)
|
scale(t, ns2.Name, clientset)
|
||||||
|
|||||||
Reference in New Issue
Block a user