// Mirror of https://github.com/outbackdingo/kubernetes.git
// Synced 2026-01-27 18:19:28 +00:00
// 156 lines, 5.1 KiB, Go
/*
|
|
Copyright 2017 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package runner
|
|
|
|
import (
|
|
"fmt"
|
|
"time"
|
|
|
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
|
"k8s.io/klog/v2"
|
|
"k8s.io/utils/clock"
|
|
)
|
|
|
|
// BoundedFrequencyRunner manages runs of a user-provided work function.
|
|
type BoundedFrequencyRunner struct {
|
|
name string // the name of this instance
|
|
|
|
minInterval time.Duration // the min time between runs
|
|
retryInterval time.Duration // the time between a run and a retry
|
|
maxInterval time.Duration // the max time between runs
|
|
|
|
run chan struct{} // try an async run
|
|
|
|
fn func() error // the work function
|
|
minIntervalTimer clock.Timer
|
|
nextRunTimer clock.Timer // Combined timer for maxInterval and retryInterval logic
|
|
clock clock.Clock
|
|
}
|
|
|
|
// NewBoundedFrequencyRunner creates and returns a new BoundedFrequencyRunner.
|
|
// This runner manages the execution frequency of the provided function `fn`.
|
|
//
|
|
// The runner guarantees two properties:
|
|
// 1. Minimum Interval (`minInterval`): At least `minInterval` must pass between
|
|
// the *completion* of one execution and the *start* of the next. Calls to
|
|
// `Run()` during this cooldown period are coalesced and deferred until the
|
|
// interval expires. This prevents burst executions.
|
|
// 2. Maximum Interval (`maxInterval`): The function `fn` is guaranteed to run
|
|
// at least once per `maxInterval`, ensuring periodic execution even without
|
|
// explicit `Run()` calls (e.g., for refreshing state).
|
|
//
|
|
// `maxInterval` must be greater than or equal to `minInterval`; otherwise,
|
|
// this function will panic.
|
|
//
|
|
// If `fn` returns an error, then it will be run again no later than `retryInterval`
|
|
// (unless another trigger, like `Run()` or `maxInterval`, causes it to run sooner). Any
|
|
// successful run will abort the retry attempt.
|
|
func NewBoundedFrequencyRunner(name string, fn func() error, minInterval, retryInterval, maxInterval time.Duration) *BoundedFrequencyRunner {
|
|
return construct(name, fn, minInterval, retryInterval, maxInterval, clock.RealClock{})
|
|
}
|
|
|
|
// Make an instance with dependencies injected.
|
|
func construct(name string, fn func() error, minInterval, retryInterval, maxInterval time.Duration, clock clock.Clock) *BoundedFrequencyRunner {
|
|
if maxInterval < minInterval {
|
|
panic(fmt.Sprintf("%s: maxInterval (%v) must be >= minInterval (%v)", name, maxInterval, minInterval))
|
|
}
|
|
|
|
bfr := &BoundedFrequencyRunner{
|
|
name: name,
|
|
fn: fn,
|
|
|
|
minInterval: minInterval,
|
|
retryInterval: retryInterval,
|
|
maxInterval: maxInterval,
|
|
|
|
run: make(chan struct{}, 1),
|
|
clock: clock,
|
|
}
|
|
|
|
return bfr
|
|
}
|
|
|
|
// Loop handles the periodic timer and run requests. This is expected to be
|
|
// called as a goroutine.
|
|
func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
|
|
klog.V(3).InfoS("Loop running", "runner", bfr.name)
|
|
defer close(bfr.run)
|
|
|
|
bfr.minIntervalTimer = bfr.clock.NewTimer(bfr.minInterval)
|
|
defer bfr.minIntervalTimer.Stop()
|
|
|
|
// Initialize nextRunTimer with maxInterval
|
|
bfr.nextRunTimer = bfr.clock.NewTimer(bfr.maxInterval)
|
|
defer bfr.nextRunTimer.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-stop:
|
|
klog.V(3).InfoS("Loop stopping", "runner", bfr.name)
|
|
return
|
|
case <-bfr.nextRunTimer.C(): // Wait on the single timer
|
|
case <-bfr.run:
|
|
}
|
|
|
|
// stop the timers here to allow the tests using the fake clock to synchronize
|
|
// with the fakeClock.HasWaiters() method. The timers are reset after the function
|
|
// is executed.
|
|
bfr.minIntervalTimer.Stop()
|
|
bfr.nextRunTimer.Stop()
|
|
|
|
var err error
|
|
// avoid crashing if the function executed crashes
|
|
func() {
|
|
defer utilruntime.HandleCrash()
|
|
err = bfr.fn()
|
|
}()
|
|
|
|
// Determine the next interval based on the result
|
|
nextInterval := bfr.maxInterval
|
|
if err != nil {
|
|
// If error, ensure next run is within retryInterval and maxInterval
|
|
if bfr.retryInterval < nextInterval {
|
|
nextInterval = bfr.retryInterval
|
|
}
|
|
klog.V(3).InfoS("scheduling retry", "runner", bfr.name, "interval", nextInterval, "error", err)
|
|
}
|
|
// Reset the timers
|
|
bfr.minIntervalTimer.Reset(bfr.minInterval)
|
|
bfr.nextRunTimer.Reset(nextInterval)
|
|
|
|
// Wait for minInterval before looping
|
|
select {
|
|
case <-stop:
|
|
klog.V(3).InfoS("Loop stopping", "runner", bfr.name)
|
|
return
|
|
case <-bfr.minIntervalTimer.C():
|
|
}
|
|
}
|
|
}
|
|
|
|
// Run the work function as soon as possible. If this is called while Loop is not
|
|
// running, the call may be deferred indefinitely.
|
|
// Once there is a queued request to call the work function, further calls to
|
|
// Run() will have no effect until after it runs.
|
|
func (bfr *BoundedFrequencyRunner) Run() {
|
|
// If bfr.run is empty, push an element onto it. Otherwise, do nothing.
|
|
select {
|
|
case bfr.run <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|