Merge branch 'etcd-cluster-support' of github.com:CyberDem0n/governor into etcd-cluster-support

Conflicts:
	requirements-py2.txt
	tests/test_governor.py
This commit is contained in:
Alexander Kukushkin
2015-06-09 11:34:14 +02:00
14 changed files with 303 additions and 60 deletions

View File

@@ -4,7 +4,8 @@ python:
- "3.3"
- "3.4"
install:
- pip install -r requirements.txt
- if [[ $TRAVIS_PYTHON_VERSION == 2* ]]; then pip install -r requirements-py2.txt --use-mirrors; fi
- if [[ $TRAVIS_PYTHON_VERSION == 3* ]]; then pip install -r requirements-py3.txt --use-mirrors; fi
- pip install coveralls
script:
- python setup.py test

View File

@@ -77,6 +77,7 @@ class Governor:
def main():
logging.basicConfig(format='%(asctime)s %(levelname)s: %(message)s', level=logging.INFO)
logging.getLogger('requests').setLevel(logging.WARNING)
setup_signal_handlers()
if len(sys.argv) < 2 or not os.path.isfile(sys.argv[1]):

View File

@@ -9,3 +9,7 @@ class EtcdError(Exception):
class CurrentLeaderError(EtcdError):
    """EtcdError subclass used for failures concerning the current leader.

    NOTE(review): the raise sites are outside this view -- confirm the exact
    conditions against the callers.
    """
class EtcdConnectionFailed(EtcdError):
    """EtcdError subclass signalling that no etcd server could be reached.

    Because it derives from EtcdError, handlers written as
    ``except EtcdError`` also catch this exception.
    """

View File

@@ -1,11 +1,14 @@
import logging
import requests
import socket
import sys
from requests.exceptions import RequestException
from collections import namedtuple
from helpers.errors import CurrentLeaderError, EtcdError
from dns.exception import DNSException
from dns import resolver
from helpers.errors import CurrentLeaderError, EtcdError, EtcdConnectionFailed
from helpers.utils import sleep
from requests.exceptions import RequestException
if sys.hexversion >= 0x03000000:
from urllib.parse import urlparse, urlunparse, parse_qsl
@@ -35,58 +38,186 @@ class Cluster(namedtuple('Cluster', 'initialize,leader,last_leader_operation,mem
return not (self.leader and self.leader.hostname)
class Client:
    """Small HTTP client for etcd that fails over between cluster members.

    The client keeps one "current" server in ``_base_uri`` and a list of
    alternative servers in ``_members_cache``.  Whenever a request to the
    current server raises ``RequestException`` the next server is taken from
    the cache; when the cache is exhausted ``EtcdConnectionFailed`` is raised.
    """

    API_VERSION = 'v2'

    def __init__(self, config):
        """Store *config* and immediately discover the cluster members.

        :param config: mapping holding either ``discovery_srv`` (a domain used
            for the SRV lookup) or ``host`` (a ``host:port`` string).
        :raises EtcdError: propagated from :meth:`load_members`.
        """
        self._config = config
        self.timeout = 5  # seconds, passed to every ``requests`` call
        self._base_uri = None
        self._members_cache = []
        self.load_members()

    def client_url(self, path):
        """Return *path* appended to the URI of the currently selected server."""
        return self._base_uri + path

    def _next_server(self):
        """Switch ``_base_uri`` to the next entry of the members cache.

        :raises EtcdConnectionFailed: when the cache is empty.  ``_base_uri``
            is left as ``None`` in that case, which makes the public methods
            call :meth:`load_members` again on their next invocation.
        """
        self._base_uri = None
        try:
            self._base_uri = self._members_cache.pop()
        except IndexError:
            logger.error('Members cache is empty, can not retry.')
            raise EtcdConnectionFailed('No more members in the cluster')
        else:
            logger.info('Selected new etcd server %s', self._base_uri)

    def _refresh_members_if_switched(self, old_base_uri):
        """Reload the members list if a fail-over changed the current server.

        A failure to reload is only logged: the request that triggered the
        fail-over has already been answered (or has already failed).
        """
        if self._base_uri != old_base_uri:
            try:
                self.load_members()
            except EtcdError:
                logger.exception('load_members')

    def _get(self, path):
        """GET *path*, retrying on the next server after a transport error.

        :returns: tuple ``(decoded_json_body, status_code)``.
        :raises EtcdConnectionFailed: when every known server failed.
        :raises EtcdError: when the body cannot be decoded as JSON.
        """
        response = None
        while response is None:
            uri = self.client_url(path)
            try:
                logger.info('GET %s', uri)
                response = requests.get(uri, timeout=self.timeout)
            except RequestException:
                # Log the failure the same way put() and delete() do before
                # moving on to the next server.
                logger.exception('GET %s', uri)
                self._next_server()
        logger.debug([response.status_code, response.content])
        try:
            return response.json(), response.status_code
        except (TypeError, ValueError):
            raise EtcdError('Bad response from %s: %s' % (uri, response.content))

    @staticmethod
    def _srv_target_to_hostname(target):
        """Return the SRV *target* as a host name without the trailing dot.

        dnspython hands the target over as a ``dns.name.Name`` object, which
        has no ``rstrip`` method, so it is converted with ``str()`` first.
        Plain strings pass through the same conversion unchanged.
        """
        return str(target).rstrip('.')

    @staticmethod
    def get_srv_record(host):
        """Resolve ``_etcd-server._tcp.<host>`` and return ``(hostname, port)`` pairs.

        :returns: list of tuples; an empty list when the lookup raises
            ``DNSException`` (the error is logged).
        """
        try:
            answers = resolver.query('_etcd-server._tcp.' + host, 'SRV')
            return [(Client._srv_target_to_hostname(r.target), r.port) for r in answers]
        except DNSException:
            logger.exception('Can not resolve SRV for %s', host)
        return []

    @staticmethod
    def get_peers_urls_from_dns(host):
        """Return ``http://host:port`` URLs built from the SRV records of *host*."""
        return ['http://{}:{}'.format(h, p) for h, p in Client.get_srv_record(host)]

    @staticmethod
    def get_client_urls_from_dns(addr):
        """Resolve ``host:port`` into a list of versioned client URLs.

        Every distinct IPv4 address the host resolves to yields one URL of the
        form ``http://<ip>:<port>/<API_VERSION>``.  When resolution fails or
        returns nothing, a single URL built from the unresolved host name is
        returned instead.

        :param addr: string of the form ``host:port``.
        :raises ValueError: when *addr* does not split into exactly two parts
            on ``':'``.
        """
        host, port = addr.split(':')
        urls = set()  # a set, because getaddrinfo may return duplicates
        try:
            for info in socket.getaddrinfo(host, port, socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP):
                ip, resolved_port = info[4][0], info[4][1]
                urls.add('http://{}:{}/{}'.format(ip, resolved_port, Client.API_VERSION))
        except socket.error:
            logger.exception('Can not resolve %s', host)
        if urls:
            return list(urls)
        return ['http://{}:{}/{}'.format(host, port, Client.API_VERSION)]

    def load_members(self):
        """(Re)build the members cache from the cluster's ``/members`` endpoint.

        When no server is selected yet, candidates come from the SRV records of
        ``discovery_srv`` or from resolving ``host``, and the first candidate
        is selected.  The selected server is then asked for ``/members`` and
        the ``clientURLs`` of every member become the new cache.

        :raises Exception: when the config has neither ``discovery_srv`` nor
            ``host``.
        :raises EtcdError: on a non-200 answer (``EtcdConnectionFailed`` when
            no candidate is reachable).
        """
        load_from_srv = False
        if not self._base_uri:
            if 'discovery_srv' in self._config:
                load_from_srv = True
                self._members_cache = self.get_peers_urls_from_dns(self._config['discovery_srv'])
            elif 'host' in self._config:
                self._members_cache = self.get_client_urls_from_dns(self._config['host'])
            else:
                raise Exception('Neither discovery_srv nor host are defined in etcd section of config')
            self._next_server()

        response, status_code = self._get('/members')
        if status_code != 200:
            raise EtcdError('Got response with code=%s from %s' % (status_code, self._base_uri))

        # After SRV discovery the answer is iterated directly as the list of
        # members; otherwise the list is read from the 'members' key.
        members = response if load_from_srv else response['members']
        members_cache = []
        for member in members:
            members_cache.extend([m + '/' + self.API_VERSION for m in member['clientURLs']])
        self._members_cache = list(set(members_cache))  # TODO: randomize

        if load_from_srv:
            # The server selected so far came from the SRV lookup, so replace
            # it with one of the client URLs just collected.
            self._next_server()
        else:
            # Keep the current server out of the list of fail-over candidates.
            try:
                self._members_cache.remove(self._base_uri)
            except ValueError:
                pass

    def get(self, path):
        """GET *path* with fail-over; see :meth:`_get` for the return value.

        The members list is refreshed afterwards if the current server
        changed, even when the request itself ended with an exception.
        """
        if not self._base_uri:
            self.load_members()
        old_base_uri = self._base_uri
        try:
            return self._get(path)
        finally:
            self._refresh_members_if_switched(old_base_uri)

    def put(self, path, **data):
        """PUT *data* to *path* with fail-over.

        :returns: ``True`` when the status code is one of 200, 201, 202, 204.
        :raises EtcdConnectionFailed: when every known server failed.
        """
        if not self._base_uri:
            self.load_members()
        old_base_uri = self._base_uri
        response = None
        while response is None:
            uri = self.client_url(path)
            try:
                logger.info('PUT %s', uri)
                response = requests.put(uri, timeout=self.timeout, data=data)
            except RequestException:
                logger.exception('PUT %s data=%s', uri, data)
                self._next_server()
        self._refresh_members_if_switched(old_base_uri)
        return response.status_code in [200, 201, 202, 204]

    def delete(self, path):
        """DELETE *path* with fail-over.

        :returns: ``True`` when the status code is one of 200, 202, 204.
        :raises EtcdConnectionFailed: when every known server failed.
        """
        if not self._base_uri:
            self.load_members()
        old_base_uri = self._base_uri
        response = None
        while response is None:
            uri = self.client_url(path)
            try:
                logger.info('DELETE %s', uri)
                response = requests.delete(uri, timeout=self.timeout)
            except RequestException:
                logger.exception('DELETE %s', uri)
                self._next_server()
        self._refresh_members_if_switched(old_base_uri)
        return response.status_code in [200, 202, 204]
class Etcd:
def __init__(self, config):
self.ttl = config['ttl']
self.member_ttl = config.get('member_ttl', 3600)
self.base_client_url = 'http://{host}/v2/keys/service/{scope}'.format(**config)
self.postgres_cluster = None
self._base_path = '/keys/service/' + config['scope']
self.client = self.get_etcd_client(config)
def get_client_path(self, path, max_attempts=1):
attempts = 0
response = None
while True:
ex = None
def get_etcd_client(self, config):
client = None
while not client:
try:
response = requests.get(self.client_url(path))
if response.status_code == 200:
break
except RequestException as e:
logger.exception('get_client_path')
ex = e
client = Client(config)
except EtcdError:
logger.info('waiting on etcd')
sleep(5)
return client
attempts += 1
if attempts < max_attempts:
logger.info('Failed to return %s, trying again. (%s of %s)', path, attempts, max_attempts)
sleep(3)
elif ex:
raise ex
else:
break
def client_path(self, path):
return self._base_path + path
return response.json(), response.status_code
def get_client_path(self, path):
return self.client.get(self.client_path(path))
def put_client_path(self, path, **data):
try:
response = requests.put(self.client_url(path), data=data)
return response.status_code in [200, 201, 202, 204]
except RequestException:
logger.exception('PUT %s data=%s', path, data)
raise EtcdError('Etcd is not responding properly')
return self.client.put(self.client_path(path), **data)
def delete_client_path(self, path):
try:
response = requests.delete(self.client_url(path))
return response.status_code in [200, 202, 204]
except RequestException:
logger.exception('DELETE %s', path)
return self.client.delete(self.client_path(path))
except EtcdConnectionFailed:
return False
def client_url(self, path):
return self.base_client_url + path
@staticmethod
def find_node(node, key):
"""

View File

@@ -42,7 +42,7 @@ class Ha:
self.state_handler.start()
if not has_lock:
return 'started as a secondary'
logging.info('started as readonly because i had the session lock')
logger.info('started as readonly because i had the session lock')
self.load_cluster_from_etcd()
if self.cluster.is_unlocked():

View File

@@ -6,6 +6,7 @@ etcd:
scope: batman
ttl: 30
host: 127.0.0.1:4001
#discovery_srv: my-etcd.domain
postgresql:
name: postgresql0
listen: 127.0.0.1:5432

View File

@@ -6,6 +6,7 @@ etcd:
scope: batman
ttl: 30
host: 127.0.0.1:4001
#discovery_srv: my-etcd.domain
postgresql:
name: postgresql1
listen: 127.0.0.1:5433

View File

@@ -1,4 +1,5 @@
boto
PyYAML
dnspython
psycopg2
PyYAML
requests

5
requirements-py3.txt Normal file
View File

@@ -0,0 +1,5 @@
boto
dnspython3
psycopg2
PyYAML
requests

View File

@@ -102,7 +102,8 @@ def setup_package():
# Some helper variables
version = os.getenv('GO_PIPELINE_LABEL', VERSION)
install_reqs = get_install_requirements('requirements.txt')
requirements = 'requirements-py2.txt' if sys.version_info[0] == 2 else 'requirements-py3.txt'
install_reqs = get_install_requirements(requirements)
command_options = {'test': {'test_suite': ('setup.py', 'tests')}}
if JUNIT_XML:

View File

@@ -1,10 +1,13 @@
import unittest
import requests
import time
import dns.resolver
import json
import requests
import socket
import time
import unittest
from helpers.etcd import Cluster, Etcd
from helpers.errors import EtcdError, CurrentLeaderError
from dns.exception import DNSException
from helpers.errors import EtcdError, CurrentLeaderError, EtcdConnectionFailed
from helpers.etcd import Client, Cluster, Etcd
class MockResponse:
@@ -26,10 +29,19 @@ class MockPostgresql:
def requests_get(url, **kwargs):
if url.startswith('http://local'):
raise requests.exceptions.RequestException()
members = '[{"id":14855829450254237642,"peerURLs":["http://localhost:2380","http://localhost:7001"],"name":"default","clientURLs":["http://localhost:2379","http://localhost:4001"]}]'
response = MockResponse()
if url.startswith('http://remote') or url.startswith('http://127.0.0.1'):
if url.endswith('/v2/members'):
response.content = '{"members": ' + members + '}'
if url.startswith('http://error'):
response.status_code = 404
elif url.endswith('/members'):
response.content = members
elif url.endswith('/bad_response'):
response.content = '{'
elif url.startswith('http://local'):
raise requests.exceptions.RequestException()
elif url.startswith('http://remote') or url.startswith('http://127.0.0.1') or url.startswith('http://error'):
response.content = '{"action":"get","node":{"key":"/service/batman5","dir":true,"nodes":[{"key":"/service/batman5/initialize","value":"postgresql0","modifiedIndex":1582,"createdIndex":1582},{"key":"/service/batman5/leader","value":"postgresql1","expiration":"2015-05-15T09:11:00.037397538Z","ttl":21,"modifiedIndex":20728,"createdIndex":20434},{"key":"/service/batman5/optime","dir":true,"nodes":[{"key":"/service/batman5/optime/leader","value":"2164261704","modifiedIndex":20729,"createdIndex":20729}],"modifiedIndex":20437,"createdIndex":20437},{"key":"/service/batman5/members","dir":true,"nodes":[{"key":"/service/batman5/members/postgresql1","value":"postgres://replicator:rep-pass@127.0.0.1:5434/postgres?application_name=http://127.0.0.1:8009/governor","expiration":"2015-05-15T09:10:59.949384522Z","ttl":21,"modifiedIndex":20727,"createdIndex":20727},{"key":"/service/batman5/members/postgresql0","value":"postgres://replicator:rep-pass@127.0.0.1:5433/postgres?application_name=http://127.0.0.1:8008/governor","expiration":"2015-05-15T09:11:09.611860899Z","ttl":30,"modifiedIndex":20730,"createdIndex":20730}],"modifiedIndex":1581,"createdIndex":1581}],"modifiedIndex":1581,"createdIndex":1581}}'
elif url.startswith('http://other'):
response.status_code = 404
@@ -51,7 +63,7 @@ def requests_put(url, **kwargs):
return response
def requests_delete(url):
def requests_delete(url, **kwargs):
if url.startswith('http://local'):
raise requests.exceptions.RequestException()
response = MockResponse()
@@ -63,6 +75,81 @@ def time_sleep(_):
pass
def time_sleep_exception(_):
    """Stand-in for ``time.sleep`` that raises instead of waiting.

    The argument is ignored; every call raises a bare ``Exception``.
    """
    raise Exception()
class MockSRV:
    """Stub answer record carrying the two attributes read from SRV answers."""

    # Host part of the record; a plain string in this stub.
    target = '127.0.0.1'
    # Port part of the record.
    port = 2380
def dns_query(name, type):
    """Stub for ``dns.resolver.query`` driven purely by the queried *name*.

    * ``_etcd-server._tcp.exception`` raises ``DNSException``;
    * ``_etcd-server._tcp.blabla`` yields no records;
    * any other name yields a single ``MockSRV`` record.

    The *type* argument is accepted but ignored.
    """
    if name == '_etcd-server._tcp.exception':
        raise DNSException()
    if name == '_etcd-server._tcp.blabla':
        return []
    return [MockSRV()]
def socket_getaddrinfo(*args):
    """Stub for ``socket.getaddrinfo``.

    Host ``'ok'`` (the first positional argument) resolves to two identical
    entries for 127.0.0.1:2379; any other host raises ``socket.error``.
    """
    if args[0] != 'ok':
        raise socket.error()
    entry = (2, 1, 6, '', ('127.0.0.1', 2379))
    # Returned twice on purpose so callers can be checked for de-duplication.
    return [entry, entry]
class TestClient(unittest.TestCase):
    """Tests for ``helpers.etcd.Client`` running against stubbed DNS/HTTP calls.

    ``set_up`` replaces ``socket.getaddrinfo``, ``requests.get/put/delete`` and
    ``dns.resolver.query`` by assigning the module attributes directly; the
    originals are not restored by this class.
    """

    def __init__(self, method_name='runTest'):
        # Expose the snake_case ``set_up`` under the name unittest looks for.
        self.setUp = self.set_up
        super(TestClient, self).__init__(method_name)

    def set_up(self):
        """Install the stubs and build a client configured for SRV discovery."""
        socket.getaddrinfo = socket_getaddrinfo
        requests.get = requests_get
        requests.put = requests_put
        requests.delete = requests_delete
        dns.resolver.query = dns_query
        self.client = Client({'discovery_srv': 'test'})

    def test__get(self):
        """An undecodable response body is reported as EtcdError."""
        self.assertRaises(EtcdError, self.client._get, '/bad_response')

    def test_get_srv_record(self):
        """Both an empty answer and a DNS failure yield an empty list."""
        # assertEqual instead of the deprecated assertEquals alias, which was
        # removed from unittest in Python 3.12.
        self.assertEqual(Client.get_srv_record('blabla'), [])
        self.assertEqual(Client.get_srv_record('exception'), [])

    def test_get_client_urls_from_dns(self):
        """Duplicate address entries collapse into a single client URL."""
        self.assertEqual(Client.get_client_urls_from_dns('ok:2379'), ['http://127.0.0.1:2379/v2'])

    def test_load_members(self):
        """load_members fails on an error answer and on an empty config."""
        self.client._base_uri = self.client._base_uri.replace('localhost', 'error_code')
        self.assertRaises(EtcdError, self.client.load_members)
        self.client._base_uri = None
        self.client._config = {}
        self.assertRaises(Exception, self.client.load_members)

    def test_get(self):
        """get() raises once no server is left, then works with a refilled cache."""
        self.client._base_uri = None
        self.assertRaises(EtcdConnectionFailed, self.client.get, '')
        self.client._members_cache = ['http://error_code:4001/v2']
        self.client.get('')

    def test_put(self):
        """put() raises once no server is left, then fails over to the cached one."""
        self.client._base_uri = None
        self.assertRaises(EtcdConnectionFailed, self.client.put, '')
        self.client._base_uri = 'http://localhost:4001/v2'
        self.client._members_cache = ['http://error_code:4001/v2']
        self.client.put('')

    def test_delete(self):
        """delete() raises once no server is left, then fails over to the cached one."""
        self.client._base_uri = None
        self.assertRaises(EtcdConnectionFailed, self.client.delete, '')
        self.client._base_uri = 'http://localhost:4001/v2'
        self.client._members_cache = ['http://error_code:4001/v2']
        self.client.delete('')
class TestEtcd(unittest.TestCase):
def __init__(self, method_name='runTest'):
@@ -70,11 +157,16 @@ class TestEtcd(unittest.TestCase):
super(TestEtcd, self).__init__(method_name)
def set_up(self):
socket.getaddrinfo = socket_getaddrinfo
requests.get = requests_get
requests.put = requests_put
requests.delete = requests_delete
time.sleep = time_sleep
self.etcd = Etcd({'ttl': 30, 'host': 'localhost', 'scope': 'test'})
self.etcd = Etcd({'ttl': 30, 'host': 'localhost:2379', 'scope': 'test'})
def test_get_etcd_client(self):
    """get_etcd_client propagates the exception raised by the patched sleep.

    ``time.sleep`` is replaced with a function that raises, and the call is
    expected to end with that exception.
    """
    time.sleep = time_sleep_exception
    unreachable_config = {'host': 'error:2379'}
    self.assertRaises(Exception, self.etcd.get_etcd_client, unreachable_config)
def test_get_client_path(self):
self.assertRaises(Exception, self.etcd.get_client_path, '', 2)
@@ -87,12 +179,12 @@ class TestEtcd(unittest.TestCase):
def test_get_cluster(self):
self.assertRaises(EtcdError, self.etcd.get_cluster)
self.etcd.base_client_url = self.etcd.base_client_url.replace('local', 'remote')
self.etcd.client._base_uri = self.etcd.client._base_uri.replace('local', 'remote')
cluster = self.etcd.get_cluster()
self.assertIsInstance(cluster, Cluster)
self.etcd.base_client_url = self.etcd.base_client_url.replace('remote', 'other')
self.etcd.client._base_uri = self.etcd.client._base_uri.replace('remote', 'other')
self.etcd.get_cluster()
self.etcd.base_client_url = self.etcd.base_client_url.replace('other', 'noleader')
self.etcd.client._base_uri = self.etcd.client._base_uri.replace('other', 'noleader')
self.etcd.get_cluster()
def test_current_leader(self):
@@ -108,9 +200,9 @@ class TestEtcd(unittest.TestCase):
self.assertFalse(self.etcd.attempt_to_acquire_leader(''))
def test_update_leader(self):
self.etcd.base_client_url = self.etcd.base_client_url.replace('local', 'remote')
url = self.etcd.client._base_uri = self.etcd.client._base_uri.replace('local', 'remote')
self.assertTrue(self.etcd.update_leader(MockPostgresql()))
self.etcd.base_client_url = self.etcd.base_client_url.replace('remote', 'other')
self.etcd.client._base_uri = url.replace('remote', 'other')
self.assertFalse(self.etcd.update_leader(MockPostgresql()))
def test_race(self):

View File

@@ -69,8 +69,7 @@ class TestGovernor(unittest.TestCase):
def test_governor_initialize(self):
self.g.postgresql.should_use_s3_to_create_replica = false
self.g.etcd.base_client_url = 'http://remote'
self.g.etcd.client_url
self.g.etcd.client._base_uri = 'http://remote'
self.g.postgresql.data_directory_empty = true
self.g.etcd.race = true
self.g.initialize()

View File

@@ -71,7 +71,7 @@ class TestHa(unittest.TestCase):
requests.put = requests_put
requests.delete = requests_delete
self.p = MockPostgresql()
self.e = Etcd({'ttl': 30, 'host': 'remotehost', 'scope': 'test'})
self.e = Etcd({'ttl': 30, 'host': 'remotehost:2379', 'scope': 'test'})
self.ha = Ha(self.p, self.e)
self.ha.load_cluster_from_etcd()
self.ha.cluster = Cluster(False, None, None, [])

View File

@@ -1,2 +1,8 @@
[flake8]
max-line-length=120
[testenv:py27]
deps = -rrequirements-py2.txt
[testenv:py33]
deps = -rrequirements-py3.txt