mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-03 19:58:17 +00:00 
			
		
		
		
	storage/etcd3: error when progressNotify option set and newFunc was provided for a registry
This commit is contained in:
		@@ -18,6 +18,7 @@ package etcd3
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"errors"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"os"
 | 
			
		||||
	"strconv"
 | 
			
		||||
@@ -97,6 +98,9 @@ func (w *watcher) Watch(ctx context.Context, key string, rev int64, opts storage
 | 
			
		||||
	if opts.Recursive && !strings.HasSuffix(key, "/") {
 | 
			
		||||
		key += "/"
 | 
			
		||||
	}
 | 
			
		||||
	if opts.ProgressNotify && w.newFunc == nil {
 | 
			
		||||
		return nil, apierrors.NewInternalError(errors.New("progressNotify for watch is unsupported by the etcd storage because no newFunc was provided"))
 | 
			
		||||
	}
 | 
			
		||||
	wc := w.createWatchChan(ctx, key, rev, opts.Recursive, opts.ProgressNotify, opts.Predicate)
 | 
			
		||||
	go wc.run()
 | 
			
		||||
 | 
			
		||||
@@ -334,9 +338,6 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) {
 | 
			
		||||
 | 
			
		||||
	switch {
 | 
			
		||||
	case e.isProgressNotify:
 | 
			
		||||
		if wc.watcher.newFunc == nil {
 | 
			
		||||
			return nil
 | 
			
		||||
		}
 | 
			
		||||
		object := wc.watcher.newFunc()
 | 
			
		||||
		if err := wc.watcher.versioner.UpdateObject(object, uint64(e.rev)); err != nil {
 | 
			
		||||
			klog.Errorf("failed to propagate object version: %v", err)
 | 
			
		||||
 
 | 
			
		||||
@@ -18,11 +18,14 @@ package etcd3
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"errors"
 | 
			
		||||
	"fmt"
 | 
			
		||||
 | 
			
		||||
	"sync"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	apierrors "k8s.io/apimachinery/pkg/api/errors"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/watch"
 | 
			
		||||
	"k8s.io/apiserver/pkg/storage"
 | 
			
		||||
	"k8s.io/apiserver/pkg/storage/etcd3/testserver"
 | 
			
		||||
@@ -133,3 +136,25 @@ func TestWatchErrResultNotBlockAfterCancel(t *testing.T) {
 | 
			
		||||
	cancel()
 | 
			
		||||
	wg.Wait()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// TestWatchErrorWhenNoNewFunc checks if an error
 | 
			
		||||
// will be returned when establishing a watch
 | 
			
		||||
// with progressNotify options set
 | 
			
		||||
// when newFunc wasn't provided
 | 
			
		||||
func TestWatchErrorWhenNoNewFunc(t *testing.T) {
 | 
			
		||||
	origCtx, store, _ := testSetup(t, func(opts *setupOptions) { opts.newFunc = nil })
 | 
			
		||||
	ctx, cancel := context.WithCancel(origCtx)
 | 
			
		||||
	defer cancel()
 | 
			
		||||
 | 
			
		||||
	w, err := store.watcher.Watch(ctx, "/abc", 0, storage.ListOptions{ProgressNotify: true})
 | 
			
		||||
	if err == nil {
 | 
			
		||||
		t.Fatalf("expected an error but got none")
 | 
			
		||||
	}
 | 
			
		||||
	if w != nil {
 | 
			
		||||
		t.Fatalf("didn't expect a watcher because progress notifications cannot be delivered for a watcher without newFunc")
 | 
			
		||||
	}
 | 
			
		||||
	expectedError := apierrors.NewInternalError(errors.New("progressNotify for watch is unsupported by the etcd storage because no newFunc was provided"))
 | 
			
		||||
	if err.Error() != expectedError.Error() {
 | 
			
		||||
		t.Fatalf("unexpected err = %v, expected = %v", err, expectedError)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user