From c16ee887efc30e956e23331cbdc8e48b4530ca54 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Mon, 5 May 2025 14:26:28 -0400 Subject: [PATCH] Remove burst syncs from BoundedFrequencyRunner Burst syncs are theoretically useful for dealing with a single change that results in multiple Run() calls (eg, a Service and EndpointSlice both changing), but 2 isn't enough to cover all cases, and a better way of dealing with this problem is to just use a smaller minSyncPeriod. Co-authored-by: Antonio Ojea --- pkg/proxy/iptables/proxier.go | 5 +- pkg/proxy/iptables/proxier_test.go | 2 +- pkg/proxy/ipvs/proxier.go | 5 +- pkg/proxy/ipvs/proxier_test.go | 2 +- pkg/proxy/nftables/proxier.go | 5 +- pkg/proxy/nftables/proxier_test.go | 2 +- pkg/proxy/runner/bounded_frequency_runner.go | 11 ++- .../runner/bounded_frequency_runner_test.go | 90 +------------------ pkg/proxy/winkernel/proxier.go | 5 +- 9 files changed, 19 insertions(+), 108 deletions(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 600ab9b62bf..aa663139bd7 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -307,12 +307,11 @@ func NewProxier(ctx context.Context, }, } - burstSyncs := 2 - logger.V(2).Info("Iptables sync params", "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs) + logger.V(2).Info("Iptables sync params", "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod) // We pass syncPeriod to ipt.Monitor, which will call us only if it needs to. // We need to pass *some* maxInterval to NewBoundedFrequencyRunner anyway though. // time.Hour is arbitrary. - proxier.syncRunner = runner.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, proxyutil.FullSyncPeriod, burstSyncs) + proxier.syncRunner = runner.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, proxyutil.FullSyncPeriod) go ipt.Monitor(kubeProxyCanaryChain, []utiliptables.Table{utiliptables.TableMangle, utiliptables.TableNAT, utiliptables.TableFilter}, proxier.forceSyncProxyRules, syncPeriod, wait.NeverStop) diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 430d7c34a77..5d1f41423d6 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -144,7 +144,7 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier { }, } p.setInitialized(true) - p.syncRunner = runner.NewBoundedFrequencyRunner("test-sync-runner", p.syncProxyRules, 0, time.Minute, 1) + p.syncRunner = runner.NewBoundedFrequencyRunner("test-sync-runner", p.syncProxyRules, 0, time.Minute) return p } diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 7995730d320..eb342082a7f 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -400,9 +400,8 @@ func NewProxier( for _, is := range ipsetInfo { proxier.ipsetList[is.name] = NewIPSet(ipset, is.name, is.setType, (ipFamily == v1.IPv6Protocol), is.comment) } - burstSyncs := 2 - logger.V(2).Info("ipvs sync params", "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs) - proxier.syncRunner = runner.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs) + logger.V(2).Info("ipvs sync params", "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod) + proxier.syncRunner = runner.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod) proxier.gracefuldeleteManager.Run() return proxier, nil } diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index 5befa4d2476..df5f6c3620e 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -167,7 +167,7 @@ func NewFakeProxier(ctx context.Context, ipt utiliptables.Interface, ipvs utilip ipFamily: ipFamily, } p.setInitialized(true) - p.syncRunner = runner.NewBoundedFrequencyRunner("test-sync-runner", p.syncProxyRules, 0, time.Minute, 1) + p.syncRunner = runner.NewBoundedFrequencyRunner("test-sync-runner", p.syncProxyRules, 0, time.Minute) return p } diff --git a/pkg/proxy/nftables/proxier.go b/pkg/proxy/nftables/proxier.go index 6e2ce149fb3..7947405672b 100644 --- a/pkg/proxy/nftables/proxier.go +++ b/pkg/proxy/nftables/proxier.go @@ -273,10 +273,9 @@ func NewProxier(ctx context.Context, serviceNodePorts: newNFTElementStorage("map", serviceNodePortsMap), } - burstSyncs := 2 - logger.V(2).Info("NFTables sync params", "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs) + logger.V(2).Info("NFTables sync params", "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod) // We need to pass *some* maxInterval to NewBoundedFrequencyRunner. time.Hour is arbitrary. - proxier.syncRunner = runner.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, proxyutil.FullSyncPeriod, burstSyncs) + proxier.syncRunner = runner.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, proxyutil.FullSyncPeriod) return proxier, nil } diff --git a/pkg/proxy/nftables/proxier_test.go b/pkg/proxy/nftables/proxier_test.go index cc19185639e..ecd0fd9b45c 100644 --- a/pkg/proxy/nftables/proxier_test.go +++ b/pkg/proxy/nftables/proxier_test.go @@ -142,7 +142,7 @@ func NewFakeProxier(ipFamily v1.IPFamily) (*knftables.Fake, *Proxier) { serviceNodePorts: newNFTElementStorage("map", serviceNodePortsMap), } p.setInitialized(true) - p.syncRunner = runner.NewBoundedFrequencyRunner("test-sync-runner", p.syncProxyRules, 0, time.Minute, 1) + p.syncRunner = runner.NewBoundedFrequencyRunner("test-sync-runner", p.syncProxyRules, 0, time.Minute) return nft, p } diff --git a/pkg/proxy/runner/bounded_frequency_runner.go b/pkg/proxy/runner/bounded_frequency_runner.go index 3ad3255e4b0..6bfbb3e5f7b 100644 --- a/pkg/proxy/runner/bounded_frequency_runner.go +++ b/pkg/proxy/runner/bounded_frequency_runner.go @@ -29,7 +29,7 @@ import ( // BoundedFrequencyRunner manages runs of a user-provided work function. type BoundedFrequencyRunner struct { name string // the name of this instance - minInterval time.Duration // the min time between runs, modulo bursts + minInterval time.Duration // the min time between runs maxInterval time.Duration // the max time between runs run chan struct{} // try an async run @@ -140,14 +140,14 @@ var _ timer = &realTimer{} // // `maxInterval` must be greater than or equal to `minInterval`; otherwise, // this function will panic. -func NewBoundedFrequencyRunner(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int) *BoundedFrequencyRunner { +func NewBoundedFrequencyRunner(name string, fn func(), minInterval, maxInterval time.Duration) *BoundedFrequencyRunner { timer := &realTimer{timer: time.NewTimer(0)} // will tick immediately <-timer.C() // consume the first tick - return construct(name, fn, minInterval, maxInterval, burstRuns, timer) + return construct(name, fn, minInterval, maxInterval, timer) } // Make an instance with dependencies injected. -func construct(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int, timer timer) *BoundedFrequencyRunner { +func construct(name string, fn func(), minInterval, maxInterval time.Duration, timer timer) *BoundedFrequencyRunner { if maxInterval < minInterval { panic(fmt.Sprintf("%s: maxInterval (%v) must be >= minInterval (%v)", name, maxInterval, minInterval)) } @@ -164,9 +164,8 @@ func construct(name string, fn func(), minInterval, maxInterval time.Duration, b if minInterval == 0 { bfr.limiter = nullLimiter{} } else { - // allow burst updates in short succession qps := float32(time.Second) / float32(minInterval) - bfr.limiter = flowcontrol.NewTokenBucketRateLimiterWithClock(qps, burstRuns, timer) + bfr.limiter = flowcontrol.NewTokenBucketRateLimiterWithClock(qps, 1, timer) } return bfr } diff --git a/pkg/proxy/runner/bounded_frequency_runner_test.go b/pkg/proxy/runner/bounded_frequency_runner_test.go index 3d912f15249..12f9ba08303 100644 --- a/pkg/proxy/runner/bounded_frequency_runner_test.go +++ b/pkg/proxy/runner/bounded_frequency_runner_test.go @@ -215,10 +215,10 @@ func waitForNothing(name string, t *testing.T, timer *fakeTimer, obj *receiver) checkReceiver(name, t, obj, false) } -func Test_BoundedFrequencyRunnerNoBurst(t *testing.T) { +func Test_BoundedFrequencyRunner(t *testing.T) { obj := &receiver{} timer := newFakeTimer() - runner := construct("test-runner", obj.F, minInterval, maxInterval, 1, timer) + runner := construct("test-runner", obj.F, minInterval, maxInterval, timer) stop := make(chan struct{}) var upd timerUpdate @@ -289,94 +289,10 @@ func Test_BoundedFrequencyRunnerNoBurst(t *testing.T) { <-timer.updated } -func Test_BoundedFrequencyRunnerBurst(t *testing.T) { - obj := &receiver{} - timer := newFakeTimer() - runner := construct("test-runner", obj.F, minInterval, maxInterval, 2, timer) - stop := make(chan struct{}) - - var upd timerUpdate - - // Start. - go runner.Loop(stop) - upd = <-timer.updated // wait for initial time to be set to max - checkTimer("init", t, upd, true, maxInterval) - checkReceiver("init", t, obj, false) - - // Run once, immediately. - // abs=0ms, rel=0ms - runner.Run() - waitForRun("first run", t, timer, obj) - - // Run again, before minInterval expires, with burst. - timer.advance(1 * time.Millisecond) // abs=1ms, rel=1ms - runner.Run() - waitForRun("second run", t, timer, obj) - - // Run again, before minInterval expires. - timer.advance(498 * time.Millisecond) // abs=499ms, rel=498ms - runner.Run() - waitForDefer("too soon after second", t, timer, obj, 502*time.Millisecond) - - // Run again, before minInterval expires. - timer.advance(1 * time.Millisecond) // abs=500ms, rel=499ms - runner.Run() - waitForDefer("too soon after second 2", t, timer, obj, 501*time.Millisecond) - - // Run again, before minInterval expires. - timer.advance(1 * time.Millisecond) // abs=501ms, rel=500ms - runner.Run() - waitForDefer("too soon after second 3", t, timer, obj, 500*time.Millisecond) - - // Advance timer enough to replenish bursts, but not enough to be minInterval - // after the last run - timer.advance(499 * time.Millisecond) // abs=1000ms, rel=999ms - waitForNothing("not minInterval", t, timer, obj) - runner.Run() - waitForRun("third run", t, timer, obj) - - // Run again, before minInterval expires. - timer.advance(1 * time.Millisecond) // abs=1001ms, rel=1ms - runner.Run() - waitForDefer("too soon after third", t, timer, obj, 999*time.Millisecond) - - // Run again, before minInterval expires. - timer.advance(998 * time.Millisecond) // abs=1999ms, rel=999ms - runner.Run() - waitForDefer("too soon after third 2", t, timer, obj, 1*time.Millisecond) - - // Advance and do the deferred run - timer.advance(1 * time.Millisecond) // abs=2000ms, rel=1000ms - waitForRun("fourth run", t, timer, obj) - - // Run again, once burst has fully replenished. - timer.advance(2 * time.Second) // abs=4000ms, rel=2000ms - runner.Run() - waitForRun("fifth run", t, timer, obj) - runner.Run() - waitForRun("sixth run", t, timer, obj) - runner.Run() - waitForDefer("too soon after sixth", t, timer, obj, 1*time.Second) - - // Wait until minInterval after the last run - timer.advance(1 * time.Second) // abs=5000ms, rel=1000ms - waitForRun("seventh run", t, timer, obj) - - // Wait for maxInterval - timer.advance(10 * time.Second) // abs=15000ms, rel=10000ms - waitForRun("maxInterval", t, timer, obj) - - // Clean up. - stop <- struct{}{} - // a message is sent to time.updated in func Stop() at the end of the child goroutine - // to terminate the child, a receive on time.updated is needed here - <-timer.updated -} - func Test_BoundedFrequencyRunnerRetryAfter(t *testing.T) { obj := &receiver{} timer := newFakeTimer() - runner := construct("test-runner", obj.F, minInterval, maxInterval, 1, timer) + runner := construct("test-runner", obj.F, minInterval, maxInterval, timer) stop := make(chan struct{}) var upd timerUpdate diff --git a/pkg/proxy/winkernel/proxier.go b/pkg/proxy/winkernel/proxier.go index 14880aed708..bff18030b32 100644 --- a/pkg/proxy/winkernel/proxier.go +++ b/pkg/proxy/winkernel/proxier.go @@ -753,9 +753,8 @@ func NewProxier( return nil, err } - burstSyncs := 2 - klog.V(3).InfoS("Record sync param", "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs) - proxier.syncRunner = runner.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs) + klog.V(3).InfoS("Record sync param", "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod) + proxier.syncRunner = runner.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod) return proxier, nil }