mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 04:08:16 +00:00 
			
		
		
		
	Merge pull request #7516 from GoogleCloudPlatform/revert-7288-perf
Revert "Add a simple cache for objects stored in etcd"
This commit is contained in:
		@@ -31,8 +31,7 @@ func TestGetServersToValidate(t *testing.T) {
 | 
			
		||||
	config := Config{}
 | 
			
		||||
	fakeClient := tools.NewFakeEtcdClient(t)
 | 
			
		||||
	fakeClient.Machines = []string{"http://machine1:4001", "http://machine2", "http://machine3:4003"}
 | 
			
		||||
	config.EtcdHelper = tools.NewEtcdHelper(fakeClient, latest.Codec, etcdtest.PathPrefix())
 | 
			
		||||
	config.EtcdHelper.Versioner = nil
 | 
			
		||||
	config.EtcdHelper = tools.EtcdHelper{fakeClient, latest.Codec, nil, etcdtest.PathPrefix()}
 | 
			
		||||
 | 
			
		||||
	master.nodeRegistry = registrytest.NewMinionRegistry([]string{"node1", "node2"}, api.NodeResources{})
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -26,7 +26,6 @@ import (
 | 
			
		||||
	"path"
 | 
			
		||||
	"reflect"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"sync"
 | 
			
		||||
 | 
			
		||||
	"github.com/GoogleCloudPlatform/kubernetes/pkg/conversion"
 | 
			
		||||
	"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
 | 
			
		||||
@@ -43,16 +42,6 @@ type EtcdHelper struct {
 | 
			
		||||
	Versioner EtcdVersioner
 | 
			
		||||
	// prefix for all etcd keys
 | 
			
		||||
	PathPrefix string
 | 
			
		||||
 | 
			
		||||
	// We cache objects stored in etcd. For keys we use Node.ModifiedIndex which is equivalent
 | 
			
		||||
	// to resourceVersion.
 | 
			
		||||
	// This depends on etcd's indexes being globally unique across all objects/types. This will
 | 
			
		||||
	// have to revisited if we decide to do things like multiple etcd clusters, or etcd will
 | 
			
		||||
	// support multi-object transaction that will result in many objects with the same index.
 | 
			
		||||
	// Number of entries stored in the cache is controlled by maxEtcdCacheEntries constant.
 | 
			
		||||
	// TODO: Measure how much this cache helps after the conversion code is optimized.
 | 
			
		||||
	cache map[uint64]runtime.Object
 | 
			
		||||
	mutex sync.RWMutex
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewEtcdHelper creates a helper that works against objects that use the internal
 | 
			
		||||
@@ -63,7 +52,6 @@ func NewEtcdHelper(client EtcdGetSet, codec runtime.Codec, prefix string) EtcdHe
 | 
			
		||||
		Codec:      codec,
 | 
			
		||||
		Versioner:  APIObjectVersioner{},
 | 
			
		||||
		PathPrefix: prefix,
 | 
			
		||||
		cache:      make(map[uint64]runtime.Object),
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -133,9 +121,6 @@ func (h *EtcdHelper) decodeNodeList(nodes []*etcd.Node, slicePtr interface{}) er
 | 
			
		||||
			}
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		if obj, found := h.getFromCache(node.ModifiedIndex); found {
 | 
			
		||||
			v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
 | 
			
		||||
		} else {
 | 
			
		||||
		obj := reflect.New(v.Type().Elem())
 | 
			
		||||
		if err := h.Codec.DecodeInto([]byte(node.Value), obj.Interface().(runtime.Object)); err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
@@ -145,62 +130,10 @@ func (h *EtcdHelper) decodeNodeList(nodes []*etcd.Node, slicePtr interface{}) er
 | 
			
		||||
			_ = h.Versioner.UpdateObject(obj.Interface().(runtime.Object), node)
 | 
			
		||||
		}
 | 
			
		||||
		v.Set(reflect.Append(v, obj.Elem()))
 | 
			
		||||
			if node.ModifiedIndex != 0 {
 | 
			
		||||
				h.addToCache(node.ModifiedIndex, obj.Interface().(runtime.Object))
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// etcdCache defines interface used for caching objects stored in etcd. Objects are keyed by
 | 
			
		||||
// their Node.ModifiedIndex, which is unique across all types.
 | 
			
		||||
// All implementations must be thread-safe.
 | 
			
		||||
type etcdCache interface {
 | 
			
		||||
	getFromCache(index uint64) (runtime.Object, bool)
 | 
			
		||||
	addToCache(index uint64, obj runtime.Object)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
const maxEtcdCacheEntries int = 50000
 | 
			
		||||
 | 
			
		||||
func (h *EtcdHelper) getFromCache(index uint64) (runtime.Object, bool) {
 | 
			
		||||
	var obj runtime.Object
 | 
			
		||||
	func() {
 | 
			
		||||
		h.mutex.RLock()
 | 
			
		||||
		defer h.mutex.RUnlock()
 | 
			
		||||
		obj = h.cache[index]
 | 
			
		||||
	}()
 | 
			
		||||
	if obj != nil {
 | 
			
		||||
		// We should not return the object itself to avoid poluting the cache if someone
 | 
			
		||||
		// modifies returned values.
 | 
			
		||||
		objCopy, err := conversion.DeepCopy(obj)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			glog.Errorf("Error during DeepCopy of cached object: %q", err)
 | 
			
		||||
			return nil, false
 | 
			
		||||
		}
 | 
			
		||||
		return objCopy.(runtime.Object), true
 | 
			
		||||
	}
 | 
			
		||||
	return nil, false
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (h *EtcdHelper) addToCache(index uint64, obj runtime.Object) {
 | 
			
		||||
	objCopy, err := conversion.DeepCopy(obj)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		glog.Errorf("Error during DeepCopy of cached object: %q", err)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	h.mutex.Lock()
 | 
			
		||||
	defer h.mutex.Unlock()
 | 
			
		||||
	h.cache[index] = objCopy.(runtime.Object)
 | 
			
		||||
	if len(h.cache) > maxEtcdCacheEntries {
 | 
			
		||||
		var randomKey uint64
 | 
			
		||||
		for randomKey = range h.cache {
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
		delete(h.cache, randomKey)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ExtractToList works on a *List api object (an object that satisfies the runtime.IsList
 | 
			
		||||
// definition) and extracts a go object per etcd node into a slice with the resource version.
 | 
			
		||||
func (h *EtcdHelper) ExtractToList(key string, listObj runtime.Object) error {
 | 
			
		||||
 
 | 
			
		||||
@@ -174,7 +174,7 @@ func TestExtractToListAcrossDirectories(t *testing.T) {
 | 
			
		||||
								Key:           "/baz",
 | 
			
		||||
								Value:         getEncodedPod("baz"),
 | 
			
		||||
								Dir:           false,
 | 
			
		||||
								ModifiedIndex: 3,
 | 
			
		||||
								ModifiedIndex: 1,
 | 
			
		||||
							},
 | 
			
		||||
						},
 | 
			
		||||
					},
 | 
			
		||||
@@ -199,7 +199,7 @@ func TestExtractToListAcrossDirectories(t *testing.T) {
 | 
			
		||||
		Items: []api.Pod{
 | 
			
		||||
			// We expect list to be sorted by directory (e.g. namespace) first, then by name.
 | 
			
		||||
			{
 | 
			
		||||
				ObjectMeta: api.ObjectMeta{Name: "baz", ResourceVersion: "3"},
 | 
			
		||||
				ObjectMeta: api.ObjectMeta{Name: "baz", ResourceVersion: "1"},
 | 
			
		||||
				Spec: api.PodSpec{
 | 
			
		||||
					RestartPolicy: api.RestartPolicyAlways,
 | 
			
		||||
					DNSPolicy:     api.DNSClusterFirst,
 | 
			
		||||
@@ -482,8 +482,7 @@ func TestSetObjWithVersion(t *testing.T) {
 | 
			
		||||
func TestSetObjWithoutResourceVersioner(t *testing.T) {
 | 
			
		||||
	obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
 | 
			
		||||
	fakeClient := NewFakeEtcdClient(t)
 | 
			
		||||
	helper := NewEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
 | 
			
		||||
	helper.Versioner = nil
 | 
			
		||||
	helper := EtcdHelper{fakeClient, testapi.Codec(), nil, etcdtest.PathPrefix()}
 | 
			
		||||
	returnedObj := &api.Pod{}
 | 
			
		||||
	err := helper.SetObj("/some/key", obj, returnedObj, 3)
 | 
			
		||||
	key := etcdtest.AddPrefix("/some/key")
 | 
			
		||||
@@ -510,8 +509,7 @@ func TestSetObjWithoutResourceVersioner(t *testing.T) {
 | 
			
		||||
func TestSetObjNilOutParam(t *testing.T) {
 | 
			
		||||
	obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
 | 
			
		||||
	fakeClient := NewFakeEtcdClient(t)
 | 
			
		||||
	helper := NewEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
 | 
			
		||||
	helper.Versioner = nil
 | 
			
		||||
	helper := EtcdHelper{fakeClient, testapi.Codec(), nil, etcdtest.PathPrefix()}
 | 
			
		||||
	err := helper.SetObj("/some/key", obj, nil, 3)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Errorf("Unexpected error %#v", err)
 | 
			
		||||
 
 | 
			
		||||
@@ -72,7 +72,7 @@ func ParseWatchResourceVersion(resourceVersion, kind string) (uint64, error) {
 | 
			
		||||
// watching (e.g., for reconnecting without missing any updates).
 | 
			
		||||
func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
 | 
			
		||||
	key = h.PrefixEtcdKey(key)
 | 
			
		||||
	w := newEtcdWatcher(true, exceptKey(key), filter, h.Codec, h.Versioner, nil, h)
 | 
			
		||||
	w := newEtcdWatcher(true, exceptKey(key), filter, h.Codec, h.Versioner, nil)
 | 
			
		||||
	go w.etcdWatch(h.Client, key, resourceVersion)
 | 
			
		||||
	return w, nil
 | 
			
		||||
}
 | 
			
		||||
@@ -82,7 +82,7 @@ func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter Filter
 | 
			
		||||
// Errors will be sent down the channel.
 | 
			
		||||
func (h *EtcdHelper) Watch(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
 | 
			
		||||
	key = h.PrefixEtcdKey(key)
 | 
			
		||||
	w := newEtcdWatcher(false, nil, filter, h.Codec, h.Versioner, nil, h)
 | 
			
		||||
	w := newEtcdWatcher(false, nil, filter, h.Codec, h.Versioner, nil)
 | 
			
		||||
	go w.etcdWatch(h.Client, key, resourceVersion)
 | 
			
		||||
	return w, nil
 | 
			
		||||
}
 | 
			
		||||
@@ -105,7 +105,7 @@ func (h *EtcdHelper) Watch(key string, resourceVersion uint64, filter FilterFunc
 | 
			
		||||
// Errors will be sent down the channel.
 | 
			
		||||
func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) watch.Interface {
 | 
			
		||||
	key = h.PrefixEtcdKey(key)
 | 
			
		||||
	w := newEtcdWatcher(false, nil, Everything, h.Codec, h.Versioner, transform, h)
 | 
			
		||||
	w := newEtcdWatcher(false, nil, Everything, h.Codec, h.Versioner, transform)
 | 
			
		||||
	go w.etcdWatch(h.Client, key, resourceVersion)
 | 
			
		||||
	return w
 | 
			
		||||
}
 | 
			
		||||
@@ -145,8 +145,6 @@ type etcdWatcher struct {
 | 
			
		||||
 | 
			
		||||
	// Injectable for testing. Send the event down the outgoing channel.
 | 
			
		||||
	emit func(watch.Event)
 | 
			
		||||
 | 
			
		||||
	cache etcdCache
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// watchWaitDuration is the amount of time to wait for an error from watch.
 | 
			
		||||
@@ -154,7 +152,7 @@ const watchWaitDuration = 100 * time.Millisecond
 | 
			
		||||
 | 
			
		||||
// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes.  If you provide a transform
 | 
			
		||||
// and a versioner, the versioner must be able to handle the objects that transform creates.
 | 
			
		||||
func newEtcdWatcher(list bool, include includeFunc, filter FilterFunc, encoding runtime.Codec, versioner EtcdVersioner, transform TransformFunc, cache etcdCache) *etcdWatcher {
 | 
			
		||||
func newEtcdWatcher(list bool, include includeFunc, filter FilterFunc, encoding runtime.Codec, versioner EtcdVersioner, transform TransformFunc) *etcdWatcher {
 | 
			
		||||
	w := &etcdWatcher{
 | 
			
		||||
		encoding:     encoding,
 | 
			
		||||
		versioner:    versioner,
 | 
			
		||||
@@ -167,7 +165,6 @@ func newEtcdWatcher(list bool, include includeFunc, filter FilterFunc, encoding
 | 
			
		||||
		etcdStop:     make(chan bool),
 | 
			
		||||
		outgoing:     make(chan watch.Event),
 | 
			
		||||
		userStop:     make(chan struct{}),
 | 
			
		||||
		cache:        cache,
 | 
			
		||||
	}
 | 
			
		||||
	w.emit = func(e watch.Event) { w.outgoing <- e }
 | 
			
		||||
	go w.translate()
 | 
			
		||||
@@ -259,10 +256,6 @@ func (w *etcdWatcher) translate() {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) {
 | 
			
		||||
	if obj, found := w.cache.getFromCache(node.ModifiedIndex); found {
 | 
			
		||||
		return obj, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	obj, err := w.encoding.Decode([]byte(node.Value))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
@@ -284,9 +277,6 @@ func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) {
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if node.ModifiedIndex != 0 {
 | 
			
		||||
		w.cache.addToCache(node.ModifiedIndex, obj)
 | 
			
		||||
	}
 | 
			
		||||
	return obj, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -32,18 +32,6 @@ import (
 | 
			
		||||
 | 
			
		||||
var versioner = APIObjectVersioner{}
 | 
			
		||||
 | 
			
		||||
// Implements etcdCache interface as empty methods (i.e. does not cache any objects)
 | 
			
		||||
type fakeEtcdCache struct{}
 | 
			
		||||
 | 
			
		||||
func (f *fakeEtcdCache) getFromCache(index uint64) (runtime.Object, bool) {
 | 
			
		||||
	return nil, false
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (f *fakeEtcdCache) addToCache(index uint64, obj runtime.Object) {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var _ etcdCache = &fakeEtcdCache{}
 | 
			
		||||
 | 
			
		||||
func TestWatchInterpretations(t *testing.T) {
 | 
			
		||||
	codec := latest.Codec
 | 
			
		||||
	// Declare some pods to make the test cases compact.
 | 
			
		||||
@@ -127,7 +115,7 @@ func TestWatchInterpretations(t *testing.T) {
 | 
			
		||||
 | 
			
		||||
	for name, item := range table {
 | 
			
		||||
		for _, action := range item.actions {
 | 
			
		||||
			w := newEtcdWatcher(true, nil, firstLetterIsB, codec, versioner, nil, &fakeEtcdCache{})
 | 
			
		||||
			w := newEtcdWatcher(true, nil, firstLetterIsB, codec, versioner, nil)
 | 
			
		||||
			emitCalled := false
 | 
			
		||||
			w.emit = func(event watch.Event) {
 | 
			
		||||
				emitCalled = true
 | 
			
		||||
@@ -165,7 +153,7 @@ func TestWatchInterpretations(t *testing.T) {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestWatchInterpretation_ResponseNotSet(t *testing.T) {
 | 
			
		||||
	w := newEtcdWatcher(false, nil, Everything, codec, versioner, nil, &fakeEtcdCache{})
 | 
			
		||||
	w := newEtcdWatcher(false, nil, Everything, codec, versioner, nil)
 | 
			
		||||
	w.emit = func(e watch.Event) {
 | 
			
		||||
		t.Errorf("Unexpected emit: %v", e)
 | 
			
		||||
	}
 | 
			
		||||
@@ -179,7 +167,7 @@ func TestWatchInterpretation_ResponseNotSet(t *testing.T) {
 | 
			
		||||
func TestWatchInterpretation_ResponseNoNode(t *testing.T) {
 | 
			
		||||
	actions := []string{"create", "set", "compareAndSwap", "delete"}
 | 
			
		||||
	for _, action := range actions {
 | 
			
		||||
		w := newEtcdWatcher(false, nil, Everything, codec, versioner, nil, &fakeEtcdCache{})
 | 
			
		||||
		w := newEtcdWatcher(false, nil, Everything, codec, versioner, nil)
 | 
			
		||||
		w.emit = func(e watch.Event) {
 | 
			
		||||
			t.Errorf("Unexpected emit: %v", e)
 | 
			
		||||
		}
 | 
			
		||||
@@ -193,7 +181,7 @@ func TestWatchInterpretation_ResponseNoNode(t *testing.T) {
 | 
			
		||||
func TestWatchInterpretation_ResponseBadData(t *testing.T) {
 | 
			
		||||
	actions := []string{"create", "set", "compareAndSwap", "delete"}
 | 
			
		||||
	for _, action := range actions {
 | 
			
		||||
		w := newEtcdWatcher(false, nil, Everything, codec, versioner, nil, &fakeEtcdCache{})
 | 
			
		||||
		w := newEtcdWatcher(false, nil, Everything, codec, versioner, nil)
 | 
			
		||||
		w.emit = func(e watch.Event) {
 | 
			
		||||
			t.Errorf("Unexpected emit: %v", e)
 | 
			
		||||
		}
 | 
			
		||||
@@ -218,7 +206,7 @@ func TestWatchEtcdError(t *testing.T) {
 | 
			
		||||
	fakeClient := NewFakeEtcdClient(t)
 | 
			
		||||
	fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{}
 | 
			
		||||
	fakeClient.WatchImmediateError = fmt.Errorf("immediate error")
 | 
			
		||||
	h := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
 | 
			
		||||
	h := EtcdHelper{fakeClient, codec, versioner, etcdtest.PathPrefix()}
 | 
			
		||||
 | 
			
		||||
	watching, err := h.Watch("/some/key", 4, Everything)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
@@ -248,7 +236,7 @@ func TestWatch(t *testing.T) {
 | 
			
		||||
	key := "/some/key"
 | 
			
		||||
	prefixedKey := etcdtest.AddPrefix(key)
 | 
			
		||||
	fakeClient.expectNotFoundGetSet[prefixedKey] = struct{}{}
 | 
			
		||||
	h := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
 | 
			
		||||
	h := EtcdHelper{fakeClient, codec, versioner, etcdtest.PathPrefix()}
 | 
			
		||||
 | 
			
		||||
	watching, err := h.Watch(key, 0, Everything)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
@@ -424,7 +412,7 @@ func TestWatchEtcdState(t *testing.T) {
 | 
			
		||||
			fakeClient.Data[key] = value
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		h := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
 | 
			
		||||
		h := EtcdHelper{fakeClient, codec, versioner, etcdtest.PathPrefix()}
 | 
			
		||||
		watching, err := h.Watch(baseKey, testCase.From, Everything)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			t.Fatalf("Unexpected error: %v", err)
 | 
			
		||||
@@ -497,7 +485,7 @@ func TestWatchFromZeroIndex(t *testing.T) {
 | 
			
		||||
		key := "/some/key"
 | 
			
		||||
		prefixedKey := etcdtest.AddPrefix(key)
 | 
			
		||||
		fakeClient.Data[prefixedKey] = testCase.Response
 | 
			
		||||
		h := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
 | 
			
		||||
		h := EtcdHelper{fakeClient, codec, versioner, etcdtest.PathPrefix()}
 | 
			
		||||
 | 
			
		||||
		watching, err := h.Watch(key, 0, Everything)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
@@ -558,7 +546,7 @@ func TestWatchListFromZeroIndex(t *testing.T) {
 | 
			
		||||
			EtcdIndex: 3,
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	h := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
 | 
			
		||||
	h := EtcdHelper{fakeClient, codec, versioner, etcdtest.PathPrefix()}
 | 
			
		||||
 | 
			
		||||
	watching, err := h.WatchList(key, 0, Everything)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
@@ -598,7 +586,7 @@ func TestWatchListIgnoresRootKey(t *testing.T) {
 | 
			
		||||
	prefixedKey := etcdtest.AddPrefix(key)
 | 
			
		||||
 | 
			
		||||
	fakeClient := NewFakeEtcdClient(t)
 | 
			
		||||
	h := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
 | 
			
		||||
	h := EtcdHelper{fakeClient, codec, versioner, etcdtest.PathPrefix()}
 | 
			
		||||
 | 
			
		||||
	watching, err := h.WatchList(key, 1, Everything)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
@@ -651,7 +639,7 @@ func TestWatchFromNotFound(t *testing.T) {
 | 
			
		||||
			ErrorCode: 100,
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	h := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
 | 
			
		||||
	h := EtcdHelper{fakeClient, codec, versioner, etcdtest.PathPrefix()}
 | 
			
		||||
 | 
			
		||||
	watching, err := h.Watch(key, 0, Everything)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
@@ -678,8 +666,7 @@ func TestWatchFromOtherError(t *testing.T) {
 | 
			
		||||
			ErrorCode: 101,
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	h := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
 | 
			
		||||
 | 
			
		||||
	h := EtcdHelper{fakeClient, codec, versioner, etcdtest.PathPrefix()}
 | 
			
		||||
	watching, err := h.Watch(key, 0, Everything)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("Unexpected error: %v", err)
 | 
			
		||||
@@ -709,8 +696,7 @@ func TestWatchFromOtherError(t *testing.T) {
 | 
			
		||||
 | 
			
		||||
func TestWatchPurposefulShutdown(t *testing.T) {
 | 
			
		||||
	fakeClient := NewFakeEtcdClient(t)
 | 
			
		||||
 | 
			
		||||
	h := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
 | 
			
		||||
	h := EtcdHelper{fakeClient, codec, versioner, etcdtest.PathPrefix()}
 | 
			
		||||
	key := "/some/key"
 | 
			
		||||
	prefixedKey := etcdtest.AddPrefix(key)
 | 
			
		||||
	fakeClient.expectNotFoundGetSet[prefixedKey] = struct{}{}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user