mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 19:58:17 +00:00 
			
		
		
		
	allow lock acquisition injection for quota admission
This commit is contained in:
		@@ -92,6 +92,9 @@ type sharedIndexInformer struct {
 | 
			
		||||
	// blockDeltas gives a way to stop all event distribution so that a late event handler
 | 
			
		||||
	// can safely join the shared informer.
 | 
			
		||||
	blockDeltas sync.Mutex
 | 
			
		||||
	// stopCh is the channel used to stop the main Run process.  We have to track it so that
 | 
			
		||||
	// late joiners can have a proper stop
 | 
			
		||||
	stopCh <-chan struct{}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// dummyController hides the fact that a SharedInformer is different from a dedicated one
 | 
			
		||||
@@ -146,6 +149,7 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
 | 
			
		||||
		s.started = true
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	s.stopCh = stopCh
 | 
			
		||||
	s.processor.run(stopCh)
 | 
			
		||||
	s.controller.Run(stopCh)
 | 
			
		||||
}
 | 
			
		||||
@@ -220,6 +224,9 @@ func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) erro
 | 
			
		||||
	listener := newProcessListener(handler)
 | 
			
		||||
	s.processor.listeners = append(s.processor.listeners, listener)
 | 
			
		||||
 | 
			
		||||
	go listener.run(s.stopCh)
 | 
			
		||||
	go listener.pop(s.stopCh)
 | 
			
		||||
 | 
			
		||||
	items := s.indexer.List()
 | 
			
		||||
	for i := range items {
 | 
			
		||||
		listener.add(addNotification{newObj: items[i]})
 | 
			
		||||
 
 | 
			
		||||
@@ -59,7 +59,7 @@ func NewResourceQuota(client clientset.Interface, registry quota.Registry, numEv
 | 
			
		||||
	}
 | 
			
		||||
	go quotaAccessor.Run(stopCh)
 | 
			
		||||
 | 
			
		||||
	evaluator := NewQuotaEvaluator(quotaAccessor, registry, numEvaluators, stopCh)
 | 
			
		||||
	evaluator := NewQuotaEvaluator(quotaAccessor, registry, nil, numEvaluators, stopCh)
 | 
			
		||||
 | 
			
		||||
	return "aAdmission{
 | 
			
		||||
		Handler:   admission.NewHandler(admission.Create, admission.Update),
 | 
			
		||||
 
 | 
			
		||||
@@ -150,7 +150,7 @@ func TestAdmissionIgnoresSubresources(t *testing.T) {
 | 
			
		||||
	quotaAccessor, _ := newQuotaAccessor(kubeClient)
 | 
			
		||||
	quotaAccessor.indexer = indexer
 | 
			
		||||
	go quotaAccessor.Run(stopCh)
 | 
			
		||||
	evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
 | 
			
		||||
	evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), nil, 5, stopCh)
 | 
			
		||||
 | 
			
		||||
	defer utilruntime.HandleCrash()
 | 
			
		||||
	handler := "aAdmission{
 | 
			
		||||
@@ -194,7 +194,7 @@ func TestAdmitBelowQuotaLimit(t *testing.T) {
 | 
			
		||||
	quotaAccessor, _ := newQuotaAccessor(kubeClient)
 | 
			
		||||
	quotaAccessor.indexer = indexer
 | 
			
		||||
	go quotaAccessor.Run(stopCh)
 | 
			
		||||
	evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
 | 
			
		||||
	evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), nil, 5, stopCh)
 | 
			
		||||
 | 
			
		||||
	defer utilruntime.HandleCrash()
 | 
			
		||||
	handler := "aAdmission{
 | 
			
		||||
@@ -277,7 +277,7 @@ func TestAdmitHandlesOldObjects(t *testing.T) {
 | 
			
		||||
	quotaAccessor, _ := newQuotaAccessor(kubeClient)
 | 
			
		||||
	quotaAccessor.indexer = indexer
 | 
			
		||||
	go quotaAccessor.Run(stopCh)
 | 
			
		||||
	evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
 | 
			
		||||
	evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), nil, 5, stopCh)
 | 
			
		||||
 | 
			
		||||
	handler := "aAdmission{
 | 
			
		||||
		Handler:   admission.NewHandler(admission.Create, admission.Update),
 | 
			
		||||
@@ -367,7 +367,7 @@ func TestAdmitExceedQuotaLimit(t *testing.T) {
 | 
			
		||||
	quotaAccessor, _ := newQuotaAccessor(kubeClient)
 | 
			
		||||
	quotaAccessor.indexer = indexer
 | 
			
		||||
	go quotaAccessor.Run(stopCh)
 | 
			
		||||
	evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
 | 
			
		||||
	evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), nil, 5, stopCh)
 | 
			
		||||
 | 
			
		||||
	defer utilruntime.HandleCrash()
 | 
			
		||||
	handler := "aAdmission{
 | 
			
		||||
@@ -411,7 +411,7 @@ func TestAdmitEnforceQuotaConstraints(t *testing.T) {
 | 
			
		||||
	quotaAccessor, _ := newQuotaAccessor(kubeClient)
 | 
			
		||||
	quotaAccessor.indexer = indexer
 | 
			
		||||
	go quotaAccessor.Run(stopCh)
 | 
			
		||||
	evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
 | 
			
		||||
	evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), nil, 5, stopCh)
 | 
			
		||||
 | 
			
		||||
	defer utilruntime.HandleCrash()
 | 
			
		||||
	handler := "aAdmission{
 | 
			
		||||
@@ -465,7 +465,7 @@ func TestAdmitPodInNamespaceWithoutQuota(t *testing.T) {
 | 
			
		||||
	quotaAccessor.indexer = indexer
 | 
			
		||||
	quotaAccessor.liveLookupCache = liveLookupCache
 | 
			
		||||
	go quotaAccessor.Run(stopCh)
 | 
			
		||||
	evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
 | 
			
		||||
	evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), nil, 5, stopCh)
 | 
			
		||||
 | 
			
		||||
	defer utilruntime.HandleCrash()
 | 
			
		||||
	handler := "aAdmission{
 | 
			
		||||
@@ -531,7 +531,7 @@ func TestAdmitBelowTerminatingQuotaLimit(t *testing.T) {
 | 
			
		||||
	quotaAccessor, _ := newQuotaAccessor(kubeClient)
 | 
			
		||||
	quotaAccessor.indexer = indexer
 | 
			
		||||
	go quotaAccessor.Run(stopCh)
 | 
			
		||||
	evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
 | 
			
		||||
	evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), nil, 5, stopCh)
 | 
			
		||||
 | 
			
		||||
	defer utilruntime.HandleCrash()
 | 
			
		||||
	handler := "aAdmission{
 | 
			
		||||
@@ -636,7 +636,7 @@ func TestAdmitBelowBestEffortQuotaLimit(t *testing.T) {
 | 
			
		||||
	quotaAccessor, _ := newQuotaAccessor(kubeClient)
 | 
			
		||||
	quotaAccessor.indexer = indexer
 | 
			
		||||
	go quotaAccessor.Run(stopCh)
 | 
			
		||||
	evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
 | 
			
		||||
	evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), nil, 5, stopCh)
 | 
			
		||||
 | 
			
		||||
	defer utilruntime.HandleCrash()
 | 
			
		||||
	handler := "aAdmission{
 | 
			
		||||
@@ -728,7 +728,7 @@ func TestAdmitBestEffortQuotaLimitIgnoresBurstable(t *testing.T) {
 | 
			
		||||
	quotaAccessor, _ := newQuotaAccessor(kubeClient)
 | 
			
		||||
	quotaAccessor.indexer = indexer
 | 
			
		||||
	go quotaAccessor.Run(stopCh)
 | 
			
		||||
	evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
 | 
			
		||||
	evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), nil, 5, stopCh)
 | 
			
		||||
 | 
			
		||||
	defer utilruntime.HandleCrash()
 | 
			
		||||
	handler := "aAdmission{
 | 
			
		||||
@@ -846,7 +846,7 @@ func TestAdmissionSetsMissingNamespace(t *testing.T) {
 | 
			
		||||
	quotaAccessor, _ := newQuotaAccessor(kubeClient)
 | 
			
		||||
	quotaAccessor.indexer = indexer
 | 
			
		||||
	go quotaAccessor.Run(stopCh)
 | 
			
		||||
	evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
 | 
			
		||||
	evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), nil, 5, stopCh)
 | 
			
		||||
	evaluator.(*quotaEvaluator).registry = registry
 | 
			
		||||
 | 
			
		||||
	defer utilruntime.HandleCrash()
 | 
			
		||||
 
 | 
			
		||||
@@ -44,6 +44,8 @@ type Evaluator interface {
 | 
			
		||||
 | 
			
		||||
type quotaEvaluator struct {
 | 
			
		||||
	quotaAccessor QuotaAccessor
 | 
			
		||||
	// lockAquisitionFunc acquires any required locks and returns a cleanup method to defer
 | 
			
		||||
	lockAquisitionFunc func([]api.ResourceQuota) func()
 | 
			
		||||
 | 
			
		||||
	// registry that knows how to measure usage for objects
 | 
			
		||||
	registry quota.Registry
 | 
			
		||||
@@ -96,9 +98,10 @@ func newAdmissionWaiter(a admission.Attributes) *admissionWaiter {
 | 
			
		||||
// NewQuotaEvaluator configures an admission controller that can enforce quota constraints
 | 
			
		||||
// using the provided registry.  The registry must have the capability to handle group/kinds that
 | 
			
		||||
// are persisted by the server this admission controller is intercepting
 | 
			
		||||
func NewQuotaEvaluator(quotaAccessor QuotaAccessor, registry quota.Registry, workers int, stopCh <-chan struct{}) Evaluator {
 | 
			
		||||
func NewQuotaEvaluator(quotaAccessor QuotaAccessor, registry quota.Registry, lockAquisitionFunc func([]api.ResourceQuota) func(), workers int, stopCh <-chan struct{}) Evaluator {
 | 
			
		||||
	return "aEvaluator{
 | 
			
		||||
		quotaAccessor: quotaAccessor,
 | 
			
		||||
		quotaAccessor:      quotaAccessor,
 | 
			
		||||
		lockAquisitionFunc: lockAquisitionFunc,
 | 
			
		||||
 | 
			
		||||
		registry: registry,
 | 
			
		||||
 | 
			
		||||
@@ -169,6 +172,11 @@ func (e *quotaEvaluator) checkAttributes(ns string, admissionAttributes []*admis
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if e.lockAquisitionFunc != nil {
 | 
			
		||||
		releaseLocks := e.lockAquisitionFunc(quotas)
 | 
			
		||||
		defer releaseLocks()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	e.checkQuotas(quotas, admissionAttributes, 3)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user