mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 12:18:16 +00:00 
			
		
		
		
	Merge pull request #793 from GoogleCloudPlatform/revert-694-atomicetcd
Revert "Use atomic create in EtcdHelper.AtomicUpdate"
This commit is contained in:
		@@ -98,11 +98,6 @@ func IsEtcdTestFailed(err error) bool {
 | 
				
			|||||||
	return isEtcdErrorNum(err, EtcdErrorCodeTestFailed)
 | 
						return isEtcdErrorNum(err, EtcdErrorCodeTestFailed)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// IsEtcdNodeExist returns true iff err is an etcd node aleady exist error.
 | 
					 | 
				
			||||||
func IsEtcdNodeExist(err error) bool {
 | 
					 | 
				
			||||||
	return isEtcdErrorNum(err, EtcdErrorCodeNodeExist)
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// IsEtcdWatchStoppedByUser returns true iff err is a client triggered stop.
 | 
					// IsEtcdWatchStoppedByUser returns true iff err is a client triggered stop.
 | 
				
			||||||
func IsEtcdWatchStoppedByUser(err error) bool {
 | 
					func IsEtcdWatchStoppedByUser(err error) bool {
 | 
				
			||||||
	return etcd.ErrWatchStoppedByUser == err
 | 
						return etcd.ErrWatchStoppedByUser == err
 | 
				
			||||||
@@ -258,20 +253,15 @@ func (h *EtcdHelper) AtomicUpdate(key string, ptrToType interface{}, tryUpdate E
 | 
				
			|||||||
			return err
 | 
								return err
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							// First time this key has been used, just set.
 | 
				
			||||||
 | 
							if index == 0 {
 | 
				
			||||||
 | 
								return h.SetObj(key, ret)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		data, err := h.Encoding.Encode(ret)
 | 
							data, err := h.Encoding.Encode(ret)
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			return err
 | 
								return err
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					 | 
				
			||||||
		// First time this key has been used, try creating new value.
 | 
					 | 
				
			||||||
		if index == 0 {
 | 
					 | 
				
			||||||
			_, err = h.Client.Create(key, string(data), 0)
 | 
					 | 
				
			||||||
			if IsEtcdNodeExist(err) {
 | 
					 | 
				
			||||||
				continue
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
			return err
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		_, err = h.Client.CompareAndSwap(key, string(data), 0, origBody, index)
 | 
							_, err = h.Client.CompareAndSwap(key, string(data), 0, origBody, index)
 | 
				
			||||||
		if IsEtcdTestFailed(err) {
 | 
							if IsEtcdTestFailed(err) {
 | 
				
			||||||
			continue
 | 
								continue
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -19,7 +19,6 @@ package tools
 | 
				
			|||||||
import (
 | 
					import (
 | 
				
			||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
	"reflect"
 | 
						"reflect"
 | 
				
			||||||
	"sync"
 | 
					 | 
				
			||||||
	"testing"
 | 
						"testing"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
 | 
						"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
 | 
				
			||||||
@@ -227,7 +226,7 @@ func TestAtomicUpdate(t *testing.T) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	// Create a new node.
 | 
						// Create a new node.
 | 
				
			||||||
	fakeClient.ExpectNotFoundGet("/some/key")
 | 
						fakeClient.ExpectNotFoundGet("/some/key")
 | 
				
			||||||
	obj := &TestResource{JSONBase: api.JSONBase{ID: "foo"}, Value: 1}
 | 
						obj := TestResource{JSONBase: api.JSONBase{ID: "foo"}, Value: 1}
 | 
				
			||||||
	err := helper.AtomicUpdate("/some/key", &TestResource{}, func(in interface{}) (interface{}, error) {
 | 
						err := helper.AtomicUpdate("/some/key", &TestResource{}, func(in interface{}) (interface{}, error) {
 | 
				
			||||||
		return obj, nil
 | 
							return obj, nil
 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
@@ -243,6 +242,7 @@ func TestAtomicUpdate(t *testing.T) {
 | 
				
			|||||||
	if expect != got {
 | 
						if expect != got {
 | 
				
			||||||
		t.Errorf("Wanted %v, got %v", expect, got)
 | 
							t.Errorf("Wanted %v, got %v", expect, got)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						return
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Update an existing node.
 | 
						// Update an existing node.
 | 
				
			||||||
	callbackCalled := false
 | 
						callbackCalled := false
 | 
				
			||||||
@@ -274,57 +274,6 @@ func TestAtomicUpdate(t *testing.T) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestAtomicUpdate_CreateCollision(t *testing.T) {
 | 
					 | 
				
			||||||
	fakeClient := MakeFakeEtcdClient(t)
 | 
					 | 
				
			||||||
	fakeClient.TestIndex = true
 | 
					 | 
				
			||||||
	encoding := scheme
 | 
					 | 
				
			||||||
	helper := EtcdHelper{fakeClient, encoding, api.JSONBaseVersioning{}}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	fakeClient.ExpectNotFoundGet("/some/key")
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	const concurrency = 10
 | 
					 | 
				
			||||||
	var wgDone sync.WaitGroup
 | 
					 | 
				
			||||||
	var wgForceCollision sync.WaitGroup
 | 
					 | 
				
			||||||
	wgDone.Add(concurrency)
 | 
					 | 
				
			||||||
	wgForceCollision.Add(concurrency)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	for i := 0; i < concurrency; i++ {
 | 
					 | 
				
			||||||
		// Increment TestResource.Value by 1
 | 
					 | 
				
			||||||
		go func() {
 | 
					 | 
				
			||||||
			defer wgDone.Done()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			firstCall := true
 | 
					 | 
				
			||||||
			err := helper.AtomicUpdate("/some/key", &TestResource{}, func(in interface{}) (interface{}, error) {
 | 
					 | 
				
			||||||
				defer func() { firstCall = false }()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
				if firstCall {
 | 
					 | 
				
			||||||
					// Force collision by joining all concurrent AtomicUpdate operations here.
 | 
					 | 
				
			||||||
					wgForceCollision.Done()
 | 
					 | 
				
			||||||
					wgForceCollision.Wait()
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
				currValue := in.(*TestResource).Value
 | 
					 | 
				
			||||||
				obj := TestResource{JSONBase: api.JSONBase{ID: "foo"}, Value: currValue + 1}
 | 
					 | 
				
			||||||
				return obj, nil
 | 
					 | 
				
			||||||
			})
 | 
					 | 
				
			||||||
			if err != nil {
 | 
					 | 
				
			||||||
				t.Errorf("Unexpected error %#v", err)
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
		}()
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	wgDone.Wait()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// Check that stored TestResource has received all updates.
 | 
					 | 
				
			||||||
	body := fakeClient.Data["/some/key"].R.Node.Value
 | 
					 | 
				
			||||||
	stored := &TestResource{}
 | 
					 | 
				
			||||||
	if err := encoding.DecodeInto([]byte(body), stored); err != nil {
 | 
					 | 
				
			||||||
		t.Errorf("Error decoding stored value: %v", body)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	if stored.Value != concurrency {
 | 
					 | 
				
			||||||
		t.Errorf("Some of the writes were lost. Stored value: %d", stored.Value)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func TestWatchInterpretation_ListAdd(t *testing.T) {
 | 
					func TestWatchInterpretation_ListAdd(t *testing.T) {
 | 
				
			||||||
	w := newEtcdWatcher(true, func(interface{}) bool {
 | 
						w := newEtcdWatcher(true, func(interface{}) bool {
 | 
				
			||||||
		t.Errorf("unexpected filter call")
 | 
							t.Errorf("unexpected filter call")
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -19,7 +19,6 @@ package tools
 | 
				
			|||||||
import (
 | 
					import (
 | 
				
			||||||
	"errors"
 | 
						"errors"
 | 
				
			||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
	"sync"
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/coreos/go-etcd/etcd"
 | 
						"github.com/coreos/go-etcd/etcd"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
@@ -41,7 +40,6 @@ type FakeEtcdClient struct {
 | 
				
			|||||||
	Data                 map[string]EtcdResponseWithError
 | 
						Data                 map[string]EtcdResponseWithError
 | 
				
			||||||
	DeletedKeys          []string
 | 
						DeletedKeys          []string
 | 
				
			||||||
	expectNotFoundGetSet map[string]struct{}
 | 
						expectNotFoundGetSet map[string]struct{}
 | 
				
			||||||
	sync.Mutex
 | 
					 | 
				
			||||||
	Err                  error
 | 
						Err                  error
 | 
				
			||||||
	t                    TestLogger
 | 
						t                    TestLogger
 | 
				
			||||||
	Ix                   int
 | 
						Ix                   int
 | 
				
			||||||
@@ -91,17 +89,11 @@ func (f *FakeEtcdClient) generateIndex() uint64 {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (f *FakeEtcdClient) AddChild(key, data string, ttl uint64) (*etcd.Response, error) {
 | 
					func (f *FakeEtcdClient) AddChild(key, data string, ttl uint64) (*etcd.Response, error) {
 | 
				
			||||||
	f.Mutex.Lock()
 | 
					 | 
				
			||||||
	defer f.Mutex.Unlock()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	f.Ix = f.Ix + 1
 | 
						f.Ix = f.Ix + 1
 | 
				
			||||||
	return f.setLocked(fmt.Sprintf("%s/%d", key, f.Ix), data, ttl)
 | 
						return f.Set(fmt.Sprintf("%s/%d", key, f.Ix), data, ttl)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (f *FakeEtcdClient) Get(key string, sort, recursive bool) (*etcd.Response, error) {
 | 
					func (f *FakeEtcdClient) Get(key string, sort, recursive bool) (*etcd.Response, error) {
 | 
				
			||||||
	f.Mutex.Lock()
 | 
					 | 
				
			||||||
	defer f.Mutex.Unlock()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	result := f.Data[key]
 | 
						result := f.Data[key]
 | 
				
			||||||
	if result.R == nil {
 | 
						if result.R == nil {
 | 
				
			||||||
		if _, ok := f.expectNotFoundGetSet[key]; !ok {
 | 
							if _, ok := f.expectNotFoundGetSet[key]; !ok {
 | 
				
			||||||
@@ -118,7 +110,7 @@ func (f *FakeEtcdClient) nodeExists(key string) bool {
 | 
				
			|||||||
	return ok && result.R != nil && result.R.Node != nil
 | 
						return ok && result.R != nil && result.R.Node != nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (f *FakeEtcdClient) setLocked(key, value string, ttl uint64) (*etcd.Response, error) {
 | 
					func (f *FakeEtcdClient) Set(key, value string, ttl uint64) (*etcd.Response, error) {
 | 
				
			||||||
	if f.Err != nil {
 | 
						if f.Err != nil {
 | 
				
			||||||
		return nil, f.Err
 | 
							return nil, f.Err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -154,13 +146,6 @@ func (f *FakeEtcdClient) setLocked(key, value string, ttl uint64) (*etcd.Respons
 | 
				
			|||||||
	return result.R, nil
 | 
						return result.R, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (f *FakeEtcdClient) Set(key, value string, ttl uint64) (*etcd.Response, error) {
 | 
					 | 
				
			||||||
	f.Mutex.Lock()
 | 
					 | 
				
			||||||
	defer f.Mutex.Unlock()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return f.setLocked(key, value, ttl)
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (f *FakeEtcdClient) CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error) {
 | 
					func (f *FakeEtcdClient) CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error) {
 | 
				
			||||||
	if f.Err != nil {
 | 
						if f.Err != nil {
 | 
				
			||||||
		return nil, f.Err
 | 
							return nil, f.Err
 | 
				
			||||||
@@ -175,9 +160,6 @@ func (f *FakeEtcdClient) CompareAndSwap(key, value string, ttl uint64, prevValue
 | 
				
			|||||||
		return nil, errors.New("Either prevValue or prevIndex must be specified.")
 | 
							return nil, errors.New("Either prevValue or prevIndex must be specified.")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	f.Mutex.Lock()
 | 
					 | 
				
			||||||
	defer f.Mutex.Unlock()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	if !f.nodeExists(key) {
 | 
						if !f.nodeExists(key) {
 | 
				
			||||||
		return nil, EtcdErrorNotFound
 | 
							return nil, EtcdErrorNotFound
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -192,18 +174,15 @@ func (f *FakeEtcdClient) CompareAndSwap(key, value string, ttl uint64, prevValue
 | 
				
			|||||||
		return nil, EtcdErrorTestFailed
 | 
							return nil, EtcdErrorTestFailed
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return f.setLocked(key, value, ttl)
 | 
						return f.Set(key, value, ttl)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (f *FakeEtcdClient) Create(key, value string, ttl uint64) (*etcd.Response, error) {
 | 
					func (f *FakeEtcdClient) Create(key, value string, ttl uint64) (*etcd.Response, error) {
 | 
				
			||||||
	f.Mutex.Lock()
 | 
					 | 
				
			||||||
	defer f.Mutex.Unlock()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	if f.nodeExists(key) {
 | 
						if f.nodeExists(key) {
 | 
				
			||||||
		return nil, EtcdErrorNodeExist
 | 
							return nil, EtcdErrorNodeExist
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return f.setLocked(key, value, ttl)
 | 
						return f.Set(key, value, ttl)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (f *FakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, error) {
 | 
					func (f *FakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, error) {
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user