From 0298e04ea0fa1f47db650886d08ae47cdab05985 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Mon, 5 May 2025 14:53:03 -0400 Subject: [PATCH] Updates to BoundedFrequencyRunner - Use structured logging. - Use t.Helper() in unit tests. - Improve some comments. - Remove an unnecessary check/panic. Co-authored-by: Antonio Ojea --- pkg/proxy/runner/bounded_frequency_runner.go | 65 +++++++------------ .../runner/bounded_frequency_runner_test.go | 7 ++ 2 files changed, 30 insertions(+), 42 deletions(-) diff --git a/pkg/proxy/runner/bounded_frequency_runner.go b/pkg/proxy/runner/bounded_frequency_runner.go index 8b13c2fd4b5..3ad3255e4b0 100644 --- a/pkg/proxy/runner/bounded_frequency_runner.go +++ b/pkg/proxy/runner/bounded_frequency_runner.go @@ -26,8 +26,7 @@ import ( "k8s.io/klog/v2" ) -// BoundedFrequencyRunner manages runs of a user-provided function. -// See NewBoundedFrequencyRunner for examples. +// BoundedFrequencyRunner manages runs of a user-provided work function. type BoundedFrequencyRunner struct { name string // the name of this instance minInterval time.Duration // the min time between runs, modulo bursts @@ -36,7 +35,7 @@ type BoundedFrequencyRunner struct { run chan struct{} // try an async run mu sync.Mutex // guards runs of fn and all mutations - fn func() // function to run + fn func() // the work function lastRun time.Time // time of last run timer timer // timer for deferred runs limiter rateLimiter // rate limiter for on-demand runs @@ -123,35 +122,24 @@ func (rt *realTimer) Sleep(d time.Duration) { var _ timer = &realTimer{} -// NewBoundedFrequencyRunner creates a new BoundedFrequencyRunner instance, -// which will manage runs of the specified function. +// NewBoundedFrequencyRunner creates and returns a new BoundedFrequencyRunner. +// This runner manages the execution frequency of the provided work function `fn`. // // All runs will be async to the caller of BoundedFrequencyRunner.Run, but // multiple runs are serialized. If the function needs to hold locks, it must // take them internally. // -// Runs of the function will have at least minInterval between them (from -// completion to next start), except that up to bursts may be allowed. Burst -// runs are "accumulated" over time, one per minInterval up to burstRuns total. -// This can be used, for example, to mitigate the impact of expensive operations -// being called in response to user-initiated operations. Run requests that -// would violate the minInterval are coalesced and run at the next opportunity. +// The runner guarantees two properties: +// 1. Minimum Interval (`minInterval`): At least `minInterval` must pass between +// the *completion* of one execution and the *start* of the next. Calls to +// `Run()` during this cooldown period are coalesced and deferred until the +// interval expires. This prevents burst executions. +// 2. Maximum Interval (`maxInterval`): The function `fn` is guaranteed to run +// at least once per `maxInterval`, ensuring periodic execution even without +// explicit `Run()` calls (e.g., for refreshing state). // -// The function will be run at least once per maxInterval. For example, this can -// force periodic refreshes of state in the absence of anyone calling Run. -// -// Examples: -// -// NewBoundedFrequencyRunner("name", fn, time.Second, 5*time.Second, 1) -// - fn will have at least 1 second between runs -// - fn will have no more than 5 seconds between runs -// -// NewBoundedFrequencyRunner("name", fn, 3*time.Second, 10*time.Second, 3) -// - fn will have at least 3 seconds between runs, with up to 3 burst runs -// - fn will have no more than 10 seconds between runs -// -// The maxInterval must be greater than or equal to the minInterval, If the -// caller passes a maxInterval less than minInterval, this function will panic. +// `maxInterval` must be greater than or equal to `minInterval`; otherwise, +// this function will panic. func NewBoundedFrequencyRunner(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int) *BoundedFrequencyRunner { timer := &realTimer{timer: time.NewTimer(0)} // will tick immediately <-timer.C() // consume the first tick @@ -163,9 +151,6 @@ func construct(name string, fn func(), minInterval, maxInterval time.Duration, b if maxInterval < minInterval { panic(fmt.Sprintf("%s: maxInterval (%v) must be >= minInterval (%v)", name, maxInterval, minInterval)) } - if timer == nil { - panic(fmt.Sprintf("%s: timer must be non-nil", name)) - } bfr := &BoundedFrequencyRunner{ name: name, @@ -189,13 +174,13 @@ func construct(name string, fn func(), minInterval, maxInterval time.Duration, b // Loop handles the periodic timer and run requests. This is expected to be // called as a goroutine. func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) { - klog.V(3).Infof("%s Loop running", bfr.name) + klog.V(3).InfoS("Loop running", "runner", bfr.name) bfr.timer.Reset(bfr.maxInterval) for { select { case <-stop: bfr.stop() - klog.V(3).Infof("%s Loop stopping", bfr.name) + klog.V(3).InfoS("Loop stopping", "runner", bfr.name) return case <-bfr.timer.C(): bfr.tryRun() @@ -207,16 +192,12 @@ func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) { } } -// Run the function as soon as possible. If this is called while Loop is not +// Run the work function as soon as possible. If this is called while Loop is not // running, the call may be deferred indefinitely. -// If there is already a queued request to call the underlying function, it -// may be dropped - it is just guaranteed that we will try calling the -// underlying function as soon as possible starting from now. +// Once there is a queued request to call the work function, further calls to +// Run() will have no effect until after it runs. func (bfr *BoundedFrequencyRunner) Run() { - // If it takes a lot of time to run the underlying function, noone is really - // processing elements from channel. So to avoid blocking here on the - // putting element to it, we simply skip it if there is already an element - // in it. + // If bfr.run is empty, push an element onto it. Otherwise, do nothing. select { case bfr.run <- struct{}{}: default: @@ -276,7 +257,7 @@ func (bfr *BoundedFrequencyRunner) doRetry() { retryInterval := bfr.retryTime.Sub(bfr.timer.Now()) bfr.retryTime = time.Time{} if retryInterval < bfr.timer.Remaining() { - klog.V(3).Infof("%s: retrying in %v", bfr.name, retryInterval) + klog.V(3).InfoS("retrying", "runner", bfr.name, "interval", retryInterval) bfr.timer.Stop() bfr.timer.Reset(retryInterval) } @@ -293,7 +274,7 @@ func (bfr *BoundedFrequencyRunner) tryRun() { bfr.lastRun = bfr.timer.Now() bfr.timer.Stop() bfr.timer.Reset(bfr.maxInterval) - klog.V(3).Infof("%s: ran, next possible in %v, periodic in %v", bfr.name, bfr.minInterval, bfr.maxInterval) + klog.V(3).InfoS("ran", "runner", bfr.name, "minInterval", bfr.minInterval, "maxInternval", bfr.maxInterval) return } @@ -301,7 +282,7 @@ func (bfr *BoundedFrequencyRunner) tryRun() { elapsed := bfr.timer.Since(bfr.lastRun) // how long since last run nextPossible := bfr.minInterval - elapsed // time to next possible run nextScheduled := bfr.timer.Remaining() // time to next scheduled run - klog.V(4).Infof("%s: %v since last run, possible in %v, scheduled in %v", bfr.name, elapsed, nextPossible, nextScheduled) + klog.V(4).InfoS("can't run", "runner", bfr.name, "elapsed", elapsed, "nextPossible", nextPossible, "nextScheduled", nextScheduled) // It's hard to avoid race conditions in the unit tests unless we always reset // the timer here, even when it's unchanged diff --git a/pkg/proxy/runner/bounded_frequency_runner_test.go b/pkg/proxy/runner/bounded_frequency_runner_test.go index e4f4e4933d2..3d912f15249 100644 --- a/pkg/proxy/runner/bounded_frequency_runner_test.go +++ b/pkg/proxy/runner/bounded_frequency_runner_test.go @@ -152,6 +152,7 @@ func (ft *fakeTimer) advance(d time.Duration) { // return the calling line number (for printing) // test the timer's state func checkTimer(name string, t *testing.T, upd timerUpdate, active bool, next time.Duration) { + t.Helper() if upd.active != active { t.Fatalf("%s: expected timer active=%v", name, active) } @@ -162,6 +163,7 @@ func checkTimer(name string, t *testing.T, upd timerUpdate, active bool, next ti // test and reset the receiver's state func checkReceiver(name string, t *testing.T, receiver *receiver, expected bool) { + t.Helper() triggered := receiver.reset() if expected && !triggered { t.Fatalf("%s: function should have been called", name) @@ -175,6 +177,7 @@ var minInterval = 1 * time.Second var maxInterval = 10 * time.Second func waitForReset(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectCall bool, expectNext time.Duration) { + t.Helper() upd := <-timer.updated // wait for stop checkReceiver(name, t, obj, expectCall) checkReceiver(name, t, obj, false) // prove post-condition @@ -184,20 +187,24 @@ func waitForReset(name string, t *testing.T, timer *fakeTimer, obj *receiver, ex } func waitForRun(name string, t *testing.T, timer *fakeTimer, obj *receiver) { + t.Helper() waitForReset(name, t, timer, obj, true, maxInterval) } func waitForRunWithRetry(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectNext time.Duration) { + t.Helper() // It will first get reset as with a normal run, and then get set again waitForRun(name, t, timer, obj) waitForReset(name, t, timer, obj, false, expectNext) } func waitForDefer(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectNext time.Duration) { + t.Helper() waitForReset(name, t, timer, obj, false, expectNext) } func waitForNothing(name string, t *testing.T, timer *fakeTimer, obj *receiver) { + t.Helper() select { case <-timer.c: t.Fatalf("%s: unexpected timer tick", name)