mirror of
https://github.com/outbackdingo/kubernetes.git
synced 2026-01-27 18:19:28 +00:00
Remove burst syncs from BoundedFrequencyRunner
Burst syncs are theoretically useful for dealing with a single change that results in multiple Run() calls (eg, a Service and EndpointSlice both changing), but 2 isn't enough to cover all cases, and a better way of dealing with this problem is to just use a smaller minSyncPeriod. Co-authored-by: Antonio Ojea <aojea@google.com>
This commit is contained in:
@@ -307,12 +307,11 @@ func NewProxier(ctx context.Context,
|
||||
},
|
||||
}
|
||||
|
||||
burstSyncs := 2
|
||||
logger.V(2).Info("Iptables sync params", "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs)
|
||||
logger.V(2).Info("Iptables sync params", "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod)
|
||||
// We pass syncPeriod to ipt.Monitor, which will call us only if it needs to.
|
||||
// We need to pass *some* maxInterval to NewBoundedFrequencyRunner anyway though.
|
||||
// time.Hour is arbitrary.
|
||||
proxier.syncRunner = runner.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, proxyutil.FullSyncPeriod, burstSyncs)
|
||||
proxier.syncRunner = runner.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, proxyutil.FullSyncPeriod)
|
||||
|
||||
go ipt.Monitor(kubeProxyCanaryChain, []utiliptables.Table{utiliptables.TableMangle, utiliptables.TableNAT, utiliptables.TableFilter},
|
||||
proxier.forceSyncProxyRules, syncPeriod, wait.NeverStop)
|
||||
|
||||
@@ -144,7 +144,7 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
|
||||
},
|
||||
}
|
||||
p.setInitialized(true)
|
||||
p.syncRunner = runner.NewBoundedFrequencyRunner("test-sync-runner", p.syncProxyRules, 0, time.Minute, 1)
|
||||
p.syncRunner = runner.NewBoundedFrequencyRunner("test-sync-runner", p.syncProxyRules, 0, time.Minute)
|
||||
return p
|
||||
}
|
||||
|
||||
|
||||
@@ -400,9 +400,8 @@ func NewProxier(
|
||||
for _, is := range ipsetInfo {
|
||||
proxier.ipsetList[is.name] = NewIPSet(ipset, is.name, is.setType, (ipFamily == v1.IPv6Protocol), is.comment)
|
||||
}
|
||||
burstSyncs := 2
|
||||
logger.V(2).Info("ipvs sync params", "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs)
|
||||
proxier.syncRunner = runner.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
|
||||
logger.V(2).Info("ipvs sync params", "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod)
|
||||
proxier.syncRunner = runner.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod)
|
||||
proxier.gracefuldeleteManager.Run()
|
||||
return proxier, nil
|
||||
}
|
||||
|
||||
@@ -167,7 +167,7 @@ func NewFakeProxier(ctx context.Context, ipt utiliptables.Interface, ipvs utilip
|
||||
ipFamily: ipFamily,
|
||||
}
|
||||
p.setInitialized(true)
|
||||
p.syncRunner = runner.NewBoundedFrequencyRunner("test-sync-runner", p.syncProxyRules, 0, time.Minute, 1)
|
||||
p.syncRunner = runner.NewBoundedFrequencyRunner("test-sync-runner", p.syncProxyRules, 0, time.Minute)
|
||||
return p
|
||||
}
|
||||
|
||||
|
||||
@@ -273,10 +273,9 @@ func NewProxier(ctx context.Context,
|
||||
serviceNodePorts: newNFTElementStorage("map", serviceNodePortsMap),
|
||||
}
|
||||
|
||||
burstSyncs := 2
|
||||
logger.V(2).Info("NFTables sync params", "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs)
|
||||
logger.V(2).Info("NFTables sync params", "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod)
|
||||
// We need to pass *some* maxInterval to NewBoundedFrequencyRunner. time.Hour is arbitrary.
|
||||
proxier.syncRunner = runner.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, proxyutil.FullSyncPeriod, burstSyncs)
|
||||
proxier.syncRunner = runner.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, proxyutil.FullSyncPeriod)
|
||||
|
||||
return proxier, nil
|
||||
}
|
||||
|
||||
@@ -142,7 +142,7 @@ func NewFakeProxier(ipFamily v1.IPFamily) (*knftables.Fake, *Proxier) {
|
||||
serviceNodePorts: newNFTElementStorage("map", serviceNodePortsMap),
|
||||
}
|
||||
p.setInitialized(true)
|
||||
p.syncRunner = runner.NewBoundedFrequencyRunner("test-sync-runner", p.syncProxyRules, 0, time.Minute, 1)
|
||||
p.syncRunner = runner.NewBoundedFrequencyRunner("test-sync-runner", p.syncProxyRules, 0, time.Minute)
|
||||
|
||||
return nft, p
|
||||
}
|
||||
|
||||
@@ -29,7 +29,7 @@ import (
|
||||
// BoundedFrequencyRunner manages runs of a user-provided work function.
|
||||
type BoundedFrequencyRunner struct {
|
||||
name string // the name of this instance
|
||||
minInterval time.Duration // the min time between runs, modulo bursts
|
||||
minInterval time.Duration // the min time between runs
|
||||
maxInterval time.Duration // the max time between runs
|
||||
|
||||
run chan struct{} // try an async run
|
||||
@@ -140,14 +140,14 @@ var _ timer = &realTimer{}
|
||||
//
|
||||
// `maxInterval` must be greater than or equal to `minInterval`; otherwise,
|
||||
// this function will panic.
|
||||
func NewBoundedFrequencyRunner(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int) *BoundedFrequencyRunner {
|
||||
func NewBoundedFrequencyRunner(name string, fn func(), minInterval, maxInterval time.Duration) *BoundedFrequencyRunner {
|
||||
timer := &realTimer{timer: time.NewTimer(0)} // will tick immediately
|
||||
<-timer.C() // consume the first tick
|
||||
return construct(name, fn, minInterval, maxInterval, burstRuns, timer)
|
||||
return construct(name, fn, minInterval, maxInterval, timer)
|
||||
}
|
||||
|
||||
// Make an instance with dependencies injected.
|
||||
func construct(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int, timer timer) *BoundedFrequencyRunner {
|
||||
func construct(name string, fn func(), minInterval, maxInterval time.Duration, timer timer) *BoundedFrequencyRunner {
|
||||
if maxInterval < minInterval {
|
||||
panic(fmt.Sprintf("%s: maxInterval (%v) must be >= minInterval (%v)", name, maxInterval, minInterval))
|
||||
}
|
||||
@@ -164,9 +164,8 @@ func construct(name string, fn func(), minInterval, maxInterval time.Duration, b
|
||||
if minInterval == 0 {
|
||||
bfr.limiter = nullLimiter{}
|
||||
} else {
|
||||
// allow burst updates in short succession
|
||||
qps := float32(time.Second) / float32(minInterval)
|
||||
bfr.limiter = flowcontrol.NewTokenBucketRateLimiterWithClock(qps, burstRuns, timer)
|
||||
bfr.limiter = flowcontrol.NewTokenBucketRateLimiterWithClock(qps, 1, timer)
|
||||
}
|
||||
return bfr
|
||||
}
|
||||
|
||||
@@ -215,10 +215,10 @@ func waitForNothing(name string, t *testing.T, timer *fakeTimer, obj *receiver)
|
||||
checkReceiver(name, t, obj, false)
|
||||
}
|
||||
|
||||
func Test_BoundedFrequencyRunnerNoBurst(t *testing.T) {
|
||||
func Test_BoundedFrequencyRunner(t *testing.T) {
|
||||
obj := &receiver{}
|
||||
timer := newFakeTimer()
|
||||
runner := construct("test-runner", obj.F, minInterval, maxInterval, 1, timer)
|
||||
runner := construct("test-runner", obj.F, minInterval, maxInterval, timer)
|
||||
stop := make(chan struct{})
|
||||
|
||||
var upd timerUpdate
|
||||
@@ -289,94 +289,10 @@ func Test_BoundedFrequencyRunnerNoBurst(t *testing.T) {
|
||||
<-timer.updated
|
||||
}
|
||||
|
||||
func Test_BoundedFrequencyRunnerBurst(t *testing.T) {
|
||||
obj := &receiver{}
|
||||
timer := newFakeTimer()
|
||||
runner := construct("test-runner", obj.F, minInterval, maxInterval, 2, timer)
|
||||
stop := make(chan struct{})
|
||||
|
||||
var upd timerUpdate
|
||||
|
||||
// Start.
|
||||
go runner.Loop(stop)
|
||||
upd = <-timer.updated // wait for initial time to be set to max
|
||||
checkTimer("init", t, upd, true, maxInterval)
|
||||
checkReceiver("init", t, obj, false)
|
||||
|
||||
// Run once, immediately.
|
||||
// abs=0ms, rel=0ms
|
||||
runner.Run()
|
||||
waitForRun("first run", t, timer, obj)
|
||||
|
||||
// Run again, before minInterval expires, with burst.
|
||||
timer.advance(1 * time.Millisecond) // abs=1ms, rel=1ms
|
||||
runner.Run()
|
||||
waitForRun("second run", t, timer, obj)
|
||||
|
||||
// Run again, before minInterval expires.
|
||||
timer.advance(498 * time.Millisecond) // abs=499ms, rel=498ms
|
||||
runner.Run()
|
||||
waitForDefer("too soon after second", t, timer, obj, 502*time.Millisecond)
|
||||
|
||||
// Run again, before minInterval expires.
|
||||
timer.advance(1 * time.Millisecond) // abs=500ms, rel=499ms
|
||||
runner.Run()
|
||||
waitForDefer("too soon after second 2", t, timer, obj, 501*time.Millisecond)
|
||||
|
||||
// Run again, before minInterval expires.
|
||||
timer.advance(1 * time.Millisecond) // abs=501ms, rel=500ms
|
||||
runner.Run()
|
||||
waitForDefer("too soon after second 3", t, timer, obj, 500*time.Millisecond)
|
||||
|
||||
// Advance timer enough to replenish bursts, but not enough to be minInterval
|
||||
// after the last run
|
||||
timer.advance(499 * time.Millisecond) // abs=1000ms, rel=999ms
|
||||
waitForNothing("not minInterval", t, timer, obj)
|
||||
runner.Run()
|
||||
waitForRun("third run", t, timer, obj)
|
||||
|
||||
// Run again, before minInterval expires.
|
||||
timer.advance(1 * time.Millisecond) // abs=1001ms, rel=1ms
|
||||
runner.Run()
|
||||
waitForDefer("too soon after third", t, timer, obj, 999*time.Millisecond)
|
||||
|
||||
// Run again, before minInterval expires.
|
||||
timer.advance(998 * time.Millisecond) // abs=1999ms, rel=999ms
|
||||
runner.Run()
|
||||
waitForDefer("too soon after third 2", t, timer, obj, 1*time.Millisecond)
|
||||
|
||||
// Advance and do the deferred run
|
||||
timer.advance(1 * time.Millisecond) // abs=2000ms, rel=1000ms
|
||||
waitForRun("fourth run", t, timer, obj)
|
||||
|
||||
// Run again, once burst has fully replenished.
|
||||
timer.advance(2 * time.Second) // abs=4000ms, rel=2000ms
|
||||
runner.Run()
|
||||
waitForRun("fifth run", t, timer, obj)
|
||||
runner.Run()
|
||||
waitForRun("sixth run", t, timer, obj)
|
||||
runner.Run()
|
||||
waitForDefer("too soon after sixth", t, timer, obj, 1*time.Second)
|
||||
|
||||
// Wait until minInterval after the last run
|
||||
timer.advance(1 * time.Second) // abs=5000ms, rel=1000ms
|
||||
waitForRun("seventh run", t, timer, obj)
|
||||
|
||||
// Wait for maxInterval
|
||||
timer.advance(10 * time.Second) // abs=15000ms, rel=10000ms
|
||||
waitForRun("maxInterval", t, timer, obj)
|
||||
|
||||
// Clean up.
|
||||
stop <- struct{}{}
|
||||
// a message is sent to time.updated in func Stop() at the end of the child goroutine
|
||||
// to terminate the child, a receive on time.updated is needed here
|
||||
<-timer.updated
|
||||
}
|
||||
|
||||
func Test_BoundedFrequencyRunnerRetryAfter(t *testing.T) {
|
||||
obj := &receiver{}
|
||||
timer := newFakeTimer()
|
||||
runner := construct("test-runner", obj.F, minInterval, maxInterval, 1, timer)
|
||||
runner := construct("test-runner", obj.F, minInterval, maxInterval, timer)
|
||||
stop := make(chan struct{})
|
||||
|
||||
var upd timerUpdate
|
||||
|
||||
@@ -753,9 +753,8 @@ func NewProxier(
|
||||
return nil, err
|
||||
}
|
||||
|
||||
burstSyncs := 2
|
||||
klog.V(3).InfoS("Record sync param", "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs)
|
||||
proxier.syncRunner = runner.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
|
||||
klog.V(3).InfoS("Record sync param", "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod)
|
||||
proxier.syncRunner = runner.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod)
|
||||
return proxier, nil
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user