storage/cacher/cache_watcher: add RV to watchCacheInterval

This commit is contained in:
Lukasz Szaszkiewicz
2024-09-20 08:47:49 +02:00
parent 767d28dcf0
commit f87e4a19c8
3 changed files with 18 additions and 11 deletions

View File

@@ -746,7 +746,7 @@ func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64, key string,
indexerFunc := func(i int) *watchCacheEvent { indexerFunc := func(i int) *watchCacheEvent {
return w.cache[i%w.capacity] return w.cache[i%w.capacity]
} }
ci := newCacheInterval(w.startIndex+first, w.endIndex, indexerFunc, w.indexValidator, w.RWMutex.RLocker()) ci := newCacheInterval(w.startIndex+first, w.endIndex, indexerFunc, w.indexValidator, resourceVersion, w.RWMutex.RLocker())
return ci, nil return ci, nil
} }

View File

@@ -91,6 +91,10 @@ type watchCacheInterval struct {
// lock on each invocation of Next(). // lock on each invocation of Next().
buffer *watchCacheIntervalBuffer buffer *watchCacheIntervalBuffer
// resourceVersion is the resourceVersion from which
// the interval was constructed.
resourceVersion uint64
// lock effectively protects access to the underlying source // lock effectively protects access to the underlying source
// of events through - indexer and indexValidator. // of events through - indexer and indexValidator.
// //
@@ -103,14 +107,15 @@ type attrFunc func(runtime.Object) (labels.Set, fields.Set, error)
type indexerFunc func(int) *watchCacheEvent type indexerFunc func(int) *watchCacheEvent
type indexValidator func(int) bool type indexValidator func(int) bool
func newCacheInterval(startIndex, endIndex int, indexer indexerFunc, indexValidator indexValidator, locker sync.Locker) *watchCacheInterval { func newCacheInterval(startIndex, endIndex int, indexer indexerFunc, indexValidator indexValidator, resourceVersion uint64, locker sync.Locker) *watchCacheInterval {
return &watchCacheInterval{ return &watchCacheInterval{
startIndex: startIndex, startIndex: startIndex,
endIndex: endIndex, endIndex: endIndex,
indexer: indexer, indexer: indexer,
indexValidator: indexValidator, indexValidator: indexValidator,
buffer: &watchCacheIntervalBuffer{buffer: make([]*watchCacheEvent, bufferSize)}, buffer: &watchCacheIntervalBuffer{buffer: make([]*watchCacheEvent, bufferSize)},
lock: locker, resourceVersion: resourceVersion,
lock: locker,
} }
} }
@@ -172,8 +177,9 @@ func newCacheIntervalFromStore(resourceVersion uint64, store storeIndexer, getAt
ci := &watchCacheInterval{ ci := &watchCacheInterval{
startIndex: 0, startIndex: 0,
// Simulate that we already have all the events we're looking for. // Simulate that we already have all the events we're looking for.
endIndex: 0, endIndex: 0,
buffer: buffer, buffer: buffer,
resourceVersion: resourceVersion,
} }
return ci, nil return ci, nil

View File

@@ -41,7 +41,7 @@ func intervalFromEvents(events []*watchCacheEvent) *watchCacheInterval {
} }
indexValidator := func(_ int) bool { return true } indexValidator := func(_ int) bool { return true }
return newCacheInterval(startIndex, endIndex, indexer, indexValidator, locker) return newCacheInterval(startIndex, endIndex, indexer, indexValidator, 0, locker)
} }
func bufferFromEvents(events []*watchCacheEvent) *watchCacheIntervalBuffer { func bufferFromEvents(events []*watchCacheEvent) *watchCacheIntervalBuffer {
@@ -300,6 +300,7 @@ func TestCacheIntervalNextFromWatchCache(t *testing.T) {
wc.endIndex, wc.endIndex,
indexerFunc, indexerFunc,
wc.isIndexValidLocked, wc.isIndexValidLocked,
wc.resourceVersion,
&wc.RWMutex, &wc.RWMutex,
) )