Files
kubernetes/pkg/proxy/runner/bounded_frequency_runner.go
2025-07-01 08:54:14 -04:00

156 lines
5.1 KiB
Go

/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package runner
import (
"fmt"
"time"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
)
// BoundedFrequencyRunner manages runs of a user-provided work function.
type BoundedFrequencyRunner struct {
name string // the name of this instance
minInterval time.Duration // the min time between runs
retryInterval time.Duration // the time between a run and a retry
maxInterval time.Duration // the max time between runs
run chan struct{} // try an async run
fn func() error // the work function
minIntervalTimer clock.Timer
nextRunTimer clock.Timer // Combined timer for maxInterval and retryInterval logic
clock clock.Clock
}
// NewBoundedFrequencyRunner creates and returns a new BoundedFrequencyRunner.
// This runner manages the execution frequency of the provided function `fn`.
//
// The runner guarantees two properties:
// 1. Minimum Interval (`minInterval`): At least `minInterval` must pass between
// the *completion* of one execution and the *start* of the next. Calls to
// `Run()` during this cooldown period are coalesced and deferred until the
// interval expires. This prevents burst executions.
// 2. Maximum Interval (`maxInterval`): The function `fn` is guaranteed to run
// at least once per `maxInterval`, ensuring periodic execution even without
// explicit `Run()` calls (e.g., for refreshing state).
//
// `maxInterval` must be greater than or equal to `minInterval`; otherwise,
// this function will panic.
//
// If `fn` returns an error, then it will be run again no later than `retryInterval`
// (unless another trigger, like `Run()` or `maxInterval`, causes it to run sooner). Any
// successful run will abort the retry attempt.
func NewBoundedFrequencyRunner(name string, fn func() error, minInterval, retryInterval, maxInterval time.Duration) *BoundedFrequencyRunner {
return construct(name, fn, minInterval, retryInterval, maxInterval, clock.RealClock{})
}
// Make an instance with dependencies injected.
func construct(name string, fn func() error, minInterval, retryInterval, maxInterval time.Duration, clock clock.Clock) *BoundedFrequencyRunner {
if maxInterval < minInterval {
panic(fmt.Sprintf("%s: maxInterval (%v) must be >= minInterval (%v)", name, maxInterval, minInterval))
}
bfr := &BoundedFrequencyRunner{
name: name,
fn: fn,
minInterval: minInterval,
retryInterval: retryInterval,
maxInterval: maxInterval,
run: make(chan struct{}, 1),
clock: clock,
}
return bfr
}
// Loop handles the periodic timer and run requests. This is expected to be
// called as a goroutine.
func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
klog.V(3).InfoS("Loop running", "runner", bfr.name)
defer close(bfr.run)
bfr.minIntervalTimer = bfr.clock.NewTimer(bfr.minInterval)
defer bfr.minIntervalTimer.Stop()
// Initialize nextRunTimer with maxInterval
bfr.nextRunTimer = bfr.clock.NewTimer(bfr.maxInterval)
defer bfr.nextRunTimer.Stop()
for {
select {
case <-stop:
klog.V(3).InfoS("Loop stopping", "runner", bfr.name)
return
case <-bfr.nextRunTimer.C(): // Wait on the single timer
case <-bfr.run:
}
// stop the timers here to allow the tests using the fake clock to synchronize
// with the fakeClock.HasWaiters() method. The timers are reset after the function
// is executed.
bfr.minIntervalTimer.Stop()
bfr.nextRunTimer.Stop()
var err error
// avoid crashing if the function executed crashes
func() {
defer utilruntime.HandleCrash()
err = bfr.fn()
}()
// Determine the next interval based on the result
nextInterval := bfr.maxInterval
if err != nil {
// If error, ensure next run is within retryInterval and maxInterval
if bfr.retryInterval < nextInterval {
nextInterval = bfr.retryInterval
}
klog.V(3).InfoS("scheduling retry", "runner", bfr.name, "interval", nextInterval, "error", err)
}
// Reset the timers
bfr.minIntervalTimer.Reset(bfr.minInterval)
bfr.nextRunTimer.Reset(nextInterval)
// Wait for minInterval before looping
select {
case <-stop:
klog.V(3).InfoS("Loop stopping", "runner", bfr.name)
return
case <-bfr.minIntervalTimer.C():
}
}
}
// Run the work function as soon as possible. If this is called while Loop is not
// running, the call may be deferred indefinitely.
// Once there is a queued request to call the work function, further calls to
// Run() will have no effect until after it runs.
func (bfr *BoundedFrequencyRunner) Run() {
// If bfr.run is empty, push an element onto it. Otherwise, do nothing.
select {
case bfr.run <- struct{}{}:
default:
}
}