mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 19:58:17 +00:00 
			
		
		
		
	Merge pull request #104578 from MadhavJivrajani/refactor-rate-limiters
Move client-go/tools/record tests away from `IntervalClock` to `SimpleIntervalClock`
This commit is contained in:
		@@ -81,7 +81,7 @@ type CorrelatorOptions struct {
 | 
				
			|||||||
	MaxIntervalInSeconds int
 | 
						MaxIntervalInSeconds int
 | 
				
			||||||
	// The clock used by the EventAggregator to allow for testing
 | 
						// The clock used by the EventAggregator to allow for testing
 | 
				
			||||||
	// If not specified (zero value), clock.RealClock{} will be used
 | 
						// If not specified (zero value), clock.RealClock{} will be used
 | 
				
			||||||
	Clock clock.Clock
 | 
						Clock clock.PassiveClock
 | 
				
			||||||
	// The func used by EventFilterFunc, which returns a key for given event, based on which filtering will take place
 | 
						// The func used by EventFilterFunc, which returns a key for given event, based on which filtering will take place
 | 
				
			||||||
	// If not specified (zero value), getSpamKey will be used
 | 
						// If not specified (zero value), getSpamKey will be used
 | 
				
			||||||
	SpamKeyFunc EventSpamKeyFunc
 | 
						SpamKeyFunc EventSpamKeyFunc
 | 
				
			||||||
@@ -323,7 +323,7 @@ type recorderImpl struct {
 | 
				
			|||||||
	scheme *runtime.Scheme
 | 
						scheme *runtime.Scheme
 | 
				
			||||||
	source v1.EventSource
 | 
						source v1.EventSource
 | 
				
			||||||
	*watch.Broadcaster
 | 
						*watch.Broadcaster
 | 
				
			||||||
	clock clock.Clock
 | 
						clock clock.PassiveClock
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, eventtype, reason, message string) {
 | 
					func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, eventtype, reason, message string) {
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -34,6 +34,7 @@ import (
 | 
				
			|||||||
	"k8s.io/client-go/kubernetes/scheme"
 | 
						"k8s.io/client-go/kubernetes/scheme"
 | 
				
			||||||
	restclient "k8s.io/client-go/rest"
 | 
						restclient "k8s.io/client-go/rest"
 | 
				
			||||||
	ref "k8s.io/client-go/tools/reference"
 | 
						ref "k8s.io/client-go/tools/reference"
 | 
				
			||||||
 | 
						testclocks "k8s.io/utils/clock/testing"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type testEventSink struct {
 | 
					type testEventSink struct {
 | 
				
			||||||
@@ -438,7 +439,7 @@ func TestWriteEventError(t *testing.T) {
 | 
				
			|||||||
		},
 | 
							},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	clock := clock.IntervalClock{Time: time.Now(), Duration: time.Second}
 | 
						clock := testclocks.SimpleIntervalClock{Time: time.Now(), Duration: time.Second}
 | 
				
			||||||
	eventCorrelator := NewEventCorrelator(&clock)
 | 
						eventCorrelator := NewEventCorrelator(&clock)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	for caseName, ent := range table {
 | 
						for caseName, ent := range table {
 | 
				
			||||||
@@ -461,7 +462,7 @@ func TestWriteEventError(t *testing.T) {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestUpdateExpiredEvent(t *testing.T) {
 | 
					func TestUpdateExpiredEvent(t *testing.T) {
 | 
				
			||||||
	clock := clock.IntervalClock{Time: time.Now(), Duration: time.Second}
 | 
						clock := testclocks.SimpleIntervalClock{Time: time.Now(), Duration: time.Second}
 | 
				
			||||||
	eventCorrelator := NewEventCorrelator(&clock)
 | 
						eventCorrelator := NewEventCorrelator(&clock)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	var createdEvent *v1.Event
 | 
						var createdEvent *v1.Event
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -102,14 +102,14 @@ type EventSourceObjectSpamFilter struct {
 | 
				
			|||||||
	qps float32
 | 
						qps float32
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// clock is used to allow for testing over a time interval
 | 
						// clock is used to allow for testing over a time interval
 | 
				
			||||||
	clock clock.Clock
 | 
						clock clock.PassiveClock
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// spamKeyFunc is a func used to create a key based on an event, which is later used to filter spam events.
 | 
						// spamKeyFunc is a func used to create a key based on an event, which is later used to filter spam events.
 | 
				
			||||||
	spamKeyFunc EventSpamKeyFunc
 | 
						spamKeyFunc EventSpamKeyFunc
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// NewEventSourceObjectSpamFilter allows burst events from a source about an object with the specified qps refill.
 | 
					// NewEventSourceObjectSpamFilter allows burst events from a source about an object with the specified qps refill.
 | 
				
			||||||
func NewEventSourceObjectSpamFilter(lruCacheSize, burst int, qps float32, clock clock.Clock, spamKeyFunc EventSpamKeyFunc) *EventSourceObjectSpamFilter {
 | 
					func NewEventSourceObjectSpamFilter(lruCacheSize, burst int, qps float32, clock clock.PassiveClock, spamKeyFunc EventSpamKeyFunc) *EventSourceObjectSpamFilter {
 | 
				
			||||||
	return &EventSourceObjectSpamFilter{
 | 
						return &EventSourceObjectSpamFilter{
 | 
				
			||||||
		cache:       lru.New(lruCacheSize),
 | 
							cache:       lru.New(lruCacheSize),
 | 
				
			||||||
		burst:       burst,
 | 
							burst:       burst,
 | 
				
			||||||
@@ -122,7 +122,7 @@ func NewEventSourceObjectSpamFilter(lruCacheSize, burst int, qps float32, clock
 | 
				
			|||||||
// spamRecord holds data used to perform spam filtering decisions.
 | 
					// spamRecord holds data used to perform spam filtering decisions.
 | 
				
			||||||
type spamRecord struct {
 | 
					type spamRecord struct {
 | 
				
			||||||
	// rateLimiter controls the rate of events about this object
 | 
						// rateLimiter controls the rate of events about this object
 | 
				
			||||||
	rateLimiter flowcontrol.RateLimiter
 | 
						rateLimiter flowcontrol.PassiveRateLimiter
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Filter controls that a given source+object are not exceeding the allowed rate.
 | 
					// Filter controls that a given source+object are not exceeding the allowed rate.
 | 
				
			||||||
@@ -142,7 +142,7 @@ func (f *EventSourceObjectSpamFilter) Filter(event *v1.Event) bool {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	// verify we have a rate limiter for this record
 | 
						// verify we have a rate limiter for this record
 | 
				
			||||||
	if record.rateLimiter == nil {
 | 
						if record.rateLimiter == nil {
 | 
				
			||||||
		record.rateLimiter = flowcontrol.NewTokenBucketRateLimiterWithClock(f.qps, f.burst, f.clock)
 | 
							record.rateLimiter = flowcontrol.NewTokenBucketPassiveRateLimiterWithClock(f.qps, f.burst, f.clock)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// ensure we have available rate
 | 
						// ensure we have available rate
 | 
				
			||||||
@@ -207,12 +207,12 @@ type EventAggregator struct {
 | 
				
			|||||||
	maxIntervalInSeconds uint
 | 
						maxIntervalInSeconds uint
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// clock is used to allow for testing over a time interval
 | 
						// clock is used to allow for testing over a time interval
 | 
				
			||||||
	clock clock.Clock
 | 
						clock clock.PassiveClock
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// NewEventAggregator returns a new instance of an EventAggregator
 | 
					// NewEventAggregator returns a new instance of an EventAggregator
 | 
				
			||||||
func NewEventAggregator(lruCacheSize int, keyFunc EventAggregatorKeyFunc, messageFunc EventAggregatorMessageFunc,
 | 
					func NewEventAggregator(lruCacheSize int, keyFunc EventAggregatorKeyFunc, messageFunc EventAggregatorMessageFunc,
 | 
				
			||||||
	maxEvents int, maxIntervalInSeconds int, clock clock.Clock) *EventAggregator {
 | 
						maxEvents int, maxIntervalInSeconds int, clock clock.PassiveClock) *EventAggregator {
 | 
				
			||||||
	return &EventAggregator{
 | 
						return &EventAggregator{
 | 
				
			||||||
		cache:                lru.New(lruCacheSize),
 | 
							cache:                lru.New(lruCacheSize),
 | 
				
			||||||
		keyFunc:              keyFunc,
 | 
							keyFunc:              keyFunc,
 | 
				
			||||||
@@ -315,11 +315,11 @@ type eventLog struct {
 | 
				
			|||||||
type eventLogger struct {
 | 
					type eventLogger struct {
 | 
				
			||||||
	sync.RWMutex
 | 
						sync.RWMutex
 | 
				
			||||||
	cache *lru.Cache
 | 
						cache *lru.Cache
 | 
				
			||||||
	clock clock.Clock
 | 
						clock clock.PassiveClock
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// newEventLogger observes events and counts their frequencies
 | 
					// newEventLogger observes events and counts their frequencies
 | 
				
			||||||
func newEventLogger(lruCacheEntries int, clock clock.Clock) *eventLogger {
 | 
					func newEventLogger(lruCacheEntries int, clock clock.PassiveClock) *eventLogger {
 | 
				
			||||||
	return &eventLogger{cache: lru.New(lruCacheEntries), clock: clock}
 | 
						return &eventLogger{cache: lru.New(lruCacheEntries), clock: clock}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -436,7 +436,7 @@ type EventCorrelateResult struct {
 | 
				
			|||||||
//     times.
 | 
					//     times.
 | 
				
			||||||
//   * A source may burst 25 events about an object, but has a refill rate budget
 | 
					//   * A source may burst 25 events about an object, but has a refill rate budget
 | 
				
			||||||
//     per object of 1 event every 5 minutes to control long-tail of spam.
 | 
					//     per object of 1 event every 5 minutes to control long-tail of spam.
 | 
				
			||||||
func NewEventCorrelator(clock clock.Clock) *EventCorrelator {
 | 
					func NewEventCorrelator(clock clock.PassiveClock) *EventCorrelator {
 | 
				
			||||||
	cacheSize := maxLruCacheEntries
 | 
						cacheSize := maxLruCacheEntries
 | 
				
			||||||
	spamFilter := NewEventSourceObjectSpamFilter(cacheSize, defaultSpamBurst, defaultSpamQPS, clock, getSpamKey)
 | 
						spamFilter := NewEventSourceObjectSpamFilter(cacheSize, defaultSpamBurst, defaultSpamQPS, clock, getSpamKey)
 | 
				
			||||||
	return &EventCorrelator{
 | 
						return &EventCorrelator{
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -25,8 +25,8 @@ import (
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	v1 "k8s.io/api/core/v1"
 | 
						v1 "k8s.io/api/core/v1"
 | 
				
			||||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
						metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/util/clock"
 | 
					 | 
				
			||||||
	"k8s.io/apimachinery/pkg/util/diff"
 | 
						"k8s.io/apimachinery/pkg/util/diff"
 | 
				
			||||||
 | 
						testclocks "k8s.io/utils/clock/testing"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func makeObjectReference(kind, name, namespace string) v1.ObjectReference {
 | 
					func makeObjectReference(kind, name, namespace string) v1.ObjectReference {
 | 
				
			||||||
@@ -234,7 +234,7 @@ func TestEventCorrelator(t *testing.T) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	for testScenario, testInput := range scenario {
 | 
						for testScenario, testInput := range scenario {
 | 
				
			||||||
		eventInterval := time.Duration(testInput.intervalSeconds) * time.Second
 | 
							eventInterval := time.Duration(testInput.intervalSeconds) * time.Second
 | 
				
			||||||
		clock := clock.IntervalClock{Time: time.Now(), Duration: eventInterval}
 | 
							clock := testclocks.SimpleIntervalClock{Time: time.Now(), Duration: eventInterval}
 | 
				
			||||||
		correlator := NewEventCorrelator(&clock)
 | 
							correlator := NewEventCorrelator(&clock)
 | 
				
			||||||
		for i := range testInput.previousEvents {
 | 
							for i := range testInput.previousEvents {
 | 
				
			||||||
			event := testInput.previousEvents[i]
 | 
								event := testInput.previousEvents[i]
 | 
				
			||||||
@@ -320,9 +320,9 @@ func TestEventSpamFilter(t *testing.T) {
 | 
				
			|||||||
			spamKeyFunc:  spamKeyFuncBasedOnObjectsAndReason,
 | 
								spamKeyFunc:  spamKeyFuncBasedOnObjectsAndReason,
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	for testDescription, testInput := range testCases {
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
		c := clock.IntervalClock{Time: time.Now(), Duration: eventInterval}
 | 
						for testDescription, testInput := range testCases {
 | 
				
			||||||
 | 
							c := testclocks.SimpleIntervalClock{Time: time.Now(), Duration: eventInterval}
 | 
				
			||||||
		correlator := NewEventCorrelatorWithOptions(CorrelatorOptions{
 | 
							correlator := NewEventCorrelatorWithOptions(CorrelatorOptions{
 | 
				
			||||||
			Clock:       &c,
 | 
								Clock:       &c,
 | 
				
			||||||
			SpamKeyFunc: testInput.spamKeyFunc,
 | 
								SpamKeyFunc: testInput.spamKeyFunc,
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -23,26 +23,36 @@ import (
 | 
				
			|||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"golang.org/x/time/rate"
 | 
						"golang.org/x/time/rate"
 | 
				
			||||||
 | 
						"k8s.io/utils/clock"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type RateLimiter interface {
 | 
					type PassiveRateLimiter interface {
 | 
				
			||||||
	// TryAccept returns true if a token is taken immediately. Otherwise,
 | 
						// TryAccept returns true if a token is taken immediately. Otherwise,
 | 
				
			||||||
	// it returns false.
 | 
						// it returns false.
 | 
				
			||||||
	TryAccept() bool
 | 
						TryAccept() bool
 | 
				
			||||||
	// Accept returns once a token becomes available.
 | 
					 | 
				
			||||||
	Accept()
 | 
					 | 
				
			||||||
	// Stop stops the rate limiter, subsequent calls to CanAccept will return false
 | 
						// Stop stops the rate limiter, subsequent calls to CanAccept will return false
 | 
				
			||||||
	Stop()
 | 
						Stop()
 | 
				
			||||||
	// QPS returns QPS of this rate limiter
 | 
						// QPS returns QPS of this rate limiter
 | 
				
			||||||
	QPS() float32
 | 
						QPS() float32
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type RateLimiter interface {
 | 
				
			||||||
 | 
						PassiveRateLimiter
 | 
				
			||||||
 | 
						// Accept returns once a token becomes available.
 | 
				
			||||||
 | 
						Accept()
 | 
				
			||||||
	// Wait returns nil if a token is taken before the Context is done.
 | 
						// Wait returns nil if a token is taken before the Context is done.
 | 
				
			||||||
	Wait(ctx context.Context) error
 | 
						Wait(ctx context.Context) error
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type tokenBucketRateLimiter struct {
 | 
					type tokenBucketPassiveRateLimiter struct {
 | 
				
			||||||
	limiter *rate.Limiter
 | 
						limiter *rate.Limiter
 | 
				
			||||||
	clock   Clock
 | 
					 | 
				
			||||||
	qps     float32
 | 
						qps     float32
 | 
				
			||||||
 | 
						clock   clock.PassiveClock
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type tokenBucketRateLimiter struct {
 | 
				
			||||||
 | 
						tokenBucketPassiveRateLimiter
 | 
				
			||||||
 | 
						clock Clock
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// NewTokenBucketRateLimiter creates a rate limiter which implements a token bucket approach.
 | 
					// NewTokenBucketRateLimiter creates a rate limiter which implements a token bucket approach.
 | 
				
			||||||
@@ -52,58 +62,73 @@ type tokenBucketRateLimiter struct {
 | 
				
			|||||||
// The maximum number of tokens in the bucket is capped at 'burst'.
 | 
					// The maximum number of tokens in the bucket is capped at 'burst'.
 | 
				
			||||||
func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter {
 | 
					func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter {
 | 
				
			||||||
	limiter := rate.NewLimiter(rate.Limit(qps), burst)
 | 
						limiter := rate.NewLimiter(rate.Limit(qps), burst)
 | 
				
			||||||
	return newTokenBucketRateLimiter(limiter, realClock{}, qps)
 | 
						return newTokenBucketRateLimiterWithClock(limiter, clock.RealClock{}, qps)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// NewTokenBucketPassiveRateLimiter is similar to NewTokenBucketRateLimiter except that it returns
 | 
				
			||||||
 | 
					// a PassiveRateLimiter which does not have Accept() and Wait() methods.
 | 
				
			||||||
 | 
					func NewTokenBucketPassiveRateLimiter(qps float32, burst int) PassiveRateLimiter {
 | 
				
			||||||
 | 
						limiter := rate.NewLimiter(rate.Limit(qps), burst)
 | 
				
			||||||
 | 
						return newTokenBucketRateLimiterWithPassiveClock(limiter, clock.RealClock{}, qps)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// An injectable, mockable clock interface.
 | 
					// An injectable, mockable clock interface.
 | 
				
			||||||
type Clock interface {
 | 
					type Clock interface {
 | 
				
			||||||
	Now() time.Time
 | 
						clock.PassiveClock
 | 
				
			||||||
	Sleep(time.Duration)
 | 
						Sleep(time.Duration)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type realClock struct{}
 | 
					var _ Clock = (*clock.RealClock)(nil)
 | 
				
			||||||
 | 
					 | 
				
			||||||
func (realClock) Now() time.Time {
 | 
					 | 
				
			||||||
	return time.Now()
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
func (realClock) Sleep(d time.Duration) {
 | 
					 | 
				
			||||||
	time.Sleep(d)
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
// NewTokenBucketRateLimiterWithClock is identical to NewTokenBucketRateLimiter
 | 
					// NewTokenBucketRateLimiterWithClock is identical to NewTokenBucketRateLimiter
 | 
				
			||||||
// but allows an injectable clock, for testing.
 | 
					// but allows an injectable clock, for testing.
 | 
				
			||||||
func NewTokenBucketRateLimiterWithClock(qps float32, burst int, c Clock) RateLimiter {
 | 
					func NewTokenBucketRateLimiterWithClock(qps float32, burst int, c Clock) RateLimiter {
 | 
				
			||||||
	limiter := rate.NewLimiter(rate.Limit(qps), burst)
 | 
						limiter := rate.NewLimiter(rate.Limit(qps), burst)
 | 
				
			||||||
	return newTokenBucketRateLimiter(limiter, c, qps)
 | 
						return newTokenBucketRateLimiterWithClock(limiter, c, qps)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func newTokenBucketRateLimiter(limiter *rate.Limiter, c Clock, qps float32) RateLimiter {
 | 
					// NewTokenBucketPassiveRateLimiterWithClock is similar to NewTokenBucketRateLimiterWithClock
 | 
				
			||||||
 | 
					// except that it returns a PassiveRateLimiter which does not have Accept() and Wait() methods
 | 
				
			||||||
 | 
					// and uses a PassiveClock.
 | 
				
			||||||
 | 
					func NewTokenBucketPassiveRateLimiterWithClock(qps float32, burst int, c clock.PassiveClock) PassiveRateLimiter {
 | 
				
			||||||
 | 
						limiter := rate.NewLimiter(rate.Limit(qps), burst)
 | 
				
			||||||
 | 
						return newTokenBucketRateLimiterWithPassiveClock(limiter, c, qps)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func newTokenBucketRateLimiterWithClock(limiter *rate.Limiter, c Clock, qps float32) *tokenBucketRateLimiter {
 | 
				
			||||||
	return &tokenBucketRateLimiter{
 | 
						return &tokenBucketRateLimiter{
 | 
				
			||||||
		limiter: limiter,
 | 
							tokenBucketPassiveRateLimiter: *newTokenBucketRateLimiterWithPassiveClock(limiter, c, qps),
 | 
				
			||||||
		clock:   c,
 | 
							clock:                         c,
 | 
				
			||||||
		qps:     qps,
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (t *tokenBucketRateLimiter) TryAccept() bool {
 | 
					func newTokenBucketRateLimiterWithPassiveClock(limiter *rate.Limiter, c clock.PassiveClock, qps float32) *tokenBucketPassiveRateLimiter {
 | 
				
			||||||
	return t.limiter.AllowN(t.clock.Now(), 1)
 | 
						return &tokenBucketPassiveRateLimiter{
 | 
				
			||||||
 | 
							limiter: limiter,
 | 
				
			||||||
 | 
							qps:     qps,
 | 
				
			||||||
 | 
							clock:   c,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (tbprl *tokenBucketPassiveRateLimiter) Stop() {
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (tbprl *tokenBucketPassiveRateLimiter) QPS() float32 {
 | 
				
			||||||
 | 
						return tbprl.qps
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (tbprl *tokenBucketPassiveRateLimiter) TryAccept() bool {
 | 
				
			||||||
 | 
						return tbprl.limiter.AllowN(tbprl.clock.Now(), 1)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Accept will block until a token becomes available
 | 
					// Accept will block until a token becomes available
 | 
				
			||||||
func (t *tokenBucketRateLimiter) Accept() {
 | 
					func (tbrl *tokenBucketRateLimiter) Accept() {
 | 
				
			||||||
	now := t.clock.Now()
 | 
						now := tbrl.clock.Now()
 | 
				
			||||||
	t.clock.Sleep(t.limiter.ReserveN(now, 1).DelayFrom(now))
 | 
						tbrl.clock.Sleep(tbrl.limiter.ReserveN(now, 1).DelayFrom(now))
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (t *tokenBucketRateLimiter) Stop() {
 | 
					func (tbrl *tokenBucketRateLimiter) Wait(ctx context.Context) error {
 | 
				
			||||||
}
 | 
						return tbrl.limiter.Wait(ctx)
 | 
				
			||||||
 | 
					 | 
				
			||||||
func (t *tokenBucketRateLimiter) QPS() float32 {
 | 
					 | 
				
			||||||
	return t.qps
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (t *tokenBucketRateLimiter) Wait(ctx context.Context) error {
 | 
					 | 
				
			||||||
	return t.limiter.Wait(ctx)
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type fakeAlwaysRateLimiter struct{}
 | 
					type fakeAlwaysRateLimiter struct{}
 | 
				
			||||||
@@ -157,3 +182,11 @@ func (t *fakeNeverRateLimiter) QPS() float32 {
 | 
				
			|||||||
func (t *fakeNeverRateLimiter) Wait(ctx context.Context) error {
 | 
					func (t *fakeNeverRateLimiter) Wait(ctx context.Context) error {
 | 
				
			||||||
	return errors.New("can not be accept")
 | 
						return errors.New("can not be accept")
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					var (
 | 
				
			||||||
 | 
						_ RateLimiter = (*tokenBucketRateLimiter)(nil)
 | 
				
			||||||
 | 
						_ RateLimiter = (*fakeAlwaysRateLimiter)(nil)
 | 
				
			||||||
 | 
						_ RateLimiter = (*fakeNeverRateLimiter)(nil)
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					var _ PassiveRateLimiter = (*tokenBucketPassiveRateLimiter)(nil)
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -190,6 +190,10 @@ func (fc *fakeClock) Sleep(d time.Duration) {
 | 
				
			|||||||
	fc.now = fc.now.Add(d)
 | 
						fc.now = fc.now.Add(d)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (fc *fakeClock) Since(ts time.Time) time.Duration {
 | 
				
			||||||
 | 
						return time.Since(ts)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestRatePrecisionBug(t *testing.T) {
 | 
					func TestRatePrecisionBug(t *testing.T) {
 | 
				
			||||||
	// golang.org/x/time/rate used to have bugs around precision and this
 | 
						// golang.org/x/time/rate used to have bugs around precision and this
 | 
				
			||||||
	// proves that they don't recur (at least in the form we know about).  This
 | 
						// proves that they don't recur (at least in the form we know about).  This
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user