mirror of
				https://github.com/optim-enterprises-bv/kubernetes.git
				synced 2025-11-04 04:08:16 +00:00 
			
		
		
		
	Accept Quorum parameter in etcd3.
This commit is contained in:
		@@ -38,7 +38,10 @@ import (
 | 
				
			|||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type store struct {
 | 
					type store struct {
 | 
				
			||||||
	client     *clientv3.Client
 | 
						client *clientv3.Client
 | 
				
			||||||
 | 
						// getOpts contains additional options that should be passed
 | 
				
			||||||
 | 
						// to all Get() calls.
 | 
				
			||||||
 | 
						getOps     []clientv3.OpOption
 | 
				
			||||||
	codec      runtime.Codec
 | 
						codec      runtime.Codec
 | 
				
			||||||
	versioner  storage.Versioner
 | 
						versioner  storage.Versioner
 | 
				
			||||||
	pathPrefix string
 | 
						pathPrefix string
 | 
				
			||||||
@@ -59,18 +62,30 @@ type objState struct {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
// New returns an etcd3 implementation of storage.Interface.
 | 
					// New returns an etcd3 implementation of storage.Interface.
 | 
				
			||||||
func New(c *clientv3.Client, codec runtime.Codec, prefix string) storage.Interface {
 | 
					func New(c *clientv3.Client, codec runtime.Codec, prefix string) storage.Interface {
 | 
				
			||||||
	return newStore(c, codec, prefix)
 | 
						return newStore(c, true, codec, prefix)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func newStore(c *clientv3.Client, codec runtime.Codec, prefix string) *store {
 | 
					// NewWithNoQuorumRead returns etcd3 implementation of storage.Interface
 | 
				
			||||||
 | 
					// where Get operations don't require quorum read.
 | 
				
			||||||
 | 
					func NewWithNoQuorumRead(c *clientv3.Client, codec runtime.Codec, prefix string) storage.Interface {
 | 
				
			||||||
 | 
						return newStore(c, false, codec, prefix)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func newStore(c *clientv3.Client, quorumRead bool, codec runtime.Codec, prefix string) *store {
 | 
				
			||||||
	versioner := etcd.APIObjectVersioner{}
 | 
						versioner := etcd.APIObjectVersioner{}
 | 
				
			||||||
	return &store{
 | 
						result := &store{
 | 
				
			||||||
		client:     c,
 | 
							client:     c,
 | 
				
			||||||
		versioner:  versioner,
 | 
							versioner:  versioner,
 | 
				
			||||||
		codec:      codec,
 | 
							codec:      codec,
 | 
				
			||||||
		pathPrefix: prefix,
 | 
							pathPrefix: prefix,
 | 
				
			||||||
		watcher:    newWatcher(c, codec, versioner),
 | 
							watcher:    newWatcher(c, codec, versioner),
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						if !quorumRead {
 | 
				
			||||||
 | 
							// In case of non-quorum reads, we can set WithSerializable()
 | 
				
			||||||
 | 
							// options for all Get operations.
 | 
				
			||||||
 | 
							result.getOps = append(result.getOps, clientv3.WithSerializable())
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return result
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Versioner implements storage.Interface.Versioner.
 | 
					// Versioner implements storage.Interface.Versioner.
 | 
				
			||||||
@@ -81,7 +96,7 @@ func (s *store) Versioner() storage.Versioner {
 | 
				
			|||||||
// Get implements storage.Interface.Get.
 | 
					// Get implements storage.Interface.Get.
 | 
				
			||||||
func (s *store) Get(ctx context.Context, key string, out runtime.Object, ignoreNotFound bool) error {
 | 
					func (s *store) Get(ctx context.Context, key string, out runtime.Object, ignoreNotFound bool) error {
 | 
				
			||||||
	key = keyWithPrefix(s.pathPrefix, key)
 | 
						key = keyWithPrefix(s.pathPrefix, key)
 | 
				
			||||||
	getResp, err := s.client.KV.Get(ctx, key)
 | 
						getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return err
 | 
							return err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -202,7 +217,7 @@ func (s *store) GuaranteedUpdate(ctx context.Context, key string, out runtime.Ob
 | 
				
			|||||||
		panic("unable to convert output object to pointer")
 | 
							panic("unable to convert output object to pointer")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	key = keyWithPrefix(s.pathPrefix, key)
 | 
						key = keyWithPrefix(s.pathPrefix, key)
 | 
				
			||||||
	getResp, err := s.client.KV.Get(ctx, key)
 | 
						getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return err
 | 
							return err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -262,7 +277,7 @@ func (s *store) GetToList(ctx context.Context, key string, pred storage.Selectio
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
	key = keyWithPrefix(s.pathPrefix, key)
 | 
						key = keyWithPrefix(s.pathPrefix, key)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	getResp, err := s.client.KV.Get(ctx, key)
 | 
						getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return err
 | 
							return err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -451,7 +451,7 @@ func TestGuaranteedUpdateWithConflict(t *testing.T) {
 | 
				
			|||||||
func TestList(t *testing.T) {
 | 
					func TestList(t *testing.T) {
 | 
				
			||||||
	cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 | 
						cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 | 
				
			||||||
	defer cluster.Terminate(t)
 | 
						defer cluster.Terminate(t)
 | 
				
			||||||
	store := newStore(cluster.RandClient(), testapi.Default.Codec(), "")
 | 
						store := newStore(cluster.RandClient(), false, testapi.Default.Codec(), "")
 | 
				
			||||||
	ctx := context.Background()
 | 
						ctx := context.Background()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Setup storage with the following structure:
 | 
						// Setup storage with the following structure:
 | 
				
			||||||
@@ -538,7 +538,7 @@ func TestList(t *testing.T) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) {
 | 
					func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) {
 | 
				
			||||||
	cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 | 
						cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 | 
				
			||||||
	store := newStore(cluster.RandClient(), testapi.Default.Codec(), "")
 | 
						store := newStore(cluster.RandClient(), false, testapi.Default.Codec(), "")
 | 
				
			||||||
	ctx := context.Background()
 | 
						ctx := context.Background()
 | 
				
			||||||
	return ctx, store, cluster
 | 
						return ctx, store, cluster
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -158,7 +158,7 @@ func (wc *watchChan) sync() error {
 | 
				
			|||||||
	wc.initialRev = getResp.Header.Revision
 | 
						wc.initialRev = getResp.Header.Revision
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	for _, kv := range getResp.Kvs {
 | 
						for _, kv := range getResp.Kvs {
 | 
				
			||||||
		prevResp, err := wc.watcher.client.Get(wc.ctx, string(kv.Key), clientv3.WithRev(kv.ModRevision-1))
 | 
							prevResp, err := wc.watcher.client.Get(wc.ctx, string(kv.Key), clientv3.WithRev(kv.ModRevision-1), clientv3.WithSerializable())
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			return err
 | 
								return err
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -176,13 +176,13 @@ func TestWatchFromNoneZero(t *testing.T) {
 | 
				
			|||||||
func TestWatchError(t *testing.T) {
 | 
					func TestWatchError(t *testing.T) {
 | 
				
			||||||
	cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 | 
						cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 | 
				
			||||||
	defer cluster.Terminate(t)
 | 
						defer cluster.Terminate(t)
 | 
				
			||||||
	invalidStore := newStore(cluster.RandClient(), &testCodec{testapi.Default.Codec()}, "")
 | 
						invalidStore := newStore(cluster.RandClient(), false, &testCodec{testapi.Default.Codec()}, "")
 | 
				
			||||||
	ctx := context.Background()
 | 
						ctx := context.Background()
 | 
				
			||||||
	w, err := invalidStore.Watch(ctx, "/abc", "0", storage.Everything)
 | 
						w, err := invalidStore.Watch(ctx, "/abc", "0", storage.Everything)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		t.Fatalf("Watch failed: %v", err)
 | 
							t.Fatalf("Watch failed: %v", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	validStore := newStore(cluster.RandClient(), testapi.Default.Codec(), "")
 | 
						validStore := newStore(cluster.RandClient(), false, testapi.Default.Codec(), "")
 | 
				
			||||||
	validStore.GuaranteedUpdate(ctx, "/abc", &api.Pod{}, true, nil, storage.SimpleUpdate(
 | 
						validStore.GuaranteedUpdate(ctx, "/abc", &api.Pod{}, true, nil, storage.SimpleUpdate(
 | 
				
			||||||
		func(runtime.Object) (runtime.Object, error) {
 | 
							func(runtime.Object) (runtime.Object, error) {
 | 
				
			||||||
			return &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}, nil
 | 
								return &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}, nil
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -55,5 +55,8 @@ func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, e
 | 
				
			|||||||
		cancel()
 | 
							cancel()
 | 
				
			||||||
		client.Close()
 | 
							client.Close()
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return etcd3.New(client, c.Codec, c.Prefix), destroyFunc, nil
 | 
						if c.Quorum {
 | 
				
			||||||
 | 
							return etcd3.New(client, c.Codec, c.Prefix), destroyFunc, nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return etcd3.NewWithNoQuorumRead(client, c.Codec, c.Prefix), destroyFunc, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user