mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 12:18:16 +00:00 
			
		
		
		
	Serve watch without resourceVersion from cache and introduce a WatchFromStorageWithoutResourceVersion feature gate to allow serving watch from storage.
This commit is contained in:
		@@ -1307,6 +1307,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	genericfeatures.WatchBookmark: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},
 | 
						genericfeatures.WatchBookmark: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						genericfeatures.WatchFromStorageWithoutResourceVersion: {Default: false, PreRelease: featuregate.Beta},
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	genericfeatures.WatchList: {Default: false, PreRelease: featuregate.Alpha},
 | 
						genericfeatures.WatchList: {Default: false, PreRelease: featuregate.Alpha},
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	genericfeatures.ZeroLimitedNominalConcurrencyShares: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.32
 | 
						genericfeatures.ZeroLimitedNominalConcurrencyShares: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.32
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -258,6 +258,12 @@ const (
 | 
				
			|||||||
	// Enables support for watch bookmark events.
 | 
						// Enables support for watch bookmark events.
 | 
				
			||||||
	WatchBookmark featuregate.Feature = "WatchBookmark"
 | 
						WatchBookmark featuregate.Feature = "WatchBookmark"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// owner: @serathius
 | 
				
			||||||
 | 
						// beta: 1.30
 | 
				
			||||||
 | 
						// Enables watches without resourceVersion to be served from storage.
 | 
				
			||||||
 | 
						// Used to prevent https://github.com/kubernetes/kubernetes/issues/123072 until etcd fixes the issue.
 | 
				
			||||||
 | 
						WatchFromStorageWithoutResourceVersion featuregate.Feature = "WatchFromStorageWithoutResourceVersion"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// owner: @vinaykul
 | 
						// owner: @vinaykul
 | 
				
			||||||
	// kep: http://kep.k8s.io/1287
 | 
						// kep: http://kep.k8s.io/1287
 | 
				
			||||||
	// alpha: v1.27
 | 
						// alpha: v1.27
 | 
				
			||||||
@@ -349,6 +355,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	WatchBookmark: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},
 | 
						WatchBookmark: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						WatchFromStorageWithoutResourceVersion: {Default: false, PreRelease: featuregate.Beta},
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	InPlacePodVerticalScaling: {Default: false, PreRelease: featuregate.Alpha},
 | 
						InPlacePodVerticalScaling: {Default: false, PreRelease: featuregate.Alpha},
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	WatchList: {Default: false, PreRelease: featuregate.Alpha},
 | 
						WatchList: {Default: false, PreRelease: featuregate.Alpha},
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -523,7 +523,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
 | 
				
			|||||||
		opts.SendInitialEvents = nil
 | 
							opts.SendInitialEvents = nil
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	// TODO: we should eventually get rid of this legacy case
 | 
						// TODO: we should eventually get rid of this legacy case
 | 
				
			||||||
	if opts.SendInitialEvents == nil && opts.ResourceVersion == "" {
 | 
						if utilfeature.DefaultFeatureGate.Enabled(features.WatchFromStorageWithoutResourceVersion) && opts.SendInitialEvents == nil && opts.ResourceVersion == "" {
 | 
				
			||||||
		return c.storage.Watch(ctx, key, opts)
 | 
							return c.storage.Watch(ctx, key, opts)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	requestedWatchRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
 | 
						requestedWatchRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
 | 
				
			||||||
@@ -1282,12 +1282,14 @@ func (c *Cacher) getBookmarkAfterResourceVersionLockedFunc(parsedResourceVersion
 | 
				
			|||||||
//
 | 
					//
 | 
				
			||||||
//	if SendInitiaEvents != nil => ResourceVersionMatch = NotOlderThan
 | 
					//	if SendInitiaEvents != nil => ResourceVersionMatch = NotOlderThan
 | 
				
			||||||
//	if ResourceVersionmatch != nil => ResourceVersionMatch = NotOlderThan & SendInitialEvents != nil
 | 
					//	if ResourceVersionmatch != nil => ResourceVersionMatch = NotOlderThan & SendInitialEvents != nil
 | 
				
			||||||
//
 | 
					 | 
				
			||||||
// to satisfy the legacy case (SendInitialEvents = true, RV="") we skip checking opts.Predicate.AllowWatchBookmarks
 | 
					 | 
				
			||||||
func (c *Cacher) getWatchCacheResourceVersion(ctx context.Context, parsedWatchResourceVersion uint64, opts storage.ListOptions) (uint64, error) {
 | 
					func (c *Cacher) getWatchCacheResourceVersion(ctx context.Context, parsedWatchResourceVersion uint64, opts storage.ListOptions) (uint64, error) {
 | 
				
			||||||
	if len(opts.ResourceVersion) != 0 {
 | 
						if len(opts.ResourceVersion) != 0 {
 | 
				
			||||||
		return parsedWatchResourceVersion, nil
 | 
							return parsedWatchResourceVersion, nil
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						// legacy case
 | 
				
			||||||
 | 
						if !utilfeature.DefaultFeatureGate.Enabled(features.WatchFromStorageWithoutResourceVersion) && opts.SendInitialEvents == nil && opts.ResourceVersion == "" {
 | 
				
			||||||
 | 
							return 0, nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	rv, err := storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String())
 | 
						rv, err := storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String())
 | 
				
			||||||
	return rv, err
 | 
						return rv, err
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -381,9 +381,18 @@ func TestSendInitialEventsBackwardCompatibility(t *testing.T) {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestWatchSemantics(t *testing.T) {
 | 
					func TestWatchSemantics(t *testing.T) {
 | 
				
			||||||
 | 
						t.Run("WatchFromStorageWithoutResourceVersion=true", func(t *testing.T) {
 | 
				
			||||||
 | 
							defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchFromStorageWithoutResourceVersion, true)()
 | 
				
			||||||
		store, terminate := testSetupWithEtcdAndCreateWrapper(t)
 | 
							store, terminate := testSetupWithEtcdAndCreateWrapper(t)
 | 
				
			||||||
		t.Cleanup(terminate)
 | 
							t.Cleanup(terminate)
 | 
				
			||||||
		storagetesting.RunWatchSemantics(context.TODO(), t, store)
 | 
							storagetesting.RunWatchSemantics(context.TODO(), t, store)
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
						t.Run("WatchFromStorageWithoutResourceVersion=false", func(t *testing.T) {
 | 
				
			||||||
 | 
							defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchFromStorageWithoutResourceVersion, false)()
 | 
				
			||||||
 | 
							store, terminate := testSetupWithEtcdAndCreateWrapper(t)
 | 
				
			||||||
 | 
							t.Cleanup(terminate)
 | 
				
			||||||
 | 
							storagetesting.RunWatchSemantics(context.TODO(), t, store)
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestWatchSemanticInitialEventsExtended(t *testing.T) {
 | 
					func TestWatchSemanticInitialEventsExtended(t *testing.T) {
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -338,8 +338,6 @@ func TestWatchCacheBypass(t *testing.T) {
 | 
				
			|||||||
		t.Fatalf("unexpected error waiting for the cache to be ready")
 | 
							t.Fatalf("unexpected error waiting for the cache to be ready")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Inject error to underlying layer and check if cacher is not bypassed.
 | 
					 | 
				
			||||||
	backingStorage.injectError(errDummy)
 | 
					 | 
				
			||||||
	_, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{
 | 
						_, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{
 | 
				
			||||||
		ResourceVersion: "0",
 | 
							ResourceVersion: "0",
 | 
				
			||||||
		Predicate:       storage.Everything,
 | 
							Predicate:       storage.Everything,
 | 
				
			||||||
@@ -348,12 +346,32 @@ func TestWatchCacheBypass(t *testing.T) {
 | 
				
			|||||||
		t.Errorf("Watch with RV=0 should be served from cache: %v", err)
 | 
							t.Errorf("Watch with RV=0 should be served from cache: %v", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// With unset RV, check if cacher is bypassed.
 | 
					 | 
				
			||||||
	_, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{
 | 
						_, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{
 | 
				
			||||||
		ResourceVersion: "",
 | 
							ResourceVersion: "",
 | 
				
			||||||
 | 
							Predicate:       storage.Everything,
 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
	if err != errDummy {
 | 
						if err != nil {
 | 
				
			||||||
		t.Errorf("Watch with unset RV should bypass cacher: %v", err)
 | 
							t.Errorf("Watch with RV=0 should be served from cache: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchFromStorageWithoutResourceVersion, false)()
 | 
				
			||||||
 | 
						_, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{
 | 
				
			||||||
 | 
							ResourceVersion: "",
 | 
				
			||||||
 | 
							Predicate:       storage.Everything,
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Errorf("With WatchFromStorageWithoutResourceVersion disabled, watch with unset RV should be served from cache: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Inject error to underlying layer and check if cacher is not bypassed.
 | 
				
			||||||
 | 
						backingStorage.injectError(errDummy)
 | 
				
			||||||
 | 
						defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchFromStorageWithoutResourceVersion, true)()
 | 
				
			||||||
 | 
						_, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{
 | 
				
			||||||
 | 
							ResourceVersion: "",
 | 
				
			||||||
 | 
							Predicate:       storage.Everything,
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
						if !errors.Is(err, errDummy) {
 | 
				
			||||||
 | 
							t.Errorf("With WatchFromStorageWithoutResourceVersion enabled, watch with unset RV should be served from storage: %v", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -2034,7 +2052,9 @@ func TestGetWatchCacheResourceVersion(t *testing.T) {
 | 
				
			|||||||
		{
 | 
							{
 | 
				
			||||||
			name: "RV=unset, allowWatchBookmarks=true, sendInitialEvents=nil",
 | 
								name: "RV=unset, allowWatchBookmarks=true, sendInitialEvents=nil",
 | 
				
			||||||
			opts: listOptions(true, nil, ""),
 | 
								opts: listOptions(true, nil, ""),
 | 
				
			||||||
			expectedWatchResourceVersion: 100,
 | 
								// Expecting RV 0, due to https://github.com/kubernetes/kubernetes/pull/123935 reverted to serving those requests from watch cache.
 | 
				
			||||||
 | 
								// Set to 100, when WatchFromStorageWithoutResourceVersion is set to true.
 | 
				
			||||||
 | 
								expectedWatchResourceVersion: 0,
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
			name:                         "RV=unset, allowWatchBookmarks=true, sendInitialEvents=true",
 | 
								name:                         "RV=unset, allowWatchBookmarks=true, sendInitialEvents=true",
 | 
				
			||||||
@@ -2049,7 +2069,9 @@ func TestGetWatchCacheResourceVersion(t *testing.T) {
 | 
				
			|||||||
		{
 | 
							{
 | 
				
			||||||
			name: "RV=unset, allowWatchBookmarks=false, sendInitialEvents=nil",
 | 
								name: "RV=unset, allowWatchBookmarks=false, sendInitialEvents=nil",
 | 
				
			||||||
			opts: listOptions(false, nil, ""),
 | 
								opts: listOptions(false, nil, ""),
 | 
				
			||||||
			expectedWatchResourceVersion: 100,
 | 
								// Expecting RV 0, due to https://github.com/kubernetes/kubernetes/pull/123935 reverted to serving those requests from watch cache.
 | 
				
			||||||
 | 
								// Set to 100, when WatchFromStorageWithoutResourceVersion is set to true.
 | 
				
			||||||
 | 
								expectedWatchResourceVersion: 0,
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
			name:                         "RV=unset, allowWatchBookmarks=false, sendInitialEvents=true, legacy",
 | 
								name:                         "RV=unset, allowWatchBookmarks=false, sendInitialEvents=true, legacy",
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user