diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index b49e4e0cd1e..88fcd656cd2 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -106,9 +106,9 @@ func TestWatchInitializationSignal(t *testing.T) { func TestProgressNotify(t *testing.T) { clusterConfig := testserver.NewTestConfig(t) clusterConfig.WatchProgressNotifyInterval = time.Second - ctx, store, _ := testSetup(t, withClientConfig(clusterConfig)) + ctx, store, client := testSetup(t, withClientConfig(clusterConfig)) - storagetesting.RunOptionalTestProgressNotify(ctx, t, store) + storagetesting.RunOptionalTestProgressNotify(ctx, t, store, increaseRV(client.Client)) } func TestWatchWithUnsafeDelete(t *testing.T) { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go index e1004d60ac2..567e2475571 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go @@ -585,7 +585,7 @@ func RunTestWatchInitializationSignal(ctx context.Context, t *testing.T, store s // Given this feature is currently not explicitly used by higher layers of Kubernetes // (it rather is used by wrappers of storage.Interface to implement its functionalities) // this test is currently considered optional. -func RunOptionalTestProgressNotify(ctx context.Context, t *testing.T, store storage.Interface) { +func RunOptionalTestProgressNotify(ctx context.Context, t *testing.T, store storage.Interface, increaseRV IncreaseRVFunc) { input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name", Namespace: "test-ns"}} key := computePodKey(input) out := &example.Pod{} @@ -593,6 +593,15 @@ func RunOptionalTestProgressNotify(ctx context.Context, t *testing.T, store stor t.Fatalf("Create failed: %v", err) } validateResourceVersion := resourceVersionNotOlderThan(out.ResourceVersion) + // Since etcd v3.6.2 we need to increase RV due to https://github.com/etcd-io/etcd/pull/20241. + // We must advance the resource version to ensure that etcd revision progresses past the watch we establish. + // As etcd does not send progress notifications for watches on future revisions. + // + // A Kubernetes watch is exclusive (first event received is after a given RV), which translates + // to an inclusive etcd watch at revision+1. Without this increment, if no other writes + // have occurred, the watch would be on a future revision, preventing progress + // notifications. + increaseRV(ctx, t) opts := storage.ListOptions{ ResourceVersion: out.ResourceVersion,