diff --git a/.travis.yml b/.travis.yml index a0659429..7c67278e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,7 +4,8 @@ python: - "3.3" - "3.4" install: - - pip install -r requirements.txt + - if [[ $TRAVIS_PYTHON_VERSION == 2* ]]; then pip install -r requirements-py2.txt --use-mirrors; fi + - if [[ $TRAVIS_PYTHON_VERSION == 3* ]]; then pip install -r requirements-py3.txt --use-mirrors; fi - pip install coveralls script: - python setup.py test diff --git a/governor.py b/governor.py index d405a9d0..bf824768 100755 --- a/governor.py +++ b/governor.py @@ -77,6 +77,7 @@ class Governor: def main(): logging.basicConfig(format='%(asctime)s %(levelname)s: %(message)s', level=logging.INFO) + logging.getLogger('requests').setLevel(logging.WARNING) setup_signal_handlers() if len(sys.argv) < 2 or not os.path.isfile(sys.argv[1]): diff --git a/helpers/errors.py b/helpers/errors.py index b86bd0f0..3a56a1e0 100644 --- a/helpers/errors.py +++ b/helpers/errors.py @@ -9,3 +9,7 @@ class EtcdError(Exception): class CurrentLeaderError(EtcdError): pass + + +class EtcdConnectionFailed(EtcdError): + pass diff --git a/helpers/etcd.py b/helpers/etcd.py index 32345106..72c0995e 100644 --- a/helpers/etcd.py +++ b/helpers/etcd.py @@ -1,11 +1,14 @@ import logging import requests +import socket import sys -from requests.exceptions import RequestException from collections import namedtuple -from helpers.errors import CurrentLeaderError, EtcdError +from dns.exception import DNSException +from dns import resolver +from helpers.errors import CurrentLeaderError, EtcdError, EtcdConnectionFailed from helpers.utils import sleep +from requests.exceptions import RequestException if sys.hexversion >= 0x03000000: from urllib.parse import urlparse, urlunparse, parse_qsl @@ -35,58 +38,186 @@ class Cluster(namedtuple('Cluster', 'initialize,leader,last_leader_operation,mem return not (self.leader and self.leader.hostname) +class Client: + + API_VERSION = 'v2' + + def __init__(self, config): + self._config = config + self.timeout = 5 + self._base_uri = None + self._members_cache = [] + self.load_members() + + def client_url(self, path): + return self._base_uri + path + + def _next_server(self): + self._base_uri = None + try: + self._base_uri = self._members_cache.pop() + except IndexError: + logger.error('Members cache is empty, can not retry.') + raise EtcdConnectionFailed('No more members in the cluster') + else: + logger.info('Selected new etcd server %s', self._base_uri) + + def _get(self, path): + response = None + while response is None: + uri = self.client_url(path) + try: + logger.info('GET %s', uri) + response = requests.get(uri, timeout=self.timeout) + except RequestException: + self._next_server() + + logger.debug([response.status_code, response.content]) + try: + return response.json(), response.status_code + except (TypeError, ValueError): + raise EtcdError('Bad response from %s: %s' % (uri, response.content)) + + @staticmethod + def get_srv_record(host): + try: + return [(r.target.rstrip('.'), r.port) for r in resolver.query('_etcd-server._tcp.' + host, 'SRV')] + except DNSException: + logger.exception('Can not resolve SRV for %s', host) + return [] + + @staticmethod + def get_peers_urls_from_dns(host): + return ['http://{}:{}'.format(h, p) for h, p in Client.get_srv_record(host)] + + @staticmethod + def get_client_urls_from_dns(addr): + host, port = addr.split(':') + ret = [] + try: + for r in set(socket.getaddrinfo(host, port, socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP)): + ret.append('http://{}:{}/{}'.format(r[4][0], r[4][1], Client.API_VERSION)) + except socket.error: + logger.exception('Can not resolve %s', host) + return list(set(ret)) if ret else ['http://{}:{}/{}'.format(host, port, Client.API_VERSION)] + + def load_members(self): + load_from_srv = False + if not self._base_uri: + if 'discovery_srv' in self._config: + load_from_srv = True + self._members_cache = self.get_peers_urls_from_dns(self._config['discovery_srv']) + elif 'host' in self._config: + self._members_cache = self.get_client_urls_from_dns(self._config['host']) + else: + raise Exception('Neither discovery_srv nor host are defined in etcd section of config') + self._next_server() + + response, status_code = self._get('/members') + if status_code != 200: + raise EtcdError('Got response with code=%s from %s' % (status_code, self._base_uri)) + + members_cache = [] + for member in response if load_from_srv else response['members']: + members_cache.extend([m + '/' + self.API_VERSION for m in member['clientURLs']]) + self._members_cache = list(set(members_cache)) # TODO: randomize + if load_from_srv: + self._next_server() + else: + try: + self._members_cache.remove(self._base_uri) + except ValueError: + pass + + def get(self, path): + if not self._base_uri: + self.load_members() + old_base_uri = self._base_uri + try: + return self._get(path) + finally: + if self._base_uri != old_base_uri: + try: + self.load_members() + except EtcdError: + logger.exception('load_members') + + def put(self, path, **data): + if not self._base_uri: + self.load_members() + old_base_uri = self._base_uri + response = None + while response is None: + uri = self.client_url(path) + try: + logger.info('PUT %s', uri) + response = requests.put(uri, timeout=self.timeout, data=data) + except RequestException: + logger.exception('PUT %s data=%s', uri, data) + self._next_server() + + if self._base_uri != old_base_uri: + try: + self.load_members() + except EtcdError: + logger.exception('load_members') + return response.status_code in [200, 201, 202, 204] + + def delete(self, path): + if not self._base_uri: + self.load_members() + old_base_uri = self._base_uri + response = None + while response is None: + uri = self.client_url(path) + try: + logger.info('DELETE %s', uri) + response = requests.delete(uri, timeout=self.timeout) + except RequestException: + logger.exception('DELETE %s', uri) + self._next_server() + + if self._base_uri != old_base_uri: + try: + self.load_members() + except EtcdError: + logger.exception('load_members') + return response.status_code in [200, 202, 204] + + class Etcd: def __init__(self, config): self.ttl = config['ttl'] self.member_ttl = config.get('member_ttl', 3600) - self.base_client_url = 'http://{host}/v2/keys/service/{scope}'.format(**config) - self.postgres_cluster = None + self._base_path = '/keys/service/' + config['scope'] + self.client = self.get_etcd_client(config) - def get_client_path(self, path, max_attempts=1): - attempts = 0 - response = None - - while True: - ex = None + def get_etcd_client(self, config): + client = None + while not client: try: - response = requests.get(self.client_url(path)) - if response.status_code == 200: - break - except RequestException as e: - logger.exception('get_client_path') - ex = e + client = Client(config) + except EtcdError: + logger.info('waiting on etcd') + sleep(5) + return client - attempts += 1 - if attempts < max_attempts: - logger.info('Failed to return %s, trying again. (%s of %s)', path, attempts, max_attempts) - sleep(3) - elif ex: - raise ex - else: - break + def client_path(self, path): + return self._base_path + path - return response.json(), response.status_code + def get_client_path(self, path): + return self.client.get(self.client_path(path)) def put_client_path(self, path, **data): - try: - response = requests.put(self.client_url(path), data=data) - return response.status_code in [200, 201, 202, 204] - except RequestException: - logger.exception('PUT %s data=%s', path, data) - raise EtcdError('Etcd is not responding properly') + return self.client.put(self.client_path(path), **data) def delete_client_path(self, path): try: - response = requests.delete(self.client_url(path)) - return response.status_code in [200, 202, 204] - except RequestException: - logger.exception('DELETE %s', path) + return self.client.delete(self.client_path(path)) + except EtcdConnectionFailed: return False - def client_url(self, path): - return self.base_client_url + path - @staticmethod def find_node(node, key): """ diff --git a/helpers/ha.py b/helpers/ha.py index 771e0bfd..9617228f 100644 --- a/helpers/ha.py +++ b/helpers/ha.py @@ -42,7 +42,7 @@ class Ha: self.state_handler.start() if not has_lock: return 'started as a secondary' - logging.info('started as readonly because i had the session lock') + logger.info('started as readonly because i had the session lock') self.load_cluster_from_etcd() if self.cluster.is_unlocked(): diff --git a/postgres0.yml b/postgres0.yml index 51abfbc0..d2a140e4 100644 --- a/postgres0.yml +++ b/postgres0.yml @@ -6,6 +6,7 @@ etcd: scope: batman ttl: 30 host: 127.0.0.1:4001 + #discovery_srv: my-etcd.domain postgresql: name: postgresql0 listen: 127.0.0.1:5432 diff --git a/postgres1.yml b/postgres1.yml index 265ac85b..44fee37f 100644 --- a/postgres1.yml +++ b/postgres1.yml @@ -6,6 +6,7 @@ etcd: scope: batman ttl: 30 host: 127.0.0.1:4001 + #discovery_srv: my-etcd.domain postgresql: name: postgresql1 listen: 127.0.0.1:5433 diff --git a/requirements.txt b/requirements-py2.txt similarity index 75% rename from requirements.txt rename to requirements-py2.txt index d0f5e512..3e77d016 100644 --- a/requirements.txt +++ b/requirements-py2.txt @@ -1,4 +1,5 @@ boto -PyYAML +dnspython psycopg2 +PyYAML requests diff --git a/requirements-py3.txt b/requirements-py3.txt new file mode 100644 index 00000000..68821d9d --- /dev/null +++ b/requirements-py3.txt @@ -0,0 +1,5 @@ +boto +dnspython3 +psycopg2 +PyYAML +requests diff --git a/setup.py b/setup.py index cafa4c23..fa1d4ae5 100644 --- a/setup.py +++ b/setup.py @@ -102,7 +102,8 @@ def setup_package(): # Some helper variables version = os.getenv('GO_PIPELINE_LABEL', VERSION) - install_reqs = get_install_requirements('requirements.txt') + requirements = 'requirements-py2.txt' if sys.version_info[0] == 2 else 'requirements-py3.txt' + install_reqs = get_install_requirements(requirements) command_options = {'test': {'test_suite': ('setup.py', 'tests')}} if JUNIT_XML: diff --git a/tests/test_etcd.py b/tests/test_etcd.py index ec0a19bd..f7476e9f 100644 --- a/tests/test_etcd.py +++ b/tests/test_etcd.py @@ -1,10 +1,13 @@ -import unittest -import requests -import time +import dns.resolver import json +import requests +import socket +import time +import unittest -from helpers.etcd import Cluster, Etcd -from helpers.errors import EtcdError, CurrentLeaderError +from dns.exception import DNSException +from helpers.errors import EtcdError, CurrentLeaderError, EtcdConnectionFailed +from helpers.etcd import Client, Cluster, Etcd class MockResponse: @@ -26,10 +29,19 @@ class MockPostgresql: def requests_get(url, **kwargs): - if url.startswith('http://local'): - raise requests.exceptions.RequestException() + members = '[{"id":14855829450254237642,"peerURLs":["http://localhost:2380","http://localhost:7001"],"name":"default","clientURLs":["http://localhost:2379","http://localhost:4001"]}]' response = MockResponse() - if url.startswith('http://remote') or url.startswith('http://127.0.0.1'): + if url.endswith('/v2/members'): + response.content = '{"members": ' + members + '}' + if url.startswith('http://error'): + response.status_code = 404 + elif url.endswith('/members'): + response.content = members + elif url.endswith('/bad_response'): + response.content = '{' + elif url.startswith('http://local'): + raise requests.exceptions.RequestException() + elif url.startswith('http://remote') or url.startswith('http://127.0.0.1') or url.startswith('http://error'): response.content = '{"action":"get","node":{"key":"/service/batman5","dir":true,"nodes":[{"key":"/service/batman5/initialize","value":"postgresql0","modifiedIndex":1582,"createdIndex":1582},{"key":"/service/batman5/leader","value":"postgresql1","expiration":"2015-05-15T09:11:00.037397538Z","ttl":21,"modifiedIndex":20728,"createdIndex":20434},{"key":"/service/batman5/optime","dir":true,"nodes":[{"key":"/service/batman5/optime/leader","value":"2164261704","modifiedIndex":20729,"createdIndex":20729}],"modifiedIndex":20437,"createdIndex":20437},{"key":"/service/batman5/members","dir":true,"nodes":[{"key":"/service/batman5/members/postgresql1","value":"postgres://replicator:rep-pass@127.0.0.1:5434/postgres?application_name=http://127.0.0.1:8009/governor","expiration":"2015-05-15T09:10:59.949384522Z","ttl":21,"modifiedIndex":20727,"createdIndex":20727},{"key":"/service/batman5/members/postgresql0","value":"postgres://replicator:rep-pass@127.0.0.1:5433/postgres?application_name=http://127.0.0.1:8008/governor","expiration":"2015-05-15T09:11:09.611860899Z","ttl":30,"modifiedIndex":20730,"createdIndex":20730}],"modifiedIndex":1581,"createdIndex":1581}],"modifiedIndex":1581,"createdIndex":1581}}' elif url.startswith('http://other'): response.status_code = 404 @@ -51,7 +63,7 @@ def requests_put(url, **kwargs): return response -def requests_delete(url): +def requests_delete(url, **kwargs): if url.startswith('http://local'): raise requests.exceptions.RequestException() response = MockResponse() @@ -63,6 +75,81 @@ def time_sleep(_): pass +def time_sleep_exception(_): + raise Exception() + + +class MockSRV: + port = 2380 + target = '127.0.0.1' + + +def dns_query(name, type): + if name == '_etcd-server._tcp.blabla': + return [] + elif name == '_etcd-server._tcp.exception': + raise DNSException() + return [MockSRV()] + + +def socket_getaddrinfo(*args): + if args[0] == 'ok': + return [(2, 1, 6, '', ('127.0.0.1', 2379)), (2, 1, 6, '', ('127.0.0.1', 2379))] + raise socket.error() + + +class TestClient(unittest.TestCase): + + def __init__(self, method_name='runTest'): + self.setUp = self.set_up + super(TestClient, self).__init__(method_name) + + def set_up(self): + socket.getaddrinfo = socket_getaddrinfo + requests.get = requests_get + requests.put = requests_put + requests.delete = requests_delete + dns.resolver.query = dns_query + self.client = Client({'discovery_srv': 'test'}) + + def test__get(self): + self.assertRaises(EtcdError, self.client._get, '/bad_response') + + def test_get_srv_record(self): + self.assertEquals(Client.get_srv_record('blabla'), []) + self.assertEquals(Client.get_srv_record('exception'), []) + + def test_get_client_urls_from_dns(self): + self.assertEquals(Client.get_client_urls_from_dns('ok:2379'), ['http://127.0.0.1:2379/v2']) + + def test_load_members(self): + self.client._base_uri = self.client._base_uri.replace('localhost', 'error_code') + self.assertRaises(EtcdError, self.client.load_members) + self.client._base_uri = None + self.client._config = {} + self.assertRaises(Exception, self.client.load_members) + + def test_get(self): + self.client._base_uri = None + self.assertRaises(EtcdConnectionFailed, self.client.get, '') + self.client._members_cache = ['http://error_code:4001/v2'] + self.client.get('') + + def test_put(self): + self.client._base_uri = None + self.assertRaises(EtcdConnectionFailed, self.client.put, '') + self.client._base_uri = 'http://localhost:4001/v2' + self.client._members_cache = ['http://error_code:4001/v2'] + self.client.put('') + + def test_delete(self): + self.client._base_uri = None + self.assertRaises(EtcdConnectionFailed, self.client.delete, '') + self.client._base_uri = 'http://localhost:4001/v2' + self.client._members_cache = ['http://error_code:4001/v2'] + self.client.delete('') + + class TestEtcd(unittest.TestCase): def __init__(self, method_name='runTest'): @@ -70,11 +157,16 @@ class TestEtcd(unittest.TestCase): super(TestEtcd, self).__init__(method_name) def set_up(self): + socket.getaddrinfo = socket_getaddrinfo requests.get = requests_get requests.put = requests_put requests.delete = requests_delete time.sleep = time_sleep - self.etcd = Etcd({'ttl': 30, 'host': 'localhost', 'scope': 'test'}) + self.etcd = Etcd({'ttl': 30, 'host': 'localhost:2379', 'scope': 'test'}) + + def test_get_etcd_client(self): + time.sleep = time_sleep_exception + self.assertRaises(Exception, self.etcd.get_etcd_client, {'host': 'error:2379'}) def test_get_client_path(self): self.assertRaises(Exception, self.etcd.get_client_path, '', 2) @@ -87,12 +179,12 @@ class TestEtcd(unittest.TestCase): def test_get_cluster(self): self.assertRaises(EtcdError, self.etcd.get_cluster) - self.etcd.base_client_url = self.etcd.base_client_url.replace('local', 'remote') + self.etcd.client._base_uri = self.etcd.client._base_uri.replace('local', 'remote') cluster = self.etcd.get_cluster() self.assertIsInstance(cluster, Cluster) - self.etcd.base_client_url = self.etcd.base_client_url.replace('remote', 'other') + self.etcd.client._base_uri = self.etcd.client._base_uri.replace('remote', 'other') self.etcd.get_cluster() - self.etcd.base_client_url = self.etcd.base_client_url.replace('other', 'noleader') + self.etcd.client._base_uri = self.etcd.client._base_uri.replace('other', 'noleader') self.etcd.get_cluster() def test_current_leader(self): @@ -108,9 +200,9 @@ class TestEtcd(unittest.TestCase): self.assertFalse(self.etcd.attempt_to_acquire_leader('')) def test_update_leader(self): - self.etcd.base_client_url = self.etcd.base_client_url.replace('local', 'remote') + url = self.etcd.client._base_uri = self.etcd.client._base_uri.replace('local', 'remote') self.assertTrue(self.etcd.update_leader(MockPostgresql())) - self.etcd.base_client_url = self.etcd.base_client_url.replace('remote', 'other') + self.etcd.client._base_uri = url.replace('remote', 'other') self.assertFalse(self.etcd.update_leader(MockPostgresql())) def test_race(self): diff --git a/tests/test_governor.py b/tests/test_governor.py index aea90c61..fd1b58d7 100644 --- a/tests/test_governor.py +++ b/tests/test_governor.py @@ -69,8 +69,7 @@ class TestGovernor(unittest.TestCase): def test_governor_initialize(self): self.g.postgresql.should_use_s3_to_create_replica = false - self.g.etcd.base_client_url = 'http://remote' - self.g.etcd.client_url + self.g.etcd.client._base_uri = 'http://remote' self.g.postgresql.data_directory_empty = true self.g.etcd.race = true self.g.initialize() diff --git a/tests/test_ha.py b/tests/test_ha.py index 59495b95..574f949f 100644 --- a/tests/test_ha.py +++ b/tests/test_ha.py @@ -71,7 +71,7 @@ class TestHa(unittest.TestCase): requests.put = requests_put requests.delete = requests_delete self.p = MockPostgresql() - self.e = Etcd({'ttl': 30, 'host': 'remotehost', 'scope': 'test'}) + self.e = Etcd({'ttl': 30, 'host': 'remotehost:2379', 'scope': 'test'}) self.ha = Ha(self.p, self.e) self.ha.load_cluster_from_etcd() self.ha.cluster = Cluster(False, None, None, []) diff --git a/tox.ini b/tox.ini index aa079ec5..bf407bae 100644 --- a/tox.ini +++ b/tox.ini @@ -1,2 +1,8 @@ [flake8] max-line-length=120 + +[testenv:py27] +deps = -rrequirements-py2.txt + +[testenv:py33] +deps = -rrequirements-py3.txt