Mirror of https://github.com/optim-enterprises-bv/kubernetes.git, synced 2025-10-31 18:28:13 +00:00
	Move watch progress to separate package.
@@ -43,6 +43,7 @@ import (
 	"k8s.io/apiserver/pkg/features"
 	"k8s.io/apiserver/pkg/storage"
 	"k8s.io/apiserver/pkg/storage/cacher/metrics"
+	"k8s.io/apiserver/pkg/storage/cacher/progress"
 	etcdfeature "k8s.io/apiserver/pkg/storage/feature"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	"k8s.io/client-go/tools/cache"
@@ -420,7 +421,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
 		return nil, fmt.Errorf("config.EventsHistoryWindow (%v) must not be below %v", eventFreshDuration, DefaultEventFreshDuration)
 	}
 
-	progressRequester := newConditionalProgressRequester(config.Storage.RequestWatchProgress, config.Clock, contextMetadata)
+	progressRequester := progress.NewConditionalProgressRequester(config.Storage.RequestWatchProgress, config.Clock, contextMetadata)
 	watchCache := newWatchCache(
 		config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers,
 		config.Clock, eventFreshDuration, config.GroupResource, progressRequester)
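
The hunks above wire the cacher to the new exported constructor: the progress import is added and newConditionalProgressRequester becomes progress.NewConditionalProgressRequester. Below is a minimal sketch of that wiring from outside the package, assuming WatchProgressRequester is a func(ctx context.Context) error and that clock.RealClock from k8s.io/utils/clock satisfies the TickerFactory interface; the no-op progress function and the "source": "cache" metadata are stand-ins for config.Storage.RequestWatchProgress and the cacher's real contextMetadata.

package main

import (
	"context"

	"google.golang.org/grpc/metadata"
	"k8s.io/apiserver/pkg/storage/cacher/progress"
	"k8s.io/utils/clock"
)

func main() {
	// Stand-in for config.Storage.RequestWatchProgress; the storage layer
	// normally supplies this.
	requestProgress := func(ctx context.Context) error { return nil }

	// Optional metadata tagging progress requests that originate from the
	// watch cache (key/value mirror the test in this commit).
	contextMetadata := metadata.New(map[string]string{"source": "cache"})

	pr := progress.NewConditionalProgressRequester(requestProgress, clock.RealClock{}, contextMetadata)

	// In the cacher this goroutine lives for the lifetime of the cache.
	stopCh := make(chan struct{})
	defer close(stopCh)
	go pr.Run(stopCh)
}
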
@@ -2815,13 +2815,6 @@ func TestWatchStreamSeparation(t *testing.T) {
 			expectBookmarkOnEtcd:       true,
 			expectBookmarkOnWatchCache: true,
 		},
-		{
-			name:                         "common RPC & watch cache context > both get bookmarks",
-			separateCacheWatchRPC:        false,
-			useWatchCacheContextMetadata: true,
-			expectBookmarkOnEtcd:         true,
-			expectBookmarkOnWatchCache:   true,
-		},
 		{
 			name:                       "separate RPC > only etcd gets bookmarks",
 			separateCacheWatchRPC:      true,
@@ -2877,7 +2870,7 @@ func TestWatchStreamSeparation(t *testing.T) {
 
 			var contextMetadata metadata.MD
 			if tc.useWatchCacheContextMetadata {
-				contextMetadata = cacher.watchCache.waitingUntilFresh.contextMetadata
+				contextMetadata = metadata.New(map[string]string{"source": "cache"})
 			}
 			// For the first 100ms from watch creation, watch progress requests are ignored.
 			time.Sleep(200 * time.Millisecond)
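
With the requester moved out of the package, the test can no longer read the unexported contextMetadata field off cacher.watchCache.waitingUntilFresh, so it builds equivalent gRPC metadata with metadata.New. A small sketch of how that metadata rides on an outgoing context, which is what lets a client-side interceptor tell cache-originated progress RPCs apart; only standard google.golang.org/grpc/metadata calls are used.

package main

import (
	"context"
	"fmt"

	"google.golang.org/grpc/metadata"
)

func main() {
	// Tag the context the same way the watch cache tags its progress requests.
	md := metadata.New(map[string]string{"source": "cache"})
	ctx := metadata.NewOutgoingContext(context.Background(), md)

	// Anything inspecting the outgoing context (e.g. a client interceptor)
	// can recover the tag and route or count the RPC accordingly.
	if got, ok := metadata.FromOutgoingContext(ctx); ok {
		fmt.Println(got.Get("source")) // [cache]
	}
}
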
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package cacher
+package progress
 
 import (
 	"context"
@@ -36,8 +36,8 @@ const (
 	progressRequestPeriod = 100 * time.Millisecond
 )
 
-func newConditionalProgressRequester(requestWatchProgress WatchProgressRequester, clock TickerFactory, contextMetadata metadata.MD) *conditionalProgressRequester {
-	pr := &conditionalProgressRequester{
+func NewConditionalProgressRequester(requestWatchProgress WatchProgressRequester, clock TickerFactory, contextMetadata metadata.MD) *ConditionalProgressRequester {
+	pr := &ConditionalProgressRequester{
 		clock:                clock,
 		requestWatchProgress: requestWatchProgress,
 		contextMetadata:      contextMetadata,
@@ -52,9 +52,9 @@ type TickerFactory interface {
 	NewTimer(time.Duration) clock.Timer
 }
 
-// conditionalProgressRequester will request progress notification if there
+// ConditionalProgressRequester will request progress notification if there
 // is a request waiting for watch cache to be fresh.
-type conditionalProgressRequester struct {
+type ConditionalProgressRequester struct {
 	clock                TickerFactory
 	requestWatchProgress WatchProgressRequester
 	contextMetadata      metadata.MD
@@ -65,7 +65,7 @@ type conditionalProgressRequester struct {
 	stopped bool
 }
 
-func (pr *conditionalProgressRequester) Run(stopCh <-chan struct{}) {
+func (pr *ConditionalProgressRequester) Run(stopCh <-chan struct{}) {
 	ctx := wait.ContextForChannel(stopCh)
 	if pr.contextMetadata != nil {
 		ctx = metadata.NewOutgoingContext(ctx, pr.contextMetadata)
@@ -115,14 +115,14 @@ func (pr *conditionalProgressRequester) Run(stopCh <-chan struct{}) {
 	}
 }
 
-func (pr *conditionalProgressRequester) Add() {
+func (pr *ConditionalProgressRequester) Add() {
 	pr.mux.Lock()
 	defer pr.mux.Unlock()
 	pr.waiting += 1
 	pr.cond.Signal()
 }
 
-func (pr *conditionalProgressRequester) Remove() {
+func (pr *ConditionalProgressRequester) Remove() {
 	pr.mux.Lock()
 	defer pr.mux.Unlock()
 	pr.waiting -= 1
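
The exported surface stays small: NewConditionalProgressRequester, Run, and the Add/Remove pair that tracks how many readers are currently waiting for the cache to become fresh; progress is only requested while that count is positive. A sketch of the waiter pattern an external caller might follow, where waitFn is a hypothetical stand-in for the watch cache's own wait-until-fresh logic.

package example

import (
	"context"

	"k8s.io/apiserver/pkg/storage/cacher/progress"
)

// waitForFreshCache brackets a reader's wait with Add/Remove so that progress
// notifications are only requested while someone is actually waiting. waitFn is
// a hypothetical stand-in for the watch cache's wait-until-fresh logic.
func waitForFreshCache(ctx context.Context, pr *progress.ConditionalProgressRequester, waitFn func(context.Context) error) error {
	pr.Add()          // register as a waiter; the Run loop starts requesting progress
	defer pr.Remove() // deregister once served (or on error)
	return waitFn(ctx)
}
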
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package cacher
+package progress
 
 import (
 	"context"
@@ -115,12 +115,12 @@ func TestConditionalProgressRequester(t *testing.T) {
 
 func newTestConditionalProgressRequester(clock clock.WithTicker) *testConditionalProgressRequester {
 	pr := &testConditionalProgressRequester{}
-	pr.conditionalProgressRequester = newConditionalProgressRequester(pr.RequestWatchProgress, clock, nil)
+	pr.ConditionalProgressRequester = NewConditionalProgressRequester(pr.RequestWatchProgress, clock, nil)
 	return pr
 }
 
 type testConditionalProgressRequester struct {
-	*conditionalProgressRequester
+	*ConditionalProgressRequester
 	progressRequestsSentCount atomic.Int32
 }
 
@@ -33,6 +33,7 @@ import (
 	"k8s.io/apiserver/pkg/features"
 	"k8s.io/apiserver/pkg/storage"
 	"k8s.io/apiserver/pkg/storage/cacher/metrics"
+	"k8s.io/apiserver/pkg/storage/cacher/progress"
 	etcdfeature "k8s.io/apiserver/pkg/storage/feature"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	"k8s.io/client-go/tools/cache"
@@ -150,7 +151,7 @@ type watchCache struct {
 
 	// Requests progress notification if there are requests waiting for watch
 	// to be fresh
-	waitingUntilFresh *conditionalProgressRequester
+	waitingUntilFresh *progress.ConditionalProgressRequester
 
 	// Stores previous snapshots of orderedLister to allow serving requests from previous revisions.
 	snapshots *storeSnapshotter
@@ -165,7 +166,7 @@ func newWatchCache(
 	clock clock.WithTicker,
 	eventFreshDuration time.Duration,
 	groupResource schema.GroupResource,
-	progressRequester *conditionalProgressRequester) *watchCache {
+	progressRequester *progress.ConditionalProgressRequester) *watchCache {
 	wc := &watchCache{
 		capacity:            defaultLowerBoundCapacity,
 		keyFunc:             keyFunc,
@@ -40,6 +40,7 @@ import (
 	"k8s.io/apiserver/pkg/features"
 	"k8s.io/apiserver/pkg/storage"
 	"k8s.io/apiserver/pkg/storage/cacher/metrics"
+	"k8s.io/apiserver/pkg/storage/cacher/progress"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	"k8s.io/client-go/tools/cache"
 	featuregatetesting "k8s.io/component-base/featuregate/testing"
@@ -128,7 +129,7 @@ func newTestWatchCache(capacity int, eventFreshDuration time.Duration, indexers
 	wc := &testWatchCache{}
 	wc.bookmarkRevision = make(chan int64, 1)
 	wc.stopCh = make(chan struct{})
-	pr := newConditionalProgressRequester(wc.RequestWatchProgress, &immediateTickerFactory{}, nil)
+	pr := progress.NewConditionalProgressRequester(wc.RequestWatchProgress, &immediateTickerFactory{}, nil)
 	go pr.Run(wc.stopCh)
 	wc.watchCache = newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, testingclock.NewFakeClock(time.Now()), eventFreshDuration, schema.GroupResource{Resource: "pods"}, pr)
 	// To preserve behavior of tests that assume a given capacity,
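
The test above hands the constructor an &immediateTickerFactory{} whose definition is not part of this diff; judging by the visible context, any value with a NewTimer(time.Duration) clock.Timer method satisfies progress.TickerFactory. Below is a guess at what such a test-only factory could look like, so timers fire without waiting out the 100ms progressRequestPeriod; the real implementation may differ.

package example

import (
	"time"

	"k8s.io/apiserver/pkg/storage/cacher/progress"
	"k8s.io/utils/clock"
)

// immediateTickerFactory is a hypothetical test helper: its timers expire right
// away so the requester does not sit out the real progressRequestPeriod.
type immediateTickerFactory struct{}

// Compile-time check that the factory satisfies the exported interface
// (assumes NewTimer is its only method, as the diff context suggests).
var _ progress.TickerFactory = &immediateTickerFactory{}

func (f *immediateTickerFactory) NewTimer(time.Duration) clock.Timer {
	// Ignore the requested duration and return a real timer that fires immediately.
	return clock.RealClock{}.NewTimer(0)
}
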
Author: Marek Siarkowicz