mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-10-31 10:18:13 +00:00 
			
		
		
		
	Merge pull request #86430 from wojtek-t/avoid_thundering_herd_on_etcd
Avoid thundering herd of relists on etcd
This commit is contained in:
		| @@ -75,6 +75,9 @@ type Reflector struct { | ||||
| 	ShouldResync func() bool | ||||
| 	// clock allows tests to manipulate time | ||||
| 	clock clock.Clock | ||||
| 	// paginatedResult defines whether pagination should be forced for list calls. | ||||
| 	// It is set based on the result of the initial list call. | ||||
| 	paginatedResult bool | ||||
| 	// lastSyncResourceVersion is the resource version token last | ||||
| 	// observed when doing a sync with the underlying store | ||||
| 	// it is thread safe, but not synchronized with the underlying store | ||||
| @@ -85,7 +88,12 @@ type Reflector struct { | ||||
| 	// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion | ||||
| 	lastSyncResourceVersionMutex sync.RWMutex | ||||
| 	// WatchListPageSize is the requested chunk size of initial and resync watch lists. | ||||
| 	// Defaults to pager.PageSize. | ||||
| 	// If unset, for consistent reads (RV="") or reads that opt-into arbitrarily old data | ||||
| 	// (RV="0") it will default to pager.PageSize, for the rest (RV != "" && RV != "0") | ||||
| 	// it will turn off pagination to allow serving them from watch cache. | ||||
| 	// NOTE: It should be used carefully as paginated lists are always served directly from | ||||
| 	// etcd, which is significantly less efficient and may lead to serious performance and | ||||
| 	// scalability problems. | ||||
| 	WatchListPageSize int64 | ||||
| } | ||||
|  | ||||
| @@ -204,6 +212,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { | ||||
| 		initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name}) | ||||
| 		defer initTrace.LogIfLong(10 * time.Second) | ||||
| 		var list runtime.Object | ||||
| 		var paginatedResult bool | ||||
| 		var err error | ||||
| 		listCh := make(chan struct{}, 1) | ||||
| 		panicCh := make(chan interface{}, 1) | ||||
| @@ -218,11 +227,30 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { | ||||
| 			pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) { | ||||
| 				return r.listerWatcher.List(opts) | ||||
| 			})) | ||||
| 			if r.WatchListPageSize != 0 { | ||||
| 			switch { | ||||
| 			case r.WatchListPageSize != 0: | ||||
| 				pager.PageSize = r.WatchListPageSize | ||||
| 			case r.paginatedResult: | ||||
| 				// We got a paginated result initially. Assume this resource and server honor | ||||
| 				// paging requests (i.e. watch cache is probably disabled) and leave the default | ||||
| 				// pager size set. | ||||
| 			case options.ResourceVersion != "" && options.ResourceVersion != "0": | ||||
| 				// User didn't explicitly request pagination. | ||||
| 				// | ||||
| 				// With ResourceVersion != "", we have a possibility to list from watch cache, | ||||
| 				// but we do that (for ResourceVersion != "0") only if Limit is unset. | ||||
| 				// To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly | ||||
| 				// switch off pagination to force listing from watch cache (if enabled). | ||||
| 				// With the existing semantics of RV (the result is at least as fresh as the provided RV), | ||||
| 				// this is correct and doesn't lead to going back in time. | ||||
| 				// | ||||
| 				// We also don't turn off pagination for ResourceVersion="0", since watch cache | ||||
| 				// is ignoring Limit in that case anyway, and if watch cache is not enabled | ||||
| 				// we don't introduce a regression. | ||||
| 				pager.PageSize = 0 | ||||
| 			} | ||||
|  | ||||
| 			list, err = pager.List(context.Background(), options) | ||||
| 			list, paginatedResult, err = pager.List(context.Background(), options) | ||||
| 			if isExpiredError(err) { | ||||
| 				r.setIsLastSyncResourceVersionExpired(true) | ||||
| 				// Retry immediately if the resource version used to list is expired. | ||||
| @@ -230,7 +258,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { | ||||
| 				// continuation pages, but the pager might not be enabled, or the full list might fail because the | ||||
| 				// resource version it is listing at is expired, so we need to fall back to resourceVersion="" in all cases | ||||
| 				// to recover and ensure the reflector makes forward progress. | ||||
| 				list, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}) | ||||
| 				list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}) | ||||
| 			} | ||||
| 			close(listCh) | ||||
| 		}() | ||||
| @@ -244,6 +272,21 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { | ||||
| 		if err != nil { | ||||
| 			return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err) | ||||
| 		} | ||||
|  | ||||
| 		// We check if the list was paginated and if so set the paginatedResult based on that. | ||||
| 		// However, we want to do that only for the initial list (which is the only case | ||||
| 		// when we set ResourceVersion="0"). The reasoning behind it is that later, in some | ||||
| 		// situations we may force listing directly from etcd (by setting ResourceVersion="") | ||||
| 		// which will return paginated result, even if watch cache is enabled. However, in | ||||
| 		// that case, we still want to prefer sending requests to watch cache if possible. | ||||
| 		// | ||||
| 		// A paginated result returned for a request with ResourceVersion="0" means that watch | ||||
| 		// cache is disabled and there are a lot of objects of a given type. In such a case, | ||||
| 		// there is no need to prefer listing from watch cache. | ||||
| 		if options.ResourceVersion == "0" && paginatedResult { | ||||
| 			r.paginatedResult = true | ||||
| 		} | ||||
|  | ||||
| 		r.setIsLastSyncResourceVersionExpired(false) // list was successful | ||||
| 		initTrace.Step("Objects listed") | ||||
| 		listMetaInterface, err := meta.ListAccessor(list) | ||||
| @@ -320,7 +363,9 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { | ||||
| 		if err != nil { | ||||
| 			switch { | ||||
| 			case isExpiredError(err): | ||||
| 				r.setIsLastSyncResourceVersionExpired(true) | ||||
| 				// Don't set LastSyncResourceVersionExpired - a LIST call with ResourceVersion=RV already | ||||
| 				// has the semantics of returning data at least as fresh as the provided RV. | ||||
| 				// So first try to LIST with RV set to the resource version of the last observed object. | ||||
| 				klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err) | ||||
| 			case err == io.EOF: | ||||
| 				// watch closed normally | ||||
| @@ -344,8 +389,10 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { | ||||
| 			if err != errorStopRequested { | ||||
| 				switch { | ||||
| 				case isExpiredError(err): | ||||
| 					r.setIsLastSyncResourceVersionExpired(true) | ||||
| 					klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err) | ||||
| 					// Don't set LastSyncResourceVersionExpired - a LIST call with ResourceVersion=RV already | ||||
| 					// has the semantics of returning data at least as fresh as the provided RV. | ||||
| 					// So first try to LIST with RV set to the resource version of the last observed object. | ||||
| 					klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err) | ||||
| 				default: | ||||
| 					klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err) | ||||
| 				} | ||||
|   | ||||
| @@ -425,6 +425,8 @@ func TestReflectorWatchListPageSize(t *testing.T) { | ||||
| 		}, | ||||
| 	} | ||||
| 	r := NewReflector(lw, &v1.Pod{}, s, 0) | ||||
| 	// Set resource version to test pagination also for non-consistent reads. | ||||
| 	r.setLastSyncResourceVersion("10") | ||||
| 	// Set the reflector to paginate the list request in 4 item chunks. | ||||
| 	r.WatchListPageSize = 4 | ||||
| 	r.ListAndWatch(stopCh) | ||||
| @@ -435,6 +437,92 @@ func TestReflectorWatchListPageSize(t *testing.T) { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestReflectorNotPaginatingNotConsistentReads(t *testing.T) { | ||||
| 	stopCh := make(chan struct{}) | ||||
| 	s := NewStore(MetaNamespaceKeyFunc) | ||||
|  | ||||
| 	lw := &testLW{ | ||||
| 		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { | ||||
| 			// Stop once the reflector begins watching since we're only interested in the list. | ||||
| 			close(stopCh) | ||||
| 			fw := watch.NewFake() | ||||
| 			return fw, nil | ||||
| 		}, | ||||
| 		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { | ||||
| 			if options.ResourceVersion != "10" { | ||||
| 				t.Fatalf("Expected ResourceVersion: \"10\", got: %s", options.ResourceVersion) | ||||
| 			} | ||||
| 			if options.Limit != 0 { | ||||
| 				t.Fatalf("Expected list Limit of 0 but got %d", options.Limit) | ||||
| 			} | ||||
| 			pods := make([]v1.Pod, 10) | ||||
| 			for i := 0; i < 10; i++ { | ||||
| 				pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}} | ||||
| 			} | ||||
| 			return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods}, nil | ||||
| 		}, | ||||
| 	} | ||||
| 	r := NewReflector(lw, &v1.Pod{}, s, 0) | ||||
| 	r.setLastSyncResourceVersion("10") | ||||
| 	r.ListAndWatch(stopCh) | ||||
|  | ||||
| 	results := s.List() | ||||
| 	if len(results) != 10 { | ||||
| 		t.Errorf("Expected 10 results, got %d", len(results)) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestReflectorPaginatingNonConsistentReadsIfWatchCacheDisabled(t *testing.T) { | ||||
| 	var stopCh chan struct{} | ||||
| 	s := NewStore(MetaNamespaceKeyFunc) | ||||
|  | ||||
| 	lw := &testLW{ | ||||
| 		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { | ||||
| 			// Stop once the reflector begins watching since we're only interested in the list. | ||||
| 			close(stopCh) | ||||
| 			fw := watch.NewFake() | ||||
| 			return fw, nil | ||||
| 		}, | ||||
| 		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { | ||||
| 			// Check that default pager limit is set. | ||||
| 			if options.Limit != 500 { | ||||
| 				t.Fatalf("Expected list Limit of 500 but got %d", options.Limit) | ||||
| 			} | ||||
| 			pods := make([]v1.Pod, 10) | ||||
| 			for i := 0; i < 10; i++ { | ||||
| 				pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}} | ||||
| 			} | ||||
| 			switch options.Continue { | ||||
| 			case "": | ||||
| 				return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10", Continue: "C1"}, Items: pods[0:4]}, nil | ||||
| 			case "C1": | ||||
| 				return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10", Continue: "C2"}, Items: pods[4:8]}, nil | ||||
| 			case "C2": | ||||
| 				return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[8:10]}, nil | ||||
| 			default: | ||||
| 				t.Fatalf("Unrecognized continue: %s", options.Continue) | ||||
| 			} | ||||
| 			return nil, nil | ||||
| 		}, | ||||
| 	} | ||||
| 	r := NewReflector(lw, &v1.Pod{}, s, 0) | ||||
|  | ||||
| 	// Initial list should initialize paginatedResult in the reflector. | ||||
| 	stopCh = make(chan struct{}) | ||||
| 	r.ListAndWatch(stopCh) | ||||
| 	if results := s.List(); len(results) != 10 { | ||||
| 		t.Errorf("Expected 10 results, got %d", len(results)) | ||||
| 	} | ||||
|  | ||||
| 	// Since initial list for ResourceVersion="0" was paginated, the subsequent | ||||
| 	// ones should also be paginated. | ||||
| 	stopCh = make(chan struct{}) | ||||
| 	r.ListAndWatch(stopCh) | ||||
| 	if results := s.List(); len(results) != 10 { | ||||
| 		t.Errorf("Expected 10 results, got %d", len(results)) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // TestReflectorResyncWithResourceVersion ensures that a reflector keeps track of the ResourceVersion and sends | ||||
| // it in relist requests to prevent the reflector from traveling back in time if the relist is to an api-server or | ||||
| // etcd that is partitioned and serving older data than the reflector has already processed. | ||||
|   | ||||
| @@ -73,16 +73,18 @@ func New(fn ListPageFunc) *ListPager { | ||||
| // List returns a single list object, but attempts to retrieve smaller chunks from the | ||||
| // server to reduce the impact on the server. If the chunk attempt fails, it will load | ||||
| // the full list instead. The Limit field on options, if unset, will default to the page size. | ||||
| func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) { | ||||
| func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runtime.Object, bool, error) { | ||||
| 	if options.Limit == 0 { | ||||
| 		options.Limit = p.PageSize | ||||
| 	} | ||||
| 	requestedResourceVersion := options.ResourceVersion | ||||
| 	var list *metainternalversion.List | ||||
| 	paginatedResult := false | ||||
|  | ||||
| 	for { | ||||
| 		select { | ||||
| 		case <-ctx.Done(): | ||||
| 			return nil, ctx.Err() | ||||
| 			return nil, paginatedResult, ctx.Err() | ||||
| 		default: | ||||
| 		} | ||||
|  | ||||
| @@ -93,23 +95,24 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti | ||||
| 			// failing when the resource versions is established by the first page request falls out of the compaction | ||||
| 			// during the subsequent list requests). | ||||
| 			if !errors.IsResourceExpired(err) || !p.FullListIfExpired || options.Continue == "" { | ||||
| 				return nil, err | ||||
| 				return nil, paginatedResult, err | ||||
| 			} | ||||
| 			// the list expired while we were processing, fall back to a full list at | ||||
| 			// the requested ResourceVersion. | ||||
| 			options.Limit = 0 | ||||
| 			options.Continue = "" | ||||
| 			options.ResourceVersion = requestedResourceVersion | ||||
| 			return p.PageFn(ctx, options) | ||||
| 			result, err := p.PageFn(ctx, options) | ||||
| 			return result, paginatedResult, err | ||||
| 		} | ||||
| 		m, err := meta.ListAccessor(obj) | ||||
| 		if err != nil { | ||||
| 			return nil, fmt.Errorf("returned object must be a list: %v", err) | ||||
| 			return nil, paginatedResult, fmt.Errorf("returned object must be a list: %v", err) | ||||
| 		} | ||||
|  | ||||
| 		// exit early and return the object we got if we haven't processed any pages | ||||
| 		if len(m.GetContinue()) == 0 && list == nil { | ||||
| 			return obj, nil | ||||
| 			return obj, paginatedResult, nil | ||||
| 		} | ||||
|  | ||||
| 		// initialize the list and fill its contents | ||||
| @@ -122,12 +125,12 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti | ||||
| 			list.Items = append(list.Items, obj) | ||||
| 			return nil | ||||
| 		}); err != nil { | ||||
| 			return nil, err | ||||
| 			return nil, paginatedResult, err | ||||
| 		} | ||||
|  | ||||
| 		// if we have no more items, return the list | ||||
| 		if len(m.GetContinue()) == 0 { | ||||
| 			return list, nil | ||||
| 			return list, paginatedResult, nil | ||||
| 		} | ||||
|  | ||||
| 		// set the next loop up | ||||
| @@ -136,6 +139,8 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti | ||||
| 		// `specifying resource version is not allowed when using continue` error. | ||||
| 		// See https://github.com/kubernetes/kubernetes/issues/85221#issuecomment-553748143. | ||||
| 		options.ResourceVersion = "" | ||||
| 		// At this point, result is already paginated. | ||||
| 		paginatedResult = true | ||||
| 	} | ||||
| } | ||||
|  | ||||
|   | ||||
| @@ -120,6 +120,7 @@ func (p *testPager) ExpiresOnSecondPageThenFullList(ctx context.Context, options | ||||
| 	} | ||||
| 	return p.PagedList(ctx, options) | ||||
| } | ||||
|  | ||||
| func TestListPager_List(t *testing.T) { | ||||
| 	type fields struct { | ||||
| 		PageSize          int64 | ||||
| @@ -135,6 +136,7 @@ func TestListPager_List(t *testing.T) { | ||||
| 		fields    fields | ||||
| 		args      args | ||||
| 		want      runtime.Object | ||||
| 		wantPaged bool | ||||
| 		wantErr   bool | ||||
| 		isExpired bool | ||||
| 	}{ | ||||
| @@ -143,35 +145,41 @@ func TestListPager_List(t *testing.T) { | ||||
| 			fields:    fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 0, rv: "rv:20"}).PagedList}, | ||||
| 			args:      args{}, | ||||
| 			want:      list(0, "rv:20"), | ||||
| 			wantPaged: false, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:      "one page", | ||||
| 			fields:    fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 9, rv: "rv:20"}).PagedList}, | ||||
| 			args:      args{}, | ||||
| 			want:      list(9, "rv:20"), | ||||
| 			wantPaged: false, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:      "one full page", | ||||
| 			fields:    fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 10, rv: "rv:20"}).PagedList}, | ||||
| 			args:      args{}, | ||||
| 			want:      list(10, "rv:20"), | ||||
| 			wantPaged: false, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:      "two pages", | ||||
| 			fields:    fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList}, | ||||
| 			args:      args{}, | ||||
| 			want:      list(11, "rv:20"), | ||||
| 			wantPaged: true, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:      "three pages", | ||||
| 			fields:    fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).PagedList}, | ||||
| 			args:      args{}, | ||||
| 			want:      list(21, "rv:20"), | ||||
| 			wantPaged: true, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:      "expires on second page", | ||||
| 			fields:    fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).ExpiresOnSecondPage}, | ||||
| 			args:      args{}, | ||||
| 			wantPaged: true, | ||||
| 			wantErr:   true, | ||||
| 			isExpired: true, | ||||
| 		}, | ||||
| @@ -184,12 +192,14 @@ func TestListPager_List(t *testing.T) { | ||||
| 			}, | ||||
| 			args:      args{}, | ||||
| 			want:      list(21, "rv:20"), | ||||
| 			wantPaged: true, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:      "two pages with resourceVersion", | ||||
| 			fields:    fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList}, | ||||
| 			args:      args{options: metav1.ListOptions{ResourceVersion: "rv:10"}}, | ||||
| 			want:      list(11, "rv:20"), | ||||
| 			wantPaged: true, | ||||
| 		}, | ||||
| 	} | ||||
| 	for _, tt := range tests { | ||||
| @@ -203,7 +213,7 @@ func TestListPager_List(t *testing.T) { | ||||
| 			if ctx == nil { | ||||
| 				ctx = context.Background() | ||||
| 			} | ||||
| 			got, err := p.List(ctx, tt.args.options) | ||||
| 			got, paginatedResult, err := p.List(ctx, tt.args.options) | ||||
| 			if (err != nil) != tt.wantErr { | ||||
| 				t.Errorf("ListPager.List() error = %v, wantErr %v", err, tt.wantErr) | ||||
| 				return | ||||
| @@ -212,6 +222,9 @@ func TestListPager_List(t *testing.T) { | ||||
| 				t.Errorf("ListPager.List() error = %v, isExpired %v", err, tt.isExpired) | ||||
| 				return | ||||
| 			} | ||||
| 			if tt.wantPaged != paginatedResult { | ||||
| 				t.Errorf("paginatedResult = %t, want %t", paginatedResult, tt.wantPaged) | ||||
| 			} | ||||
| 			if !reflect.DeepEqual(got, tt.want) { | ||||
| 				t.Errorf("ListPager.List() = %v, want %v", got, tt.want) | ||||
| 			} | ||||
|   | ||||
| @@ -304,7 +304,7 @@ func TestListResourceVersion0(t *testing.T) { | ||||
|  | ||||
| 			p := pager.New(pager.SimplePageFunc(pagerFn)) | ||||
| 			p.PageSize = 3 | ||||
| 			listObj, err := p.List(context.Background(), metav1.ListOptions{ResourceVersion: "0"}) | ||||
| 			listObj, _, err := p.List(context.Background(), metav1.ListOptions{ResourceVersion: "0"}) | ||||
| 			if err != nil { | ||||
| 				t.Fatalf("Unexpected list error: %v", err) | ||||
| 			} | ||||
| @@ -360,7 +360,7 @@ func TestAPIListChunking(t *testing.T) { | ||||
| 			return list, err | ||||
| 		}), | ||||
| 	} | ||||
| 	listObj, err := p.List(context.Background(), metav1.ListOptions{}) | ||||
| 	listObj, _, err := p.List(context.Background(), metav1.ListOptions{}) | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Kubernetes Prow Robot
					Kubernetes Prow Robot