etcd HA + tests

This commit is contained in:
Eric Buth
2015-05-31 15:02:11 -04:00
parent 669686f654
commit 2e17df9b4b
2 changed files with 204 additions and 1 deletions

View File

@@ -5,6 +5,7 @@ import (
"errors"
"path/filepath"
"strings"
"sync"
"time"
"github.com/armon/go-metrics"
@@ -18,12 +19,23 @@ const (
// entries from directory listings.
EtcdNodeFilePrefix = "."
// The lock prefix can (and probably should) cause an entry to be excluded
// from directory listings, so "_" works here.
EtcdNodeLockPrefix = "_"
// The delimiter is the same as the `-C` flag of etcdctl.
EtcdMachineDelimiter = ","
// The lock TTL matches the default that Consul API uses, 15 seconds.
EtcdLockTTL = uint64(15)
)
// Sentinel errors used by the etcd backend and its lock implementation.
var (
	// EtcdSyncClusterError reports that client setup failed because the etcd
	// cluster could not be synced.
	EtcdSyncClusterError = errors.New("client setup failed: unable to sync etcd cluster")
	// EtcdSemaphoreKeysEmptyError reports that the lock queue has no entries.
	EtcdSemaphoreKeysEmptyError = errors.New("lock queue is empty")
	// EtcdLockHeldError is returned by EtcdLock.Lock when the lock is already
	// held by that EtcdLock.
	EtcdLockHeldError = errors.New("lock already held")
	// EtcdLockNotHeldError is returned by EtcdLock.Unlock when the lock is not
	// held by that EtcdLock.
	EtcdLockNotHeldError = errors.New("lock not held")
	// EtcdSemaphoreKeyRemovedError is returned by EtcdLock.Lock when its own
	// semaphore key is deleted or expires while it is still waiting.
	EtcdSemaphoreKeyRemovedError = errors.New("semaphore key removed before lock aquisition")
)
// errorIsMissingKey returns true if the given error is an etcd error with an
@@ -167,3 +179,188 @@ func (b *EtcdBackend) nodePath(key string) string {
func (b *EtcdBackend) nodePathDir(key string) string {
	// Join the key onto the backend's base path, then mark the result as a
	// directory by appending a trailing slash.
	dir := filepath.Join(b.path, key)
	return dir + "/"
}
// nodePathLock returns the etcd path of the directory that holds the
// semaphore keys for the given key. The directory lives next to the key
// itself and is named after the key's base name with EtcdNodeLockPrefix
// prepended.
func (b *EtcdBackend) nodePathLock(key string) string {
	parent := filepath.Dir(key)
	lockDir := EtcdNodeLockPrefix + filepath.Base(key) + "/"
	// filepath.Join cleans its result, so the trailing slash on lockDir does
	// not appear in the returned path.
	return filepath.Join(b.path, parent, lockDir)
}
// LockWith returns a Lock that provides mutual exclusion on the given key.
// The value is kept with the returned lock and is written to the key once the
// lock has been acquired. Constructing the lock does not contact etcd.
func (b *EtcdBackend) LockWith(key, value string) (Lock, error) {
	lock := &EtcdLock{
		backend: b,
		key:     key,
		value:   value,
	}
	return lock, nil
}
// EtcdLock implements a lock using an etcd backend.
type EtcdLock struct {
	// backend is the etcd backend through which all requests are made.
	backend *EtcdBackend

	// key is the key being locked, and value is what gets written to that
	// key once the lock is acquired.
	key   string
	value string

	// semaphoreKey is the etcd key of the queue entry created by the most
	// recent call to Lock.
	semaphoreKey string

	// lock serializes Lock and Unlock and guards the fields below.
	lock sync.Mutex

	// isHeld records whether this EtcdLock currently considers itself the
	// holder of the lock.
	isHeld bool

	// done is the channel handed to the caller of Lock; Unlock closes it.
	done chan struct{}
}
// addSemaphoreKey enqueues a new ordered semaphore key in the lock directory
// for this lock's key. It returns the key of the created node together with
// the etcd index reported in the response.
func (c *EtcdLock) addSemaphoreKey() (string, uint64, error) {
	// CreateInOrder is an atomic operation that can be used to enqueue a
	// request onto a semaphore. The resulting key is what the rest of this
	// file calls a "semaphore key". It is created with a TTL of EtcdLockTTL.
	// https://coreos.com/etcd/docs/2.0.8/api.html#atomically-creating-in-order-keys
	lockDir := c.backend.nodePathLock(c.key)
	created, err := c.backend.client.CreateInOrder(lockDir, "Vault Lock", EtcdLockTTL)
	if err != nil {
		return "", 0, err
	}
	return created.Node.Key, created.EtcdIndex, nil
}
// getSemaphoreKey lists the lock directory and returns the key of its first
// entry, which is the semaphore key that currently holds the lock. The etcd
// index from the response is returned alongside it. When the directory has
// no entries the returned key is empty and the error is nil.
func (c *EtcdLock) getSemaphoreKey() (string, uint64, error) {
	// Fetch the list of waiters in order so the head of the queue comes first.
	listing, err := c.backend.client.Get(c.backend.nodePathLock(c.key), true, false)
	if err != nil {
		return "", 0, err
	}

	waiters := listing.Node.Nodes
	if waiters.Len() > 0 {
		return waiters[0].Key, listing.EtcdIndex, nil
	}

	// Nobody is queued.
	return "", listing.EtcdIndex, nil
}
// Lock attempts to acquire the lock, blocking until it is acquired, an error
// occurs, or stopCh is closed.
//
// It enqueues a semaphore key in the lock directory and then watches that
// directory until its own key is at the head of the queue. Once it is, the
// lock's value is written to the locked key and a channel is returned that is
// closed by Unlock.
//
// It returns EtcdLockHeldError if this EtcdLock already holds the lock, and
// EtcdSemaphoreKeyRemovedError if its semaphore key is deleted or expires
// while waiting. If the watch ends with a non-etcd error (taken to mean that
// stopCh was closed), the semaphore key is deleted and the result of that
// deletion is returned, so a clean stop yields (nil, nil).
//
// NOTE(review): nothing in this function refreshes the semaphore key's TTL or
// watches for its removal after acquisition; confirm whether that is handled
// elsewhere.
func (c *EtcdLock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
	// Get the local lock before interacting with etcd.
	c.lock.Lock()
	defer c.lock.Unlock()

	// Check if the lock is already held.
	if c.isHeld {
		return nil, EtcdLockHeldError
	}

	// Add a new semaphore key that we will track.
	semaphoreKey, _, err := c.addSemaphoreKey()
	if err != nil {
		return nil, err
	}
	c.semaphoreKey = semaphoreKey

	// Get the current semaphore key.
	currentSemaphoreKey, currentEtcdIndex, err := c.getSemaphoreKey()
	if err != nil {
		return nil, err
	}

	// Create an etcd-compatible boolean stop channel from the provided
	// interface stop channel. The forwarding goroutine is tied to the lifetime
	// of this call: it exits either when stopCh is closed or when Lock
	// returns, so it cannot leak when stopCh is nil or never closed.
	boolStopCh := make(chan bool)
	finished := make(chan struct{})
	defer close(finished)
	go func() {
		for {
			select {
			case _, open := <-stopCh:
				// Values sent on stopCh are ignored; only closing it stops
				// the watch.
				if !open {
					close(boolStopCh)
					return
				}
			case <-finished:
				return
			}
		}
	}()

	// Loop until the current semaphore key matches ours.
	for semaphoreKey != currentSemaphoreKey {
		// Start a watch of the entire lock directory, providing the stop channel.
		response, err := c.backend.client.Watch(c.backend.nodePathLock(c.key), currentEtcdIndex+1, true, nil, boolStopCh)
		if err != nil {
			// If the error is not an etcd error, we can assume it's a notification
			// of the stop channel having closed. In this scenario, we also want to
			// remove our semaphore key as we are no longer waiting to acquire the
			// lock.
			if _, ok := err.(*etcd.EtcdError); !ok {
				_, err = c.backend.client.Delete(c.semaphoreKey, false)
			}
			return nil, err
		}

		// Make sure the key we are waiting on has not been removed. If it has,
		// this is an error and nothing else needs to be done.
		if response.Node.Key == semaphoreKey &&
			(response.Action == "delete" || response.Action == "expire") {
			return nil, EtcdSemaphoreKeyRemovedError
		}

		// Get the current semaphore key and etcd index.
		currentSemaphoreKey, currentEtcdIndex, err = c.getSemaphoreKey()
		if err != nil {
			return nil, err
		}
	}

	// Our semaphore key is at the head of the queue, so we can set our value
	// for the given key. The lock is only marked as held once this succeeds;
	// otherwise a failed write would leave isHeld set without a done channel.
	if err := c.backend.Put(&Entry{
		Key:   c.key,
		Value: []byte(c.value),
	}); err != nil {
		// Best-effort release of our place in the queue. The delete error is
		// deliberately ignored: the semaphore key was created with a TTL and
		// will expire on its own, and the write failure is the error the
		// caller needs to see.
		_, _ = c.backend.client.Delete(c.semaphoreKey, false)
		return nil, err
	}

	// We now hold the lock.
	c.isHeld = true

	// Create a channel to signal when we lose the lock.
	c.done = make(chan struct{})

	return c.done, nil
}
// Unlock releases the lock by deleting this lock's semaphore key from etcd
// and then closes the channel that Lock returned.
//
// It returns EtcdLockNotHeldError if the lock is not currently held by this
// EtcdLock. If deleting the semaphore key fails, that error is returned and
// the lock is still considered held.
func (c *EtcdLock) Unlock() error {
	// Get the local lock before interacting with etcd.
	c.lock.Lock()
	defer c.lock.Unlock()

	if !c.isHeld {
		return EtcdLockNotHeldError
	}

	// Delete our semaphore key.
	if _, err := c.backend.client.Delete(c.semaphoreKey, false); err != nil {
		return err
	}

	// We no longer hold the lock.
	c.isHeld = false

	// Signal that we no longer hold the lock. Closing a nil channel panics,
	// and done only exists once Lock has created it, so guard the close and
	// clear the field so the same channel is never closed twice.
	if c.done != nil {
		close(c.done)
		c.done = nil
	}

	return nil
}
// Value reports the state of the lock for this key. It returns whether any
// semaphore key is currently queued in the lock directory (that is, whether
// the lock is held by anyone), the value stored at the locked key, and any
// error from reading the key or listing the lock directory.
func (c *EtcdLock) Value() (bool, string, error) {
	entry, err := c.backend.Get(c.key)
	if err != nil {
		return false, "", err
	}

	// Guard against a nil entry as well as a nil value so that an absent
	// entry yields an empty string instead of a nil dereference.
	// NOTE(review): presumably Get returns a nil entry when the key does not
	// exist; confirm against EtcdBackend.Get.
	var value string
	if entry != nil && entry.Value != nil {
		value = string(entry.Value)
	}

	semaphoreKey, _, err := c.getSemaphoreKey()
	if err != nil {
		return false, "", err
	}

	// An empty semaphore key means nobody is queued, so the lock is not held.
	return semaphoreKey != "", value, nil
}

View File

@@ -37,4 +37,10 @@ func TestEtcdBackend(t *testing.T) {
testBackend(t, b)
testBackend_ListPrefix(t, b)
ha, ok := b.(HABackend)
if !ok {
t.Fatalf("etcd does not implement HABackend")
}
testHABackend(t, ha, ha)
}