mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 04:08:16 +00:00 
			
		
		
		
	storage/testing/watcher_tests: make TestCacherWatchSemantics storage agnostic
This commit is contained in:
		@@ -22,6 +22,7 @@ import (
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	apiequality "k8s.io/apimachinery/pkg/api/equality"
 | 
			
		||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/fields"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/labels"
 | 
			
		||||
@@ -327,6 +328,18 @@ func TestSendInitialEventsBackwardCompatibility(t *testing.T) {
 | 
			
		||||
	storagetesting.RunSendInitialEventsBackwardCompatibility(ctx, t, store)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestCacherWatchSemantics(t *testing.T) {
 | 
			
		||||
	store, terminate := testSetupWithEtcdAndCreateWrapper(t)
 | 
			
		||||
	t.Cleanup(terminate)
 | 
			
		||||
	storagetesting.RunWatchSemantics(context.TODO(), t, store)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestCacherWatchSemanticInitialEventsExtended(t *testing.T) {
 | 
			
		||||
	store, terminate := testSetupWithEtcdAndCreateWrapper(t)
 | 
			
		||||
	t.Cleanup(terminate)
 | 
			
		||||
	storagetesting.RunWatchSemanticInitialEventsExtended(context.TODO(), t, store)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ===================================================
 | 
			
		||||
// Test-setup related function are following.
 | 
			
		||||
// ===================================================
 | 
			
		||||
@@ -424,3 +437,36 @@ func testSetupWithEtcdServer(t *testing.T, opts ...setupOption) (context.Context
 | 
			
		||||
 | 
			
		||||
	return ctx, cacher, server, terminate
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func testSetupWithEtcdAndCreateWrapper(t *testing.T, opts ...setupOption) (storage.Interface, tearDownFunc) {
 | 
			
		||||
	_, cacher, _, tearDown := testSetupWithEtcdServer(t, opts...)
 | 
			
		||||
 | 
			
		||||
	if err := cacher.ready.wait(context.TODO()); err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error waiting for the cache to be ready")
 | 
			
		||||
	}
 | 
			
		||||
	return &createWrapper{Cacher: cacher}, tearDown
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type createWrapper struct {
 | 
			
		||||
	*Cacher
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *createWrapper) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
 | 
			
		||||
	if err := c.Cacher.Create(ctx, key, obj, out, ttl); err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	return wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) {
 | 
			
		||||
		currentObj := c.Cacher.newFunc()
 | 
			
		||||
		err := c.Cacher.Get(ctx, key, storage.GetOptions{ResourceVersion: "0"}, currentObj)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			if storage.IsNotFound(err) {
 | 
			
		||||
				return false, nil
 | 
			
		||||
			}
 | 
			
		||||
			return false, err
 | 
			
		||||
		}
 | 
			
		||||
		if !apiequality.Semantic.DeepEqual(currentObj, out) {
 | 
			
		||||
			return false, nil
 | 
			
		||||
		}
 | 
			
		||||
		return true, nil
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -1340,15 +1340,6 @@ func verifyEvents(t *testing.T, w watch.Interface, events []watch.Event, strictO
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func verifyNoEvents(t *testing.T, w watch.Interface) {
 | 
			
		||||
	select {
 | 
			
		||||
	case e := <-w.ResultChan():
 | 
			
		||||
		t.Errorf("Unexpected: %#v event received, expected no events", e)
 | 
			
		||||
	case <-time.After(time.Second):
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestCachingDeleteEvents(t *testing.T) {
 | 
			
		||||
	backingStorage := &dummyStorage{}
 | 
			
		||||
	cacher, _, err := newTestCacher(backingStorage)
 | 
			
		||||
@@ -1611,205 +1602,6 @@ func TestCacheIntervalInvalidationStopsWatch(t *testing.T) {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestCacherWatchSemantics(t *testing.T) {
 | 
			
		||||
	trueVal, falseVal := true, false
 | 
			
		||||
	makePod := func(rv uint64) *example.Pod {
 | 
			
		||||
		return &example.Pod{
 | 
			
		||||
			ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
				Name:            fmt.Sprintf("pod-%d", rv),
 | 
			
		||||
				Namespace:       "ns",
 | 
			
		||||
				ResourceVersion: fmt.Sprintf("%d", rv),
 | 
			
		||||
				Annotations:     map[string]string{},
 | 
			
		||||
			},
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	scenarios := []struct {
 | 
			
		||||
		name                   string
 | 
			
		||||
		allowWatchBookmarks    bool
 | 
			
		||||
		sendInitialEvents      *bool
 | 
			
		||||
		resourceVersion        string
 | 
			
		||||
		storageResourceVersion string
 | 
			
		||||
 | 
			
		||||
		initialPods                []*example.Pod
 | 
			
		||||
		podsAfterEstablishingWatch []*example.Pod
 | 
			
		||||
 | 
			
		||||
		expectedInitialEventsInStrictOrder   []watch.Event
 | 
			
		||||
		expectedInitialEventsInRandomOrder   []watch.Event
 | 
			
		||||
		expectedEventsAfterEstablishingWatch []watch.Event
 | 
			
		||||
	}{
 | 
			
		||||
		{
 | 
			
		||||
			name:                               "allowWatchBookmarks=true, sendInitialEvents=true, RV=unset, storageRV=102",
 | 
			
		||||
			allowWatchBookmarks:                true,
 | 
			
		||||
			sendInitialEvents:                  &trueVal,
 | 
			
		||||
			storageResourceVersion:             "102",
 | 
			
		||||
			initialPods:                        []*example.Pod{makePod(101)},
 | 
			
		||||
			podsAfterEstablishingWatch:         []*example.Pod{makePod(102)},
 | 
			
		||||
			expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}},
 | 
			
		||||
			expectedEventsAfterEstablishingWatch: []watch.Event{
 | 
			
		||||
				{Type: watch.Added, Object: makePod(102)},
 | 
			
		||||
				{Type: watch.Bookmark, Object: &example.Pod{
 | 
			
		||||
					ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
						ResourceVersion: "102",
 | 
			
		||||
						Annotations:     map[string]string{"k8s.io/initial-events-end": "true"},
 | 
			
		||||
					},
 | 
			
		||||
				}},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name:                   "allowWatchBookmarks=true, sendInitialEvents=true, RV=0, storageRV=105",
 | 
			
		||||
			allowWatchBookmarks:    true,
 | 
			
		||||
			sendInitialEvents:      &trueVal,
 | 
			
		||||
			resourceVersion:        "0",
 | 
			
		||||
			storageResourceVersion: "105",
 | 
			
		||||
			initialPods:            []*example.Pod{makePod(101), makePod(102)},
 | 
			
		||||
			expectedInitialEventsInRandomOrder: []watch.Event{
 | 
			
		||||
				{Type: watch.Added, Object: makePod(101)},
 | 
			
		||||
				{Type: watch.Added, Object: makePod(102)},
 | 
			
		||||
			},
 | 
			
		||||
			expectedInitialEventsInStrictOrder: []watch.Event{
 | 
			
		||||
				{Type: watch.Bookmark, Object: &example.Pod{
 | 
			
		||||
					ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
						ResourceVersion: "102",
 | 
			
		||||
						Annotations:     map[string]string{"k8s.io/initial-events-end": "true"},
 | 
			
		||||
					},
 | 
			
		||||
				}},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name:                               "allowWatchBookmarks=true, sendInitialEvents=true, RV=101, storageRV=105",
 | 
			
		||||
			allowWatchBookmarks:                true,
 | 
			
		||||
			sendInitialEvents:                  &trueVal,
 | 
			
		||||
			resourceVersion:                    "101",
 | 
			
		||||
			storageResourceVersion:             "105",
 | 
			
		||||
			initialPods:                        []*example.Pod{makePod(101), makePod(102)},
 | 
			
		||||
			expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}},
 | 
			
		||||
			expectedInitialEventsInStrictOrder: []watch.Event{
 | 
			
		||||
				{Type: watch.Bookmark, Object: &example.Pod{
 | 
			
		||||
					ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
						ResourceVersion: "102",
 | 
			
		||||
						Annotations:     map[string]string{"k8s.io/initial-events-end": "true"},
 | 
			
		||||
					},
 | 
			
		||||
				}},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name:                                 "allowWatchBookmarks=false, sendInitialEvents=true, RV=unset, storageRV=102",
 | 
			
		||||
			sendInitialEvents:                    &trueVal,
 | 
			
		||||
			storageResourceVersion:               "102",
 | 
			
		||||
			initialPods:                          []*example.Pod{makePod(101)},
 | 
			
		||||
			expectedInitialEventsInRandomOrder:   []watch.Event{{Type: watch.Added, Object: makePod(101)}},
 | 
			
		||||
			podsAfterEstablishingWatch:           []*example.Pod{makePod(102)},
 | 
			
		||||
			expectedEventsAfterEstablishingWatch: []watch.Event{{Type: watch.Added, Object: makePod(102)}},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			// note we set storage's RV to some future value, mustn't be used by this scenario
 | 
			
		||||
			name:                               "allowWatchBookmarks=false, sendInitialEvents=true, RV=0, storageRV=105",
 | 
			
		||||
			sendInitialEvents:                  &trueVal,
 | 
			
		||||
			resourceVersion:                    "0",
 | 
			
		||||
			storageResourceVersion:             "105",
 | 
			
		||||
			initialPods:                        []*example.Pod{makePod(101), makePod(102)},
 | 
			
		||||
			expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			// note we set storage's RV to some future value, mustn't be used by this scenario
 | 
			
		||||
			name:                   "allowWatchBookmarks=false, sendInitialEvents=true, RV=101, storageRV=105",
 | 
			
		||||
			sendInitialEvents:      &trueVal,
 | 
			
		||||
			resourceVersion:        "101",
 | 
			
		||||
			storageResourceVersion: "105",
 | 
			
		||||
			initialPods:            []*example.Pod{makePod(101), makePod(102)},
 | 
			
		||||
			// make sure we only get initial events that are > initial RV (101)
 | 
			
		||||
			expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name:                                 "sendInitialEvents=false, RV=unset, storageRV=103",
 | 
			
		||||
			sendInitialEvents:                    &falseVal,
 | 
			
		||||
			storageResourceVersion:               "103",
 | 
			
		||||
			initialPods:                          []*example.Pod{makePod(101), makePod(102)},
 | 
			
		||||
			podsAfterEstablishingWatch:           []*example.Pod{makePod(104)},
 | 
			
		||||
			expectedEventsAfterEstablishingWatch: []watch.Event{{Type: watch.Added, Object: makePod(104)}},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			// note we set storage's RV to some future value, mustn't be used by this scenario
 | 
			
		||||
			name:                                 "sendInitialEvents=false, RV=0, storageRV=105",
 | 
			
		||||
			sendInitialEvents:                    &falseVal,
 | 
			
		||||
			resourceVersion:                      "0",
 | 
			
		||||
			storageResourceVersion:               "105",
 | 
			
		||||
			initialPods:                          []*example.Pod{makePod(101), makePod(102)},
 | 
			
		||||
			podsAfterEstablishingWatch:           []*example.Pod{makePod(103)},
 | 
			
		||||
			expectedEventsAfterEstablishingWatch: []watch.Event{{Type: watch.Added, Object: makePod(103)}},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			// note we set storage's RV to some future value, mustn't be used by this scenario
 | 
			
		||||
			name:                               "legacy, RV=0, storageRV=105",
 | 
			
		||||
			resourceVersion:                    "0",
 | 
			
		||||
			storageResourceVersion:             "105",
 | 
			
		||||
			initialPods:                        []*example.Pod{makePod(101), makePod(102)},
 | 
			
		||||
			expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			// note we set storage's RV to some future value, mustn't be used by this scenario
 | 
			
		||||
			name:                   "legacy, RV=unset, storageRV=105",
 | 
			
		||||
			storageResourceVersion: "105",
 | 
			
		||||
			initialPods:            []*example.Pod{makePod(101), makePod(102)},
 | 
			
		||||
			// no events because the watch is delegated to the underlying storage
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	for _, scenario := range scenarios {
 | 
			
		||||
		t.Run(scenario.name, func(t *testing.T) {
 | 
			
		||||
			// set up env
 | 
			
		||||
			defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)()
 | 
			
		||||
			storageListMetaResourceVersion := ""
 | 
			
		||||
			backingStorage := &dummyStorage{getListFn: func(_ context.Context, _ string, _ storage.ListOptions, listObj runtime.Object) error {
 | 
			
		||||
				podList := listObj.(*example.PodList)
 | 
			
		||||
				podList.ListMeta = metav1.ListMeta{ResourceVersion: storageListMetaResourceVersion}
 | 
			
		||||
				return nil
 | 
			
		||||
			}}
 | 
			
		||||
 | 
			
		||||
			cacher, _, err := newTestCacher(backingStorage)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				t.Fatalf("falied to create cacher: %v", err)
 | 
			
		||||
			}
 | 
			
		||||
			defer cacher.Stop()
 | 
			
		||||
			if err := cacher.ready.wait(context.TODO()); err != nil {
 | 
			
		||||
				t.Fatalf("unexpected error waiting for the cache to be ready")
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			// now, run a scenario
 | 
			
		||||
			// but first let's add some initial data
 | 
			
		||||
			for _, obj := range scenario.initialPods {
 | 
			
		||||
				err = cacher.watchCache.Add(obj)
 | 
			
		||||
				require.NoError(t, err, "failed to add a pod: %v")
 | 
			
		||||
			}
 | 
			
		||||
			// read request params
 | 
			
		||||
			opts := storage.ListOptions{Predicate: storage.Everything}
 | 
			
		||||
			opts.SendInitialEvents = scenario.sendInitialEvents
 | 
			
		||||
			opts.Predicate.AllowWatchBookmarks = scenario.allowWatchBookmarks
 | 
			
		||||
			if len(scenario.resourceVersion) > 0 {
 | 
			
		||||
				opts.ResourceVersion = scenario.resourceVersion
 | 
			
		||||
			}
 | 
			
		||||
			// before starting a new watch set a storage RV to some future value
 | 
			
		||||
			storageListMetaResourceVersion = scenario.storageResourceVersion
 | 
			
		||||
 | 
			
		||||
			w, err := cacher.Watch(context.Background(), "pods/ns", opts)
 | 
			
		||||
			require.NoError(t, err, "failed to create watch: %v")
 | 
			
		||||
			defer w.Stop()
 | 
			
		||||
 | 
			
		||||
			// make sure we only get initial events
 | 
			
		||||
			verifyEvents(t, w, scenario.expectedInitialEventsInRandomOrder, false)
 | 
			
		||||
			verifyEvents(t, w, scenario.expectedInitialEventsInStrictOrder, true)
 | 
			
		||||
			verifyNoEvents(t, w)
 | 
			
		||||
			// add a pod that is greater than the storage's RV when the watch was started
 | 
			
		||||
			for _, obj := range scenario.podsAfterEstablishingWatch {
 | 
			
		||||
				err = cacher.watchCache.Add(obj)
 | 
			
		||||
				require.NoError(t, err, "failed to add a pod: %v")
 | 
			
		||||
			}
 | 
			
		||||
			verifyEvents(t, w, scenario.expectedEventsAfterEstablishingWatch, true)
 | 
			
		||||
			verifyNoEvents(t, w)
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) {
 | 
			
		||||
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)()
 | 
			
		||||
	backingStorage := &dummyStorage{}
 | 
			
		||||
 
 | 
			
		||||
@@ -1222,205 +1222,223 @@ func RunSendInitialEventsBackwardCompatibility(ctx context.Context, t *testing.T
 | 
			
		||||
	w.Stop()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestCacherWatchSemantics(t *testing.T) {
 | 
			
		||||
func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interface) {
 | 
			
		||||
	trueVal, falseVal := true, false
 | 
			
		||||
	makePod := func(rv uint64) *example.Pod {
 | 
			
		||||
		return &example.Pod{
 | 
			
		||||
			ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
				Name:            fmt.Sprintf("pod-%d", rv),
 | 
			
		||||
				Namespace:       "ns",
 | 
			
		||||
				ResourceVersion: fmt.Sprintf("%d", rv),
 | 
			
		||||
				Annotations:     map[string]string{},
 | 
			
		||||
	addEventsFromCreatedPods := func(createdInitialPods []*example.Pod) []watch.Event {
 | 
			
		||||
		var ret []watch.Event
 | 
			
		||||
		for _, createdPod := range createdInitialPods {
 | 
			
		||||
			ret = append(ret, watch.Event{Type: watch.Added, Object: createdPod})
 | 
			
		||||
		}
 | 
			
		||||
		return ret
 | 
			
		||||
	}
 | 
			
		||||
	initialEventsEndFromLastCreatedPod := func(createdInitialPods []*example.Pod) watch.Event {
 | 
			
		||||
		return watch.Event{
 | 
			
		||||
			Type: watch.Bookmark,
 | 
			
		||||
			Object: &example.Pod{
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
					ResourceVersion: createdInitialPods[len(createdInitialPods)-1].ResourceVersion,
 | 
			
		||||
					Annotations:     map[string]string{"k8s.io/initial-events-end": "true"},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	scenarios := []struct {
 | 
			
		||||
		name                   string
 | 
			
		||||
		allowWatchBookmarks    bool
 | 
			
		||||
		sendInitialEvents      *bool
 | 
			
		||||
		resourceVersion        string
 | 
			
		||||
		storageResourceVersion string
 | 
			
		||||
		name                string
 | 
			
		||||
		allowWatchBookmarks bool
 | 
			
		||||
		sendInitialEvents   *bool
 | 
			
		||||
		resourceVersion     string
 | 
			
		||||
 | 
			
		||||
		initialPods                []*example.Pod
 | 
			
		||||
		podsAfterEstablishingWatch []*example.Pod
 | 
			
		||||
		initialPods                func(ns string) []*example.Pod
 | 
			
		||||
		podsAfterEstablishingWatch func(ns string) []*example.Pod
 | 
			
		||||
 | 
			
		||||
		expectedInitialEventsInStrictOrder   []watch.Event
 | 
			
		||||
		expectedInitialEventsInRandomOrder   []watch.Event
 | 
			
		||||
		expectedEventsAfterEstablishingWatch []watch.Event
 | 
			
		||||
		expectedInitialEventsInRandomOrder   func(createdInitialPods []*example.Pod) []watch.Event
 | 
			
		||||
		expectedInitialEventsInStrictOrder   func(createdInitialPods []*example.Pod) []watch.Event
 | 
			
		||||
		expectedEventsAfterEstablishingWatch func(createdPodsAfterWatch []*example.Pod) []watch.Event
 | 
			
		||||
	}{
 | 
			
		||||
		{
 | 
			
		||||
			name:                               "allowWatchBookmarks=true, sendInitialEvents=true, RV=unset, storageRV=102",
 | 
			
		||||
			name:                               "allowWatchBookmarks=true, sendInitialEvents=true, RV=0",
 | 
			
		||||
			allowWatchBookmarks:                true,
 | 
			
		||||
			sendInitialEvents:                  &trueVal,
 | 
			
		||||
			storageResourceVersion:             "102",
 | 
			
		||||
			initialPods:                        []*example.Pod{makePod(101)},
 | 
			
		||||
			podsAfterEstablishingWatch:         []*example.Pod{makePod(102)},
 | 
			
		||||
			expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}},
 | 
			
		||||
			expectedEventsAfterEstablishingWatch: []watch.Event{
 | 
			
		||||
				{Type: watch.Added, Object: makePod(102)},
 | 
			
		||||
				{Type: watch.Bookmark, Object: &example.Pod{
 | 
			
		||||
					ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
						ResourceVersion: "102",
 | 
			
		||||
						Annotations:     map[string]string{"k8s.io/initial-events-end": "true"},
 | 
			
		||||
					},
 | 
			
		||||
				}},
 | 
			
		||||
			resourceVersion:                    "0",
 | 
			
		||||
			initialPods:                        func(ns string) []*example.Pod { return []*example.Pod{makePod(ns, "2"), makePod(ns, "3")} },
 | 
			
		||||
			expectedInitialEventsInRandomOrder: addEventsFromCreatedPods,
 | 
			
		||||
			expectedInitialEventsInStrictOrder: func(createdInitialPods []*example.Pod) []watch.Event {
 | 
			
		||||
				return []watch.Event{initialEventsEndFromLastCreatedPod(createdInitialPods)}
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name:                   "allowWatchBookmarks=true, sendInitialEvents=true, RV=0, storageRV=105",
 | 
			
		||||
			allowWatchBookmarks:    true,
 | 
			
		||||
			sendInitialEvents:      &trueVal,
 | 
			
		||||
			resourceVersion:        "0",
 | 
			
		||||
			storageResourceVersion: "105",
 | 
			
		||||
			initialPods:            []*example.Pod{makePod(101), makePod(102)},
 | 
			
		||||
			expectedInitialEventsInRandomOrder: []watch.Event{
 | 
			
		||||
				{Type: watch.Added, Object: makePod(101)},
 | 
			
		||||
				{Type: watch.Added, Object: makePod(102)},
 | 
			
		||||
			},
 | 
			
		||||
			expectedInitialEventsInStrictOrder: []watch.Event{
 | 
			
		||||
				{Type: watch.Bookmark, Object: &example.Pod{
 | 
			
		||||
					ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
						ResourceVersion: "102",
 | 
			
		||||
						Annotations:     map[string]string{"k8s.io/initial-events-end": "true"},
 | 
			
		||||
					},
 | 
			
		||||
				}},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name:                               "allowWatchBookmarks=true, sendInitialEvents=true, RV=101, storageRV=105",
 | 
			
		||||
			name:                               "allowWatchBookmarks=true, sendInitialEvents=true, RV=1",
 | 
			
		||||
			allowWatchBookmarks:                true,
 | 
			
		||||
			sendInitialEvents:                  &trueVal,
 | 
			
		||||
			resourceVersion:                    "101",
 | 
			
		||||
			storageResourceVersion:             "105",
 | 
			
		||||
			initialPods:                        []*example.Pod{makePod(101), makePod(102)},
 | 
			
		||||
			expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}},
 | 
			
		||||
			expectedInitialEventsInStrictOrder: []watch.Event{
 | 
			
		||||
				{Type: watch.Bookmark, Object: &example.Pod{
 | 
			
		||||
					ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
						ResourceVersion: "102",
 | 
			
		||||
						Annotations:     map[string]string{"k8s.io/initial-events-end": "true"},
 | 
			
		||||
					},
 | 
			
		||||
				}},
 | 
			
		||||
			resourceVersion:                    "1",
 | 
			
		||||
			initialPods:                        func(ns string) []*example.Pod { return []*example.Pod{makePod(ns, "4"), makePod(ns, "5")} },
 | 
			
		||||
			expectedInitialEventsInRandomOrder: addEventsFromCreatedPods,
 | 
			
		||||
			expectedInitialEventsInStrictOrder: func(createdInitialPods []*example.Pod) []watch.Event {
 | 
			
		||||
				return []watch.Event{initialEventsEndFromLastCreatedPod(createdInitialPods)}
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name:                                 "allowWatchBookmarks=false, sendInitialEvents=true, RV=unset, storageRV=102",
 | 
			
		||||
			name:                                 "allowWatchBookmarks=false, sendInitialEvents=true, RV=unset",
 | 
			
		||||
			sendInitialEvents:                    &trueVal,
 | 
			
		||||
			storageResourceVersion:               "102",
 | 
			
		||||
			initialPods:                          []*example.Pod{makePod(101)},
 | 
			
		||||
			expectedInitialEventsInRandomOrder:   []watch.Event{{Type: watch.Added, Object: makePod(101)}},
 | 
			
		||||
			podsAfterEstablishingWatch:           []*example.Pod{makePod(102)},
 | 
			
		||||
			expectedEventsAfterEstablishingWatch: []watch.Event{{Type: watch.Added, Object: makePod(102)}},
 | 
			
		||||
			initialPods:                          func(ns string) []*example.Pod { return []*example.Pod{makePod(ns, "6")} },
 | 
			
		||||
			expectedInitialEventsInRandomOrder:   addEventsFromCreatedPods,
 | 
			
		||||
			podsAfterEstablishingWatch:           func(ns string) []*example.Pod { return []*example.Pod{makePod(ns, "7")} },
 | 
			
		||||
			expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			// note we set storage's RV to some future value, mustn't be used by this scenario
 | 
			
		||||
			name:                               "allowWatchBookmarks=false, sendInitialEvents=true, RV=0, storageRV=105",
 | 
			
		||||
			name:                               "allowWatchBookmarks=false, sendInitialEvents=true, RV=0",
 | 
			
		||||
			sendInitialEvents:                  &trueVal,
 | 
			
		||||
			resourceVersion:                    "0",
 | 
			
		||||
			storageResourceVersion:             "105",
 | 
			
		||||
			initialPods:                        []*example.Pod{makePod(101), makePod(102)},
 | 
			
		||||
			expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}},
 | 
			
		||||
			initialPods:                        func(ns string) []*example.Pod { return []*example.Pod{makePod(ns, "8"), makePod(ns, "9")} },
 | 
			
		||||
			expectedInitialEventsInRandomOrder: addEventsFromCreatedPods,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			// note we set storage's RV to some future value, mustn't be used by this scenario
 | 
			
		||||
			name:                   "allowWatchBookmarks=false, sendInitialEvents=true, RV=101, storageRV=105",
 | 
			
		||||
			sendInitialEvents:      &trueVal,
 | 
			
		||||
			resourceVersion:        "101",
 | 
			
		||||
			storageResourceVersion: "105",
 | 
			
		||||
			initialPods:            []*example.Pod{makePod(101), makePod(102)},
 | 
			
		||||
			// make sure we only get initial events that are > initial RV (101)
 | 
			
		||||
			expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}},
 | 
			
		||||
			name:              "allowWatchBookmarks=false, sendInitialEvents=true, RV=1",
 | 
			
		||||
			sendInitialEvents: &trueVal,
 | 
			
		||||
			resourceVersion:   "1",
 | 
			
		||||
			initialPods:       func(ns string) []*example.Pod { return []*example.Pod{makePod(ns, "10"), makePod(ns, "11")} },
 | 
			
		||||
			// make sure we only get initial events that are > initial RV (1)
 | 
			
		||||
			expectedInitialEventsInRandomOrder: addEventsFromCreatedPods,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name:                                 "sendInitialEvents=false, RV=unset, storageRV=103",
 | 
			
		||||
			sendInitialEvents:                    &falseVal,
 | 
			
		||||
			storageResourceVersion:               "103",
 | 
			
		||||
			initialPods:                          []*example.Pod{makePod(101), makePod(102)},
 | 
			
		||||
			podsAfterEstablishingWatch:           []*example.Pod{makePod(104)},
 | 
			
		||||
			expectedEventsAfterEstablishingWatch: []watch.Event{{Type: watch.Added, Object: makePod(104)}},
 | 
			
		||||
			name:                       "sendInitialEvents=false, RV=unset",
 | 
			
		||||
			sendInitialEvents:          &falseVal,
 | 
			
		||||
			initialPods:                func(ns string) []*example.Pod { return []*example.Pod{makePod(ns, "12"), makePod(ns, "13")} },
 | 
			
		||||
			podsAfterEstablishingWatch: func(ns string) []*example.Pod { return []*example.Pod{makePod(ns, "14")} },
 | 
			
		||||
			expectedEventsAfterEstablishingWatch: func(createdPodsAfterWatch []*example.Pod) []watch.Event {
 | 
			
		||||
				return []watch.Event{{Type: watch.Added, Object: createdPodsAfterWatch[0]}}
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			// note we set storage's RV to some future value, mustn't be used by this scenario
 | 
			
		||||
			name:                                 "sendInitialEvents=false, RV=0, storageRV=105",
 | 
			
		||||
			sendInitialEvents:                    &falseVal,
 | 
			
		||||
			resourceVersion:                      "0",
 | 
			
		||||
			storageResourceVersion:               "105",
 | 
			
		||||
			initialPods:                          []*example.Pod{makePod(101), makePod(102)},
 | 
			
		||||
			podsAfterEstablishingWatch:           []*example.Pod{makePod(103)},
 | 
			
		||||
			expectedEventsAfterEstablishingWatch: []watch.Event{{Type: watch.Added, Object: makePod(103)}},
 | 
			
		||||
			name:                       "sendInitialEvents=false, RV=0",
 | 
			
		||||
			sendInitialEvents:          &falseVal,
 | 
			
		||||
			resourceVersion:            "0",
 | 
			
		||||
			initialPods:                func(ns string) []*example.Pod { return []*example.Pod{makePod(ns, "15"), makePod(ns, "16")} },
 | 
			
		||||
			podsAfterEstablishingWatch: func(ns string) []*example.Pod { return []*example.Pod{makePod(ns, "17")} },
 | 
			
		||||
			expectedEventsAfterEstablishingWatch: func(createdPodsAfterWatch []*example.Pod) []watch.Event {
 | 
			
		||||
				return []watch.Event{{Type: watch.Added, Object: createdPodsAfterWatch[0]}}
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			// note we set storage's RV to some future value, mustn't be used by this scenario
 | 
			
		||||
			name:                               "legacy, RV=0, storageRV=105",
 | 
			
		||||
			name:                               "legacy, RV=0",
 | 
			
		||||
			resourceVersion:                    "0",
 | 
			
		||||
			storageResourceVersion:             "105",
 | 
			
		||||
			initialPods:                        []*example.Pod{makePod(101), makePod(102)},
 | 
			
		||||
			expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}},
 | 
			
		||||
			initialPods:                        func(ns string) []*example.Pod { return []*example.Pod{makePod(ns, "18"), makePod(ns, "19")} },
 | 
			
		||||
			expectedInitialEventsInRandomOrder: addEventsFromCreatedPods,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			// note we set storage's RV to some future value, mustn't be used by this scenario
 | 
			
		||||
			name:                   "legacy, RV=unset, storageRV=105",
 | 
			
		||||
			storageResourceVersion: "105",
 | 
			
		||||
			initialPods:            []*example.Pod{makePod(101), makePod(102)},
 | 
			
		||||
			// no events because the watch is delegated to the underlying storage
 | 
			
		||||
			name:                               "legacy, RV=unset",
 | 
			
		||||
			initialPods:                        func(ns string) []*example.Pod { return []*example.Pod{makePod(ns, "20"), makePod(ns, "21")} },
 | 
			
		||||
			expectedInitialEventsInRandomOrder: addEventsFromCreatedPods,
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	for _, scenario := range scenarios {
 | 
			
		||||
	for idx, scenario := range scenarios {
 | 
			
		||||
		t.Run(scenario.name, func(t *testing.T) {
 | 
			
		||||
			// set up env
 | 
			
		||||
			defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)()
 | 
			
		||||
			storageListMetaResourceVersion := ""
 | 
			
		||||
			backingStorage := &dummyStorage{getListFn: func(_ context.Context, _ string, _ storage.ListOptions, listObj runtime.Object) error {
 | 
			
		||||
				podList := listObj.(*example.PodList)
 | 
			
		||||
				podList.ListMeta = metav1.ListMeta{ResourceVersion: storageListMetaResourceVersion}
 | 
			
		||||
				return nil
 | 
			
		||||
			}}
 | 
			
		||||
 | 
			
		||||
			cacher, _, err := newTestCacher(backingStorage)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				t.Fatalf("falied to create cacher: %v", err)
 | 
			
		||||
			if scenario.expectedInitialEventsInStrictOrder == nil {
 | 
			
		||||
				scenario.expectedInitialEventsInStrictOrder = func(_ []*example.Pod) []watch.Event { return nil }
 | 
			
		||||
			}
 | 
			
		||||
			defer cacher.Stop()
 | 
			
		||||
			if err := cacher.ready.wait(context.TODO()); err != nil {
 | 
			
		||||
				t.Fatalf("unexpected error waiting for the cache to be ready")
 | 
			
		||||
			if scenario.expectedInitialEventsInRandomOrder == nil {
 | 
			
		||||
				scenario.expectedInitialEventsInRandomOrder = func(_ []*example.Pod) []watch.Event { return nil }
 | 
			
		||||
			}
 | 
			
		||||
			if scenario.podsAfterEstablishingWatch == nil {
 | 
			
		||||
				scenario.podsAfterEstablishingWatch = func(_ string) []*example.Pod { return nil }
 | 
			
		||||
			}
 | 
			
		||||
			if scenario.expectedEventsAfterEstablishingWatch == nil {
 | 
			
		||||
				scenario.expectedEventsAfterEstablishingWatch = func(_ []*example.Pod) []watch.Event { return nil }
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			// now, run a scenario
 | 
			
		||||
			// but first let's add some initial data
 | 
			
		||||
			for _, obj := range scenario.initialPods {
 | 
			
		||||
				err = cacher.watchCache.Add(obj)
 | 
			
		||||
				require.NoError(t, err, "failed to add a pod: %v")
 | 
			
		||||
			var createdPods []*example.Pod
 | 
			
		||||
			ns := fmt.Sprintf("ns-%v", idx)
 | 
			
		||||
			for _, obj := range scenario.initialPods(ns) {
 | 
			
		||||
				out := &example.Pod{}
 | 
			
		||||
				err := store.Create(ctx, computePodKey(obj), obj, out, 0)
 | 
			
		||||
				require.NoError(t, err, "failed to add a pod: %v", obj)
 | 
			
		||||
				createdPods = append(createdPods, out)
 | 
			
		||||
			}
 | 
			
		||||
			// read request params
 | 
			
		||||
			opts := storage.ListOptions{Predicate: storage.Everything}
 | 
			
		||||
 | 
			
		||||
			opts := storage.ListOptions{Predicate: storage.Everything, Recursive: true}
 | 
			
		||||
			opts.SendInitialEvents = scenario.sendInitialEvents
 | 
			
		||||
			opts.Predicate.AllowWatchBookmarks = scenario.allowWatchBookmarks
 | 
			
		||||
			if len(scenario.resourceVersion) > 0 {
 | 
			
		||||
				opts.ResourceVersion = scenario.resourceVersion
 | 
			
		||||
			}
 | 
			
		||||
			// before starting a new watch set a storage RV to some future value
 | 
			
		||||
			storageListMetaResourceVersion = scenario.storageResourceVersion
 | 
			
		||||
 | 
			
		||||
			w, err := cacher.Watch(context.Background(), "pods/ns", opts)
 | 
			
		||||
			w, err := store.Watch(context.Background(), fmt.Sprintf("/pods/%s", ns), opts)
 | 
			
		||||
			require.NoError(t, err, "failed to create watch: %v")
 | 
			
		||||
			defer w.Stop()
 | 
			
		||||
 | 
			
		||||
			// make sure we only get initial events
 | 
			
		||||
			verifyEvents(t, w, scenario.expectedInitialEventsInRandomOrder, false)
 | 
			
		||||
			verifyEvents(t, w, scenario.expectedInitialEventsInStrictOrder, true)
 | 
			
		||||
			verifyNoEvents(t, w)
 | 
			
		||||
			testCheckResultsInRandomOrder(t, w, scenario.expectedInitialEventsInRandomOrder(createdPods))
 | 
			
		||||
			testCheckResultsInStrictOrder(t, w, scenario.expectedInitialEventsInStrictOrder(createdPods))
 | 
			
		||||
			testCheckNoMoreResults(t, w)
 | 
			
		||||
 | 
			
		||||
			createdPods = []*example.Pod{}
 | 
			
		||||
			// add a pod that is greater than the storage's RV when the watch was started
 | 
			
		||||
			for _, obj := range scenario.podsAfterEstablishingWatch {
 | 
			
		||||
				err = cacher.watchCache.Add(obj)
 | 
			
		||||
			for _, obj := range scenario.podsAfterEstablishingWatch(ns) {
 | 
			
		||||
				out := &example.Pod{}
 | 
			
		||||
				err = store.Create(ctx, computePodKey(obj), obj, out, 0)
 | 
			
		||||
				require.NoError(t, err, "failed to add a pod: %v")
 | 
			
		||||
				createdPods = append(createdPods, out)
 | 
			
		||||
			}
 | 
			
		||||
			verifyEvents(t, w, scenario.expectedEventsAfterEstablishingWatch, true)
 | 
			
		||||
			verifyNoEvents(t, w)
 | 
			
		||||
			testCheckResultsInStrictOrder(t, w, scenario.expectedEventsAfterEstablishingWatch(createdPods))
 | 
			
		||||
			testCheckNoMoreResults(t, w)
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// RunWatchSemanticInitialEventsExtended checks if the bookmark event marking the end of the list stream contains the global RV
 | 
			
		||||
func RunWatchSemanticInitialEventsExtended(ctx context.Context, t *testing.T, store storage.Interface) {
 | 
			
		||||
	trueVal := true
 | 
			
		||||
	initialPod := func(ns string) *example.Pod { return makePod(ns, "2") }
 | 
			
		||||
	expectedInitialEventsInStrictOrder := func(firstPod, secondPod *example.Pod) []watch.Event {
 | 
			
		||||
		return []watch.Event{
 | 
			
		||||
			{Type: watch.Added, Object: firstPod},
 | 
			
		||||
			{Type: watch.Bookmark, Object: &example.Pod{
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
					ResourceVersion: secondPod.ResourceVersion,
 | 
			
		||||
					Annotations:     map[string]string{"k8s.io/initial-events-end": "true"},
 | 
			
		||||
				},
 | 
			
		||||
			}},
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)()
 | 
			
		||||
 | 
			
		||||
	firstPod := &example.Pod{}
 | 
			
		||||
	nsPrefix := "foo"
 | 
			
		||||
	ns := fmt.Sprintf("ns-%s", nsPrefix)
 | 
			
		||||
	err := store.Create(ctx, computePodKey(initialPod(ns)), initialPod(ns), firstPod, 0)
 | 
			
		||||
	require.NoError(t, err, "failed to add a pod: %v")
 | 
			
		||||
 | 
			
		||||
	// add the pod to a different ns to advance the global RV
 | 
			
		||||
	secondPod := &example.Pod{}
 | 
			
		||||
	newNs := fmt.Sprintf("other-ns-%s", nsPrefix)
 | 
			
		||||
	err = store.Create(ctx, computePodKey(initialPod(newNs)), initialPod(newNs), secondPod, 0)
 | 
			
		||||
	require.NoError(t, err, "failed to add a pod: %v")
 | 
			
		||||
 | 
			
		||||
	opts := storage.ListOptions{Predicate: storage.Everything, Recursive: true}
 | 
			
		||||
	opts.SendInitialEvents = &trueVal
 | 
			
		||||
	opts.Predicate.AllowWatchBookmarks = true
 | 
			
		||||
 | 
			
		||||
	w, err := store.Watch(context.Background(), fmt.Sprintf("/pods/%s", ns), opts)
 | 
			
		||||
	require.NoError(t, err, "failed to create watch: %v")
 | 
			
		||||
	defer w.Stop()
 | 
			
		||||
 | 
			
		||||
	// make sure we only get initial events from the first ns
 | 
			
		||||
	// followed by the bookmark with the global RV
 | 
			
		||||
	testCheckResultsInStrictOrder(t, w, expectedInitialEventsInStrictOrder(firstPod, secondPod))
 | 
			
		||||
	testCheckNoMoreResults(t, w)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func makePod(namespace, namePrefix string) *example.Pod {
 | 
			
		||||
	return &example.Pod{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
			Name:      fmt.Sprintf("pod-%s", namePrefix),
 | 
			
		||||
			Namespace: namespace,
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type testWatchStruct struct {
 | 
			
		||||
	obj         *example.Pod
 | 
			
		||||
	expectEvent bool
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user