Mirror of https://github.com/optim-enterprises-bv/kubernetes.git (synced 2025-11-03 19:58:17 +00:00)

	add built-in ratelimiter to workqueue

pkg/util/workqueue/default_rate_limiters.go (new file, 204 lines)
@@ -0,0 +1,204 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package workqueue

import (
	"math"
	"sync"
	"time"

	"github.com/juju/ratelimit"
)

type RateLimiter interface {
	// When gets an item and decides how long that item should wait
	When(item interface{}) time.Duration
	// Forget indicates that an item is finished being retried.  Whether it permanently failed
	// or succeeded, we'll stop tracking it
	Forget(item interface{})
	// NumRequeues returns how many failures the item has had
	NumRequeues(item interface{}) int
}

// DefaultControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue.  It has
// both overall and per-item rate limiting.  The overall limit is a token bucket and the per-item limit is exponential
func DefaultControllerRateLimiter() RateLimiter {
	return NewMaxOfRateLimiter(
		DefaultItemBasedRateLimiter(),
		// 10 qps, 100 bucket size.  This is only for retry speed and it's only the overall factor (not per item)
		&BucketRateLimiter{Bucket: ratelimit.NewBucketWithRate(float64(10), int64(100))},
	)
}

// BucketRateLimiter adapts a standard token bucket to the workqueue RateLimiter API
type BucketRateLimiter struct {
	*ratelimit.Bucket
}

var _ RateLimiter = &BucketRateLimiter{}

func (r *BucketRateLimiter) When(item interface{}) time.Duration {
	return r.Bucket.Take(1)
}

func (r *BucketRateLimiter) NumRequeues(item interface{}) int {
	return 0
}

func (r *BucketRateLimiter) Forget(item interface{}) {
}

// ItemExponentialFailureRateLimiter does a simple baseDelay*10^(num-failures-1) limit;
// dealing with max failures and expiration is up to the caller
type ItemExponentialFailureRateLimiter struct {
	failuresLock sync.Mutex
	failures     map[interface{}]int

	baseDelay time.Duration
	maxDelay  time.Duration
}

var _ RateLimiter = &ItemExponentialFailureRateLimiter{}

func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter {
	return &ItemExponentialFailureRateLimiter{
		failures:  map[interface{}]int{},
		baseDelay: baseDelay,
		maxDelay:  maxDelay,
	}
}

func DefaultItemBasedRateLimiter() RateLimiter {
	return NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1000*time.Second)
}

func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	r.failures[item] = r.failures[item] + 1

	calculated := r.baseDelay * time.Duration(math.Pow10(r.failures[item]-1))
	if calculated > r.maxDelay {
		return r.maxDelay
	}

	return calculated
}

func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	return r.failures[item]
}

func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	delete(r.failures, item)
}

// ItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry after that
type ItemFastSlowRateLimiter struct {
	failuresLock sync.Mutex
	failures     map[interface{}]int

	maxFastAttempts int
	fastDelay       time.Duration
	slowDelay       time.Duration
}

var _ RateLimiter = &ItemFastSlowRateLimiter{}

func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter {
	return &ItemFastSlowRateLimiter{
		failures:        map[interface{}]int{},
		fastDelay:       fastDelay,
		slowDelay:       slowDelay,
		maxFastAttempts: maxFastAttempts,
	}
}

func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	r.failures[item] = r.failures[item] + 1

	if r.failures[item] <= r.maxFastAttempts {
		return r.fastDelay
	}

	return r.slowDelay
}

func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	return r.failures[item]
}

func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	delete(r.failures, item)
}

// MaxOfRateLimiter calls every RateLimiter and returns the worst-case response.
// When used with a token bucket limiter, the burst could be apparently exceeded in cases where particular items
// were separately delayed a longer time.
type MaxOfRateLimiter struct {
	limiters []RateLimiter
}

func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
	ret := time.Duration(0)
	for _, limiter := range r.limiters {
		curr := limiter.When(item)
		if curr > ret {
			ret = curr
		}
	}

	return ret
}

func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter {
	return &MaxOfRateLimiter{limiters: limiters}
}

func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int {
	ret := 0
	for _, limiter := range r.limiters {
		curr := limiter.NumRequeues(item)
		if curr > ret {
			ret = curr
		}
	}

	return ret
}

func (r *MaxOfRateLimiter) Forget(item interface{}) {
	for _, limiter := range r.limiters {
		limiter.Forget(item)
	}
}
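
The RateLimiter interface above is small, so callers can supply their own policies alongside the built-in ones. As a rough sketch (not part of this commit; the type name and behavior are illustrative, and it assumes the code sits in the workqueue package where time is already imported), a constant-delay limiter satisfying the interface could look like this:

// constantRateLimiter is a hypothetical RateLimiter that always returns the
// same delay and never tracks per-item failure counts (illustrative only).
type constantRateLimiter struct {
	delay time.Duration
}

func (r *constantRateLimiter) When(item interface{}) time.Duration { return r.delay }
func (r *constantRateLimiter) NumRequeues(item interface{}) int    { return 0 }
func (r *constantRateLimiter) Forget(item interface{})             {}

// Compile-time check that the sketch satisfies the interface.
var _ RateLimiter = &constantRateLimiter{}

Because MaxOfRateLimiter accepts any RateLimiter values, such a limiter could be combined with the built-in ones via NewMaxOfRateLimiter.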

pkg/util/workqueue/default_rate_limiters_test.go (new file, 151 lines)
@@ -0,0 +1,151 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package workqueue

import (
	"testing"
	"time"
)

func TestItemExponentialFailureRateLimiter(t *testing.T) {
	limiter := NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Second)

	if e, a := 1*time.Millisecond, limiter.When("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 10*time.Millisecond, limiter.When("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 100*time.Millisecond, limiter.When("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 1*time.Second, limiter.When("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 1*time.Second, limiter.When("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 5, limiter.NumRequeues("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}

	if e, a := 1*time.Millisecond, limiter.When("two"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 10*time.Millisecond, limiter.When("two"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 2, limiter.NumRequeues("two"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}

	limiter.Forget("one")
	if e, a := 0, limiter.NumRequeues("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 1*time.Millisecond, limiter.When("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}

}

func TestItemFastSlowRateLimiter(t *testing.T) {
	limiter := NewItemFastSlowRateLimiter(5*time.Millisecond, 10*time.Second, 3)

	if e, a := 5*time.Millisecond, limiter.When("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 5*time.Millisecond, limiter.When("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 5*time.Millisecond, limiter.When("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 10*time.Second, limiter.When("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 10*time.Second, limiter.When("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 5, limiter.NumRequeues("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}

	if e, a := 5*time.Millisecond, limiter.When("two"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 5*time.Millisecond, limiter.When("two"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 2, limiter.NumRequeues("two"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}

	limiter.Forget("one")
	if e, a := 0, limiter.NumRequeues("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 5*time.Millisecond, limiter.When("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}

}

func TestMaxOfRateLimiter(t *testing.T) {
	limiter := NewMaxOfRateLimiter(
		NewItemFastSlowRateLimiter(5*time.Millisecond, 3*time.Second, 3),
		NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Second),
	)

	if e, a := 5*time.Millisecond, limiter.When("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 10*time.Millisecond, limiter.When("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 100*time.Millisecond, limiter.When("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 3*time.Second, limiter.When("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 3*time.Second, limiter.When("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 5, limiter.NumRequeues("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}

	if e, a := 5*time.Millisecond, limiter.When("two"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 10*time.Millisecond, limiter.When("two"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 2, limiter.NumRequeues("two"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}

	limiter.Forget("one")
	if e, a := 0, limiter.NumRequeues("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 5*time.Millisecond, limiter.When("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}

}
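
For reference, the delays asserted above follow directly from the formula in ItemExponentialFailureRateLimiter.When: the n-th failure of an item waits baseDelay*10^(n-1), clamped to maxDelay, so a 1ms base with a 1s cap yields 1ms, 10ms, 100ms, 1s, 1s, and so on. A standalone sketch of that arithmetic (illustrative only, not part of this commit):

package main

import (
	"fmt"
	"math"
	"time"
)

// backoffFor mirrors the per-item exponential calculation:
// baseDelay*10^(failures-1), clamped to maxDelay.
func backoffFor(base, max time.Duration, failures int) time.Duration {
	d := base * time.Duration(math.Pow10(failures-1))
	if d > max {
		return max
	}
	return d
}

func main() {
	for n := 1; n <= 5; n++ {
		fmt.Println(backoffFor(time.Millisecond, time.Second, n)) // 1ms, 10ms, 100ms, 1s, 1s
	}
}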

pkg/util/workqueue/rate_limitting_queue.go (new file, 61 lines)
@@ -0,0 +1,61 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package workqueue

// RateLimitingInterface is an Interface that can Add an item at a later time.  This makes it easier to
// requeue items after failures without ending up in a hot-loop.
type RateLimitingInterface interface {
	DelayingInterface
	// AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
	AddRateLimited(item interface{})

	// Forget indicates that an item is finished being retried.  Whether it permanently failed
	// or succeeded, we'll stop the rate limiter from tracking it.  This only clears the `rateLimiter`; you
	// still have to call `Done` on the queue.
	Forget(item interface{})
	// NumRequeues returns how many times the item was requeued
	NumRequeues(item interface{}) int
}

// NewRateLimitingQueue constructs a new workqueue with rate-limited queuing ability.
// Remember to call Forget!  If you don't, you may end up tracking failures forever.
func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface {
	return &rateLimitingType{
		DelayingInterface: NewDelayingQueue(),
		rateLimiter:       rateLimiter,
	}
}

// rateLimitingType wraps an Interface and provides rate-limited re-enqueuing
type rateLimitingType struct {
	DelayingInterface

	rateLimiter RateLimiter
}

// AddRateLimited AddAfter's the item based on the time when the rate limiter says it's ok
func (q *rateLimitingType) AddRateLimited(item interface{}) {
	q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}

func (q *rateLimitingType) NumRequeues(item interface{}) int {
	return q.rateLimiter.NumRequeues(item)
}

func (q *rateLimitingType) Forget(item interface{}) {
	q.rateLimiter.Forget(item)
}
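
For context, the typical consumer of RateLimitingInterface is a controller worker loop: take an item off the queue, process it, requeue it with backoff on error, and Forget it (plus Done) on success. A rough sketch of that pattern, assuming the Get and Done methods of the underlying workqueue Interface (the process callback and its error handling are illustrative, not part of this commit):

// processNextItem is an illustrative sketch of driving a RateLimitingInterface
// from a worker loop; it returns false when the queue is shutting down.
func processNextItem(queue RateLimitingInterface, process func(item interface{}) error) bool {
	item, quit := queue.Get()
	if quit {
		return false
	}
	defer queue.Done(item)

	if err := process(item); err != nil {
		// Failed: ask the rate limiter how long to wait, then requeue.
		queue.AddRateLimited(item)
		return true
	}

	// Succeeded: clear the item's failure history in the rate limiter.
	queue.Forget(item)
	return true
}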

pkg/util/workqueue/rate_limitting_queue_test.go (new file, 74 lines)
@@ -0,0 +1,74 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package workqueue

import (
	"testing"
	"time"

	"k8s.io/kubernetes/pkg/util"
)

func TestRateLimitingQueue(t *testing.T) {
	limiter := NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Second)
	queue := NewRateLimitingQueue(limiter).(*rateLimitingType)
	fakeClock := util.NewFakeClock(time.Now())
	delayingQueue := &delayingType{
		Interface:       New(),
		clock:           fakeClock,
		heartbeat:       fakeClock.Tick(maxWait),
		stopCh:          make(chan struct{}),
		waitingForAddCh: make(chan waitFor, 1000),
	}
	queue.DelayingInterface = delayingQueue

	queue.AddRateLimited("one")
	waitEntry := <-delayingQueue.waitingForAddCh
	if e, a := 1*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	queue.AddRateLimited("one")
	waitEntry = <-delayingQueue.waitingForAddCh
	if e, a := 10*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 2, queue.NumRequeues("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}

	queue.AddRateLimited("two")
	waitEntry = <-delayingQueue.waitingForAddCh
	if e, a := 1*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	queue.AddRateLimited("two")
	waitEntry = <-delayingQueue.waitingForAddCh
	if e, a := 10*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}

	queue.Forget("one")
	if e, a := 0, queue.NumRequeues("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	queue.AddRateLimited("one")
	waitEntry = <-delayingQueue.waitingForAddCh
	if e, a := 1*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}

}
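
Finally, from the caller's side the limiter is chosen at construction time, so swapping retry policies is a one-line change. A minimal usage sketch (illustrative only, not part of this commit), assuming the package is imported by its k8s.io/kubernetes path as in the test above:

package main

import (
	"time"

	"k8s.io/kubernetes/pkg/util/workqueue"
)

func main() {
	// Default: the max of a per-item exponential backoff and an overall
	// 10 qps / burst-100 token bucket.
	defaultQueue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

	// Alternative: retry quickly three times, then fall back to a slow 10s retry.
	fastSlowQueue := workqueue.NewRateLimitingQueue(
		workqueue.NewItemFastSlowRateLimiter(5*time.Millisecond, 10*time.Second, 3),
	)

	defaultQueue.AddRateLimited("key-a")
	fastSlowQueue.AddRateLimited("key-b")

	defaultQueue.ShutDown()
	fastSlowQueue.ShutDown()
}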