mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-10-31 18:28:13 +00:00 
			
		
		
		
	Separate compactWatchCache from increaseRV
This commit is contained in:
		 Marek Siarkowicz
					Marek Siarkowicz
				
			
				
					committed by
					
						 Marek Siarkowicz
						Marek Siarkowicz
					
				
			
			
				
	
			
			
			 Marek Siarkowicz
						Marek Siarkowicz
					
				
			
						parent
						
							c30b1eb09b
						
					
				
				
					commit
					15cb82b3b3
				
			| @@ -182,7 +182,7 @@ func TestList(t *testing.T) { | ||||
| 			featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, consistentRead) | ||||
| 			ctx, cacher, server, terminate := testSetupWithEtcdServer(t) | ||||
| 			t.Cleanup(terminate) | ||||
| 			storagetesting.RunTestList(ctx, t, cacher, compactStorage(cacher, server.V3Client.Client), true) | ||||
| 			storagetesting.RunTestList(ctx, t, cacher, increaseRV(server.V3Client.Client), true) | ||||
| 		}) | ||||
| 	} | ||||
| } | ||||
| @@ -192,7 +192,7 @@ func TestConsistentList(t *testing.T) { | ||||
| 			featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, consistentRead) | ||||
| 			ctx, cacher, server, terminate := testSetupWithEtcdServer(t) | ||||
| 			t.Cleanup(terminate) | ||||
| 			storagetesting.RunTestConsistentList(ctx, t, cacher, compactStorage(cacher, server.V3Client.Client), true, consistentRead) | ||||
| 			storagetesting.RunTestConsistentList(ctx, t, cacher, increaseRV(server.V3Client.Client), true, consistentRead) | ||||
| 		}) | ||||
| 	} | ||||
| } | ||||
| @@ -203,7 +203,7 @@ func TestGetListNonRecursive(t *testing.T) { | ||||
| 			featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, consistentRead) | ||||
| 			ctx, cacher, server, terminate := testSetupWithEtcdServer(t) | ||||
| 			t.Cleanup(terminate) | ||||
| 			storagetesting.RunTestGetListNonRecursive(ctx, t, compactStorage(cacher, server.V3Client.Client), cacher) | ||||
| 			storagetesting.RunTestGetListNonRecursive(ctx, t, increaseRV(server.V3Client.Client), cacher) | ||||
| 		}) | ||||
| 	} | ||||
| } | ||||
| @@ -297,7 +297,7 @@ func TestWatch(t *testing.T) { | ||||
| func TestWatchFromZero(t *testing.T) { | ||||
| 	ctx, cacher, server, terminate := testSetupWithEtcdServer(t) | ||||
| 	t.Cleanup(terminate) | ||||
| 	storagetesting.RunTestWatchFromZero(ctx, t, cacher, compactStorage(cacher, server.V3Client.Client)) | ||||
| 	storagetesting.RunTestWatchFromZero(ctx, t, cacher, compactWatchCache(cacher, server.V3Client.Client)) | ||||
| } | ||||
|  | ||||
| func TestDeleteTriggerWatch(t *testing.T) { | ||||
|   | ||||
| @@ -78,7 +78,7 @@ func computePodKey(obj *example.Pod) string { | ||||
| 	return fmt.Sprintf("/pods/%s/%s", obj.Namespace, obj.Name) | ||||
| } | ||||
|  | ||||
| func compactStorage(c *CacheDelegator, client *clientv3.Client) storagetesting.Compaction { | ||||
| func compactWatchCache(c *CacheDelegator, client *clientv3.Client) storagetesting.Compaction { | ||||
| 	return func(ctx context.Context, t *testing.T, resourceVersion string) { | ||||
| 		versioner := storage.APIObjectVersioner{} | ||||
| 		rv, err := versioner.ParseResourceVersion(resourceVersion) | ||||
| @@ -117,12 +117,16 @@ func compactStorage(c *CacheDelegator, client *clientv3.Client) storagetesting.C | ||||
| 			c.cacher.watchCache.startIndex++ | ||||
| 		} | ||||
| 		c.cacher.watchCache.listResourceVersion = rv | ||||
|  | ||||
| 		if _, err = client.KV.Put(ctx, "compact_rev_key", resourceVersion); err != nil { | ||||
| 			t.Fatalf("Could not update compact_rev_key: %v", err) | ||||
| 		} | ||||
| 		if _, err = client.Compact(ctx, int64(rv)); err != nil { | ||||
| 		if _, err := client.Compact(ctx, int64(rv)); err != nil { | ||||
| 			t.Fatalf("Could not compact: %v", err) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func increaseRV(client *clientv3.Client) storagetesting.IncreaseRVFunc { | ||||
| 	return func(ctx context.Context, t *testing.T) { | ||||
| 		if _, err := client.KV.Put(ctx, "increaseRV", "ok"); err != nil { | ||||
| 			t.Fatalf("Could not update increaseRV: %v", err) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -172,7 +172,7 @@ func TestListPaging(t *testing.T) { | ||||
|  | ||||
| func TestGetListNonRecursive(t *testing.T) { | ||||
| 	ctx, store, client := testSetup(t) | ||||
| 	storagetesting.RunTestGetListNonRecursive(ctx, t, compactStorage(client.Client), store) | ||||
| 	storagetesting.RunTestGetListNonRecursive(ctx, t, increaseRV(client.Client), store) | ||||
| } | ||||
|  | ||||
| func TestGetListRecursivePrefix(t *testing.T) { | ||||
| @@ -249,12 +249,12 @@ func TestTransformationFailure(t *testing.T) { | ||||
|  | ||||
| func TestList(t *testing.T) { | ||||
| 	ctx, store, client := testSetup(t) | ||||
| 	storagetesting.RunTestList(ctx, t, store, compactStorage(client.Client), false) | ||||
| 	storagetesting.RunTestList(ctx, t, store, increaseRV(client.Client), false) | ||||
| } | ||||
|  | ||||
| func TestConsistentList(t *testing.T) { | ||||
| 	ctx, store, client := testSetup(t) | ||||
| 	storagetesting.RunTestConsistentList(ctx, t, store, compactStorage(client.Client), false, true) | ||||
| 	storagetesting.RunTestConsistentList(ctx, t, store, increaseRV(client.Client), false, true) | ||||
| } | ||||
|  | ||||
| func checkStorageCallsInvariants(transformer *storagetesting.PrefixTransformer, recorder *clientRecorder) storagetesting.CallsValidation { | ||||
| @@ -313,19 +313,27 @@ func TestNamespaceScopedList(t *testing.T) { | ||||
| 	storagetesting.RunTestNamespaceScopedList(ctx, t, store) | ||||
| } | ||||
|  | ||||
| func compactStorage(etcdClient *clientv3.Client) storagetesting.Compaction { | ||||
| func compactStorage(client *clientv3.Client) storagetesting.Compaction { | ||||
| 	return func(ctx context.Context, t *testing.T, resourceVersion string) { | ||||
| 		versioner := storage.APIObjectVersioner{} | ||||
| 		rv, err := versioner.ParseResourceVersion(resourceVersion) | ||||
| 		if err != nil { | ||||
| 			t.Fatal(err) | ||||
| 		} | ||||
| 		if _, _, err = compact(ctx, etcdClient, 0, int64(rv)); err != nil { | ||||
| 		if _, err = client.Compact(ctx, int64(rv)); err != nil { | ||||
| 			t.Fatalf("Unable to compact, %v", err) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func increaseRV(client *clientv3.Client) storagetesting.IncreaseRVFunc { | ||||
| 	return func(ctx context.Context, t *testing.T) { | ||||
| 		if _, err := client.KV.Put(ctx, "increaseRV", "ok"); err != nil { | ||||
| 			t.Fatalf("Could not update increaseRV: %v", err) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestListInconsistentContinuation(t *testing.T) { | ||||
| 	ctx, store, client := testSetup(t) | ||||
| 	storagetesting.RunTestListInconsistentContinuation(ctx, t, store, compactStorage(client.Client)) | ||||
|   | ||||
| @@ -622,7 +622,7 @@ func RunTestPreconditionalDeleteWithOnlySuggestionPass(ctx context.Context, t *t | ||||
| 	expectNoDiff(t, "incorrect pod:", updatedPod, out) | ||||
| } | ||||
|  | ||||
| func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, compaction Compaction, ignoreWatchCacheTests bool) { | ||||
| func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, increaseRV IncreaseRVFunc, ignoreWatchCacheTests bool) { | ||||
| 	initialRV, createdPods, updatedPod, err := seedMultiLevelData(ctx, store) | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| @@ -648,10 +648,8 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, com | ||||
| 		pod := obj.(*example.Pod) | ||||
| 		return nil, fields.Set{"metadata.name": pod.Name, "spec.nodeName": pod.Spec.NodeName}, nil | ||||
| 	} | ||||
| 	// Use compact to increase etcd global revision without changes to any resources. | ||||
| 	// The increase in resources version comes from Kubernetes compaction updating hidden key. | ||||
| 	// Used to test consistent List to confirm it returns latest etcd revision. | ||||
| 	compaction(ctx, t, initialRV) | ||||
| 	// Increase RV to test consistent List. | ||||
| 	increaseRV(ctx, t) | ||||
| 	currentRV := fmt.Sprintf("%d", continueRV+1) | ||||
|  | ||||
| 	tests := []struct { | ||||
| @@ -1591,7 +1589,7 @@ func ExpectContinueMatches(t *testing.T, expect, got string) { | ||||
| 	t.Errorf("expected continue token: %s, got: %s", expectDecoded, gotDecoded) | ||||
| } | ||||
|  | ||||
| func RunTestConsistentList(ctx context.Context, t *testing.T, store storage.Interface, compaction Compaction, cacheEnabled, consistentReadsSupported bool) { | ||||
| func RunTestConsistentList(ctx context.Context, t *testing.T, store storage.Interface, increaseRV IncreaseRVFunc, cacheEnabled, consistentReadsSupported bool) { | ||||
| 	outPod := &example.Pod{} | ||||
| 	inPod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "foo"}} | ||||
| 	err := store.Create(ctx, computePodKey(inPod), inPod, outPod, 0) | ||||
| @@ -1599,7 +1597,7 @@ func RunTestConsistentList(ctx context.Context, t *testing.T, store storage.Inte | ||||
| 		t.Errorf("Unexpected error: %v", err) | ||||
| 	} | ||||
| 	lastObjecRV := outPod.ResourceVersion | ||||
| 	compaction(ctx, t, outPod.ResourceVersion) | ||||
| 	increaseRV(ctx, t) | ||||
| 	parsedRV, _ := strconv.Atoi(outPod.ResourceVersion) | ||||
| 	currentRV := fmt.Sprintf("%d", parsedRV+1) | ||||
|  | ||||
| @@ -1609,7 +1607,7 @@ func RunTestConsistentList(ctx context.Context, t *testing.T, store storage.Inte | ||||
| 	} | ||||
|  | ||||
| 	secondNonConsistentReadRV := lastObjecRV | ||||
| 	if consistentReadsSupported { | ||||
| 	if !cacheEnabled || consistentReadsSupported { | ||||
| 		secondNonConsistentReadRV = currentRV | ||||
| 	} | ||||
|  | ||||
| @@ -1749,7 +1747,7 @@ func seedMultiLevelData(ctx context.Context, store storage.Interface) (initialRV | ||||
| 	return initialRV, created, updated, nil | ||||
| } | ||||
|  | ||||
| func RunTestGetListNonRecursive(ctx context.Context, t *testing.T, compaction Compaction, store storage.Interface) { | ||||
| func RunTestGetListNonRecursive(ctx context.Context, t *testing.T, increaseRV IncreaseRVFunc, store storage.Interface) { | ||||
| 	key, prevStoredObj := testPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}}) | ||||
| 	prevRV, _ := strconv.Atoi(prevStoredObj.ResourceVersion) | ||||
|  | ||||
| @@ -1763,10 +1761,8 @@ func RunTestGetListNonRecursive(ctx context.Context, t *testing.T, compaction Co | ||||
| 		t.Fatalf("update failed: %v", err) | ||||
| 	} | ||||
| 	objRV, _ := strconv.Atoi(storedObj.ResourceVersion) | ||||
| 	// Use compact to increase etcd global revision without changes to any resources. | ||||
| 	// The increase in resources version comes from Kubernetes compaction updating hidden key. | ||||
| 	// Used to test consistent List to confirm it returns latest etcd revision. | ||||
| 	compaction(ctx, t, prevStoredObj.ResourceVersion) | ||||
| 	// Increase RV to test consistent List. | ||||
| 	increaseRV(ctx, t) | ||||
|  | ||||
| 	tests := []struct { | ||||
| 		name                 string | ||||
| @@ -2320,6 +2316,7 @@ func RunTestListContinuationWithFilter(ctx context.Context, t *testing.T, store | ||||
| } | ||||
|  | ||||
| type Compaction func(ctx context.Context, t *testing.T, resourceVersion string) | ||||
| type IncreaseRVFunc func(ctx context.Context, t *testing.T) | ||||
|  | ||||
| func RunTestListInconsistentContinuation(ctx context.Context, t *testing.T, store storage.Interface, compaction Compaction) { | ||||
| 	if compaction == nil { | ||||
|   | ||||
		Reference in New Issue
	
	Block a user