From f87e4a19c88fa908eb176ee7925f211bafba9b45 Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Fri, 20 Sep 2024 08:47:49 +0200 Subject: [PATCH] storage/cacher/cache_watcher: add RV to watchCacheInterval --- .../pkg/storage/cacher/watch_cache.go | 2 +- .../storage/cacher/watch_cache_interval.go | 24 ++++++++++++------- .../cacher/watch_cache_interval_test.go | 3 ++- 3 files changed, 18 insertions(+), 11 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go index 04a2617d640..63599f9aab7 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go @@ -746,7 +746,7 @@ func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64, key string, indexerFunc := func(i int) *watchCacheEvent { return w.cache[i%w.capacity] } - ci := newCacheInterval(w.startIndex+first, w.endIndex, indexerFunc, w.indexValidator, w.RWMutex.RLocker()) + ci := newCacheInterval(w.startIndex+first, w.endIndex, indexerFunc, w.indexValidator, resourceVersion, w.RWMutex.RLocker()) return ci, nil } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval.go index babd74e0c82..2522854d5c2 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval.go @@ -91,6 +91,10 @@ type watchCacheInterval struct { // lock on each invocation of Next(). buffer *watchCacheIntervalBuffer + // resourceVersion is the resourceVersion from which + // the interval was constructed. + resourceVersion uint64 + // lock effectively protects access to the underlying source // of events through - indexer and indexValidator. // @@ -103,14 +107,15 @@ type attrFunc func(runtime.Object) (labels.Set, fields.Set, error) type indexerFunc func(int) *watchCacheEvent type indexValidator func(int) bool -func newCacheInterval(startIndex, endIndex int, indexer indexerFunc, indexValidator indexValidator, locker sync.Locker) *watchCacheInterval { +func newCacheInterval(startIndex, endIndex int, indexer indexerFunc, indexValidator indexValidator, resourceVersion uint64, locker sync.Locker) *watchCacheInterval { return &watchCacheInterval{ - startIndex: startIndex, - endIndex: endIndex, - indexer: indexer, - indexValidator: indexValidator, - buffer: &watchCacheIntervalBuffer{buffer: make([]*watchCacheEvent, bufferSize)}, - lock: locker, + startIndex: startIndex, + endIndex: endIndex, + indexer: indexer, + indexValidator: indexValidator, + buffer: &watchCacheIntervalBuffer{buffer: make([]*watchCacheEvent, bufferSize)}, + resourceVersion: resourceVersion, + lock: locker, } } @@ -172,8 +177,9 @@ func newCacheIntervalFromStore(resourceVersion uint64, store storeIndexer, getAt ci := &watchCacheInterval{ startIndex: 0, // Simulate that we already have all the events we're looking for. - endIndex: 0, - buffer: buffer, + endIndex: 0, + buffer: buffer, + resourceVersion: resourceVersion, } return ci, nil diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval_test.go index 65dbf033fdf..487a5ac1e8d 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval_test.go @@ -41,7 +41,7 @@ func intervalFromEvents(events []*watchCacheEvent) *watchCacheInterval { } indexValidator := func(_ int) bool { return true } - return newCacheInterval(startIndex, endIndex, indexer, indexValidator, locker) + return newCacheInterval(startIndex, endIndex, indexer, indexValidator, 0, locker) } func bufferFromEvents(events []*watchCacheEvent) *watchCacheIntervalBuffer { @@ -300,6 +300,7 @@ func TestCacheIntervalNextFromWatchCache(t *testing.T) { wc.endIndex, indexerFunc, wc.isIndexValidLocked, + wc.resourceVersion, &wc.RWMutex, )