Updates to BoundedFrequencyRunner

- Use structured logging.
- Use t.Helper() in unit tests.
- Improve some comments.
- Remove an unnecessary check/panic.

Co-authored-by: Antonio Ojea <aojea@google.com>
This commit is contained in:
Dan Winship
2025-05-05 14:53:03 -04:00
parent 6da9d363f3
commit 0298e04ea0
2 changed files with 30 additions and 42 deletions

View File

@@ -26,8 +26,7 @@ import (
"k8s.io/klog/v2"
)
// BoundedFrequencyRunner manages runs of a user-provided function.
// See NewBoundedFrequencyRunner for examples.
// BoundedFrequencyRunner manages runs of a user-provided work function.
type BoundedFrequencyRunner struct {
name string // the name of this instance
minInterval time.Duration // the min time between runs, modulo bursts
@@ -36,7 +35,7 @@ type BoundedFrequencyRunner struct {
run chan struct{} // try an async run
mu sync.Mutex // guards runs of fn and all mutations
fn func() // function to run
fn func() // the work function
lastRun time.Time // time of last run
timer timer // timer for deferred runs
limiter rateLimiter // rate limiter for on-demand runs
@@ -123,35 +122,24 @@ func (rt *realTimer) Sleep(d time.Duration) {
var _ timer = &realTimer{}
// NewBoundedFrequencyRunner creates a new BoundedFrequencyRunner instance,
// which will manage runs of the specified function.
// NewBoundedFrequencyRunner creates and returns a new BoundedFrequencyRunner.
// This runner manages the execution frequency of the provided work function `fn`.
//
// All runs will be async to the caller of BoundedFrequencyRunner.Run, but
// multiple runs are serialized. If the function needs to hold locks, it must
// take them internally.
//
// Runs of the function will have at least minInterval between them (from
// completion to next start), except that up to bursts may be allowed. Burst
// runs are "accumulated" over time, one per minInterval up to burstRuns total.
// This can be used, for example, to mitigate the impact of expensive operations
// being called in response to user-initiated operations. Run requests that
// would violate the minInterval are coalesced and run at the next opportunity.
// The runner guarantees two properties:
// 1. Minimum Interval (`minInterval`): At least `minInterval` must pass between
// the *completion* of one execution and the *start* of the next. Calls to
// `Run()` during this cooldown period are coalesced and deferred until the
// interval expires. This prevents burst executions.
// 2. Maximum Interval (`maxInterval`): The function `fn` is guaranteed to run
// at least once per `maxInterval`, ensuring periodic execution even without
// explicit `Run()` calls (e.g., for refreshing state).
//
// The function will be run at least once per maxInterval. For example, this can
// force periodic refreshes of state in the absence of anyone calling Run.
//
// Examples:
//
// NewBoundedFrequencyRunner("name", fn, time.Second, 5*time.Second, 1)
// - fn will have at least 1 second between runs
// - fn will have no more than 5 seconds between runs
//
// NewBoundedFrequencyRunner("name", fn, 3*time.Second, 10*time.Second, 3)
// - fn will have at least 3 seconds between runs, with up to 3 burst runs
// - fn will have no more than 10 seconds between runs
//
// The maxInterval must be greater than or equal to the minInterval, If the
// caller passes a maxInterval less than minInterval, this function will panic.
// `maxInterval` must be greater than or equal to `minInterval`; otherwise,
// this function will panic.
func NewBoundedFrequencyRunner(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int) *BoundedFrequencyRunner {
timer := &realTimer{timer: time.NewTimer(0)} // will tick immediately
<-timer.C() // consume the first tick
@@ -163,9 +151,6 @@ func construct(name string, fn func(), minInterval, maxInterval time.Duration, b
if maxInterval < minInterval {
panic(fmt.Sprintf("%s: maxInterval (%v) must be >= minInterval (%v)", name, maxInterval, minInterval))
}
if timer == nil {
panic(fmt.Sprintf("%s: timer must be non-nil", name))
}
bfr := &BoundedFrequencyRunner{
name: name,
@@ -189,13 +174,13 @@ func construct(name string, fn func(), minInterval, maxInterval time.Duration, b
// Loop handles the periodic timer and run requests. This is expected to be
// called as a goroutine.
func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
klog.V(3).Infof("%s Loop running", bfr.name)
klog.V(3).InfoS("Loop running", "runner", bfr.name)
bfr.timer.Reset(bfr.maxInterval)
for {
select {
case <-stop:
bfr.stop()
klog.V(3).Infof("%s Loop stopping", bfr.name)
klog.V(3).InfoS("Loop stopping", "runner", bfr.name)
return
case <-bfr.timer.C():
bfr.tryRun()
@@ -207,16 +192,12 @@ func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
}
}
// Run the function as soon as possible. If this is called while Loop is not
// Run the work function as soon as possible. If this is called while Loop is not
// running, the call may be deferred indefinitely.
// If there is already a queued request to call the underlying function, it
// may be dropped - it is just guaranteed that we will try calling the
// underlying function as soon as possible starting from now.
// Once there is a queued request to call the work function, further calls to
// Run() will have no effect until after it runs.
func (bfr *BoundedFrequencyRunner) Run() {
// If it takes a lot of time to run the underlying function, noone is really
// processing elements from <run> channel. So to avoid blocking here on the
// putting element to it, we simply skip it if there is already an element
// in it.
// If bfr.run is empty, push an element onto it. Otherwise, do nothing.
select {
case bfr.run <- struct{}{}:
default:
@@ -276,7 +257,7 @@ func (bfr *BoundedFrequencyRunner) doRetry() {
retryInterval := bfr.retryTime.Sub(bfr.timer.Now())
bfr.retryTime = time.Time{}
if retryInterval < bfr.timer.Remaining() {
klog.V(3).Infof("%s: retrying in %v", bfr.name, retryInterval)
klog.V(3).InfoS("retrying", "runner", bfr.name, "interval", retryInterval)
bfr.timer.Stop()
bfr.timer.Reset(retryInterval)
}
@@ -293,7 +274,7 @@ func (bfr *BoundedFrequencyRunner) tryRun() {
bfr.lastRun = bfr.timer.Now()
bfr.timer.Stop()
bfr.timer.Reset(bfr.maxInterval)
klog.V(3).Infof("%s: ran, next possible in %v, periodic in %v", bfr.name, bfr.minInterval, bfr.maxInterval)
klog.V(3).InfoS("ran", "runner", bfr.name, "minInterval", bfr.minInterval, "maxInternval", bfr.maxInterval)
return
}
@@ -301,7 +282,7 @@ func (bfr *BoundedFrequencyRunner) tryRun() {
elapsed := bfr.timer.Since(bfr.lastRun) // how long since last run
nextPossible := bfr.minInterval - elapsed // time to next possible run
nextScheduled := bfr.timer.Remaining() // time to next scheduled run
klog.V(4).Infof("%s: %v since last run, possible in %v, scheduled in %v", bfr.name, elapsed, nextPossible, nextScheduled)
klog.V(4).InfoS("can't run", "runner", bfr.name, "elapsed", elapsed, "nextPossible", nextPossible, "nextScheduled", nextScheduled)
// It's hard to avoid race conditions in the unit tests unless we always reset
// the timer here, even when it's unchanged

View File

@@ -152,6 +152,7 @@ func (ft *fakeTimer) advance(d time.Duration) {
// return the calling line number (for printing)
// test the timer's state
func checkTimer(name string, t *testing.T, upd timerUpdate, active bool, next time.Duration) {
t.Helper()
if upd.active != active {
t.Fatalf("%s: expected timer active=%v", name, active)
}
@@ -162,6 +163,7 @@ func checkTimer(name string, t *testing.T, upd timerUpdate, active bool, next ti
// test and reset the receiver's state
func checkReceiver(name string, t *testing.T, receiver *receiver, expected bool) {
t.Helper()
triggered := receiver.reset()
if expected && !triggered {
t.Fatalf("%s: function should have been called", name)
@@ -175,6 +177,7 @@ var minInterval = 1 * time.Second
var maxInterval = 10 * time.Second
func waitForReset(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectCall bool, expectNext time.Duration) {
t.Helper()
upd := <-timer.updated // wait for stop
checkReceiver(name, t, obj, expectCall)
checkReceiver(name, t, obj, false) // prove post-condition
@@ -184,20 +187,24 @@ func waitForReset(name string, t *testing.T, timer *fakeTimer, obj *receiver, ex
}
func waitForRun(name string, t *testing.T, timer *fakeTimer, obj *receiver) {
t.Helper()
waitForReset(name, t, timer, obj, true, maxInterval)
}
func waitForRunWithRetry(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectNext time.Duration) {
t.Helper()
// It will first get reset as with a normal run, and then get set again
waitForRun(name, t, timer, obj)
waitForReset(name, t, timer, obj, false, expectNext)
}
func waitForDefer(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectNext time.Duration) {
t.Helper()
waitForReset(name, t, timer, obj, false, expectNext)
}
func waitForNothing(name string, t *testing.T, timer *fakeTimer, obj *receiver) {
t.Helper()
select {
case <-timer.c:
t.Fatalf("%s: unexpected timer tick", name)