From 24a2ea6cefd9dec31b4081472dd087dcbb710c90 Mon Sep 17 00:00:00 2001 From: Alexander Kukushkin Date: Sun, 10 Apr 2016 10:37:43 +0200 Subject: [PATCH] Refactor acceptance tests to make them work against ZooKeeper and make it easier to implement controllers for new DCS, i.e. consul --- .travis.yml | 35 +- features/basic_replication.feature | 2 +- features/environment.py | 502 +++++++++++++++------------- features/patroni_api.feature | 2 +- features/steps/basic_replication.py | 5 +- features/steps/patroni_api.py | 5 +- patroni/__init__.py | 4 +- patroni/utils.py | 8 +- patroni/zookeeper.py | 11 +- postgres0.yml | 1 + postgres1.yml | 1 + postgres2.yml | 1 + setup.py | 9 +- tests/test_patroni.py | 10 +- tests/test_utils.py | 4 +- 15 files changed, 328 insertions(+), 272 deletions(-) diff --git a/.travis.yml b/.travis.yml index 73d49104..1fcd871e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,7 +4,7 @@ addons: postgresql: "9.5" env: global: - - BOTO_CONFIG='' ETCDVERSION=2.2.5 + - BOTO_CONFIG='' ETCDVERSION=2.2.5 ZKVERSION=3.4.6 matrix: - TEST_SUITE="python setup.py test" - TEST_SUITE="behave" @@ -13,20 +13,31 @@ python: - "3.4" - "3.5" install: - - sudo /etc/init.d/postgresql stop - - sudo apt-get -y remove --purge postgresql-9.1 postgresql-9.2 postgresql-9.3 postgresql-9.4 - - sudo apt-get -y autoremove - - sudo apt-key adv --keyserver keys.gnupg.net --recv-keys 7FCC7D46ACCC4CF8 - - sudo sh -c 'echo "deb http://apt.postgresql.org/pub/repos/apt/ precise-pgdg main 9.5" >> /etc/apt/sources.list.d/postgresql.list' - - sudo apt-get update - - sudo apt-get -y install postgresql-9.5 - - sudo /etc/init.d/postgresql stop + - | + if [[ $TEST_SUITE == "behave" ]]; then + sudo bash -c ' + /etc/init.d/postgresql stop + apt-get -y remove --purge postgresql-9.1 postgresql-9.2 postgresql-9.3 postgresql-9.4 + apt-get -y autoremove + apt-key adv --keyserver keys.gnupg.net --recv-keys 7FCC7D46ACCC4CF8 + echo "deb http://apt.postgresql.org/pub/repos/apt/ precise-pgdg main 9.5" >> /etc/apt/sources.list.d/postgresql.list + apt-get update + apt-get -y install postgresql-9.5 + /etc/init.d/postgresql stop' + curl -L https://github.com/coreos/etcd/releases/download/v${ETCDVERSION}/etcd-v${ETCDVERSION}-linux-amd64.tar.gz | tar xz -C . --strip=1 --wildcards --no-anchored etcd + curl -L http://www.apache.org/dist/zookeeper/zookeeper-${ZKVERSION}/zookeeper-${ZKVERSION}.tar.gz | tar xz + mv zookeeper-${ZKVERSION}/conf/zoo_sample.cfg zookeeper-${ZKVERSION}/conf/zoo.cfg + zookeeper-${ZKVERSION}/bin/zkServer.sh start + while true; do + echo -e 'HTTP/1.0 200 OK\nContent-Type: application/json\n\n{"servers":["127.0.0.1"],"port":2181}' \ + | nc -l 8181 &> /dev/null + done& + fi - pip install -r requirements.txt - - curl -L https://github.com/coreos/etcd/releases/download/v${ETCDVERSION}/etcd-v${ETCDVERSION}-linux-amd64.tar.gz | tar xz -C . 
--strip=1 --wildcards --no-anchored etcd - - pip install behave codacy-coverage coverage coveralls + - pip install behave codacy-coverage coverage coveralls script: - PATH=.:$PATH $TEST_SUITE - - python setup.py flake8 + - if [[ $TEST_SUITE == "behave" ]]; then PATH=.:$PATH DCS=exhibitor $TEST_SUITE; else python setup.py flake8; fi after_success: - coveralls - if [[ -f coverage.xml ]]; then python-codacy-coverage -r coverage.xml; fi diff --git a/features/basic_replication.feature b/features/basic_replication.feature index b3307e40..553440c4 100644 --- a/features/basic_replication.feature +++ b/features/basic_replication.feature @@ -14,4 +14,4 @@ Feature: basic replication When I start postgres0 Then postgres0 role is the secondary after 15 seconds When I add the table bar to postgres1 - Then table bar is present on postgres0 after 10 seconds + Then table bar is present on postgres0 after 15 seconds diff --git a/features/environment.py b/features/environment.py index 73f77011..52405eb6 100644 --- a/features/environment.py +++ b/features/environment.py @@ -1,294 +1,340 @@ +import abc +import etcd +import kazoo.client +import kazoo.exceptions import os import psycopg2 -import requests import shutil +import six import subprocess import tempfile import time import yaml -class PatroniController(object): - PATRONI_CONFIG = '{}.yml' - """ starts and stops individual patronis""" +@six.add_metaclass(abc.ABCMeta) +class AbstractController(object): - def __init__(self): - self._output_dir = None - self._patroni_path = None - self._connections = {} - self._config = {} - self._connstring = {} - self._cursors = {} - self._log = {} - self._processes = {} + def __init__(self, name, work_directory, output_dir): + self._name = name + self._work_directory = work_directory + self._output_dir = output_dir + self._handle = None + self._log = None - @property - def patroni_path(self): - if self._patroni_path is None: - cwd = os.path.realpath(__file__) - while True: - path, entry = os.path.split(cwd) - cwd = path - if entry == 'features' or cwd == '/': - break - self._patroni_path = cwd - return self._patroni_path + def _has_started(self): + return self._handle and self._handle.pid and self._handle.poll() is None - def data_dir(self, pg_name): - return os.path.join(self.patroni_path, 'data', pg_name) + def _is_running(self): + return self._has_started() - def write_label(self, pg_name, content): - with open(os.path.join(self.data_dir(pg_name), 'label'), 'w') as f: - f.write(content) + @abc.abstractmethod + def _is_accessible(self): + """process is accessible for queries""" - def read_label(self, pg_name): - content = None - try: - with open(os.path.join(self.data_dir(pg_name), 'label'), 'r') as f: - content = f.read() - except IOError: - return None - return content.strip() + @abc.abstractmethod + def _start(self): + """start process""" - def start(self, pg_name, max_wait_limit=20, tags=None): - if not self._is_running(pg_name): - if pg_name in self._processes: - del self._processes[pg_name] - cwd = self.patroni_path - self._log[pg_name] = open(os.path.join(self._output_dir, 'patroni_{0}.log'.format(pg_name)), 'a') + def start(self, max_wait_limit=5): + if self._is_running(): + return True - self._config[pg_name] = self._make_patroni_test_config(pg_name, tags=tags) + self._log = open(os.path.join(self._output_dir, self._name + '.log'), 'a') + self._handle = self._start() + + assert self._has_started(), "Process {0} is not running after being started".format(self._name) - p = subprocess.Popen(['coverage', 'run', 
'--branch', '--source=patroni', '-p', 'patroni.py', self._config[pg_name]], - stdout=self._log[pg_name], stderr=subprocess.STDOUT, cwd=cwd) - if not (p and p.pid and p.poll() is None): - assert False, "PostgreSQL {0} is not running after being started".format(pg_name) - self._processes[pg_name] = p - # wait while patroni is available for queries, but not more than 10 seconds. for _ in range(max_wait_limit): - if self.query(pg_name, "SELECT 1", fail_ok=True) is not None: + if self._is_accessible(): break time.sleep(1) else: assert False,\ - "Patroni instance is not available for queries after {0} seconds".format(max_wait_limit) + "{0} instance is not available for queries after {1} seconds".format(self._name, max_wait_limit) - def stop(self, pg_name, kill=False, timeout=15): + def stop(self, kill=False, timeout=15): + term = False start_time = time.time() - while self._is_running(pg_name): - if not kill: - self._processes[pg_name].terminate() - else: - self._processes[pg_name].kill() + + while self._handle and self._is_running(): + if kill: + self._handle.kill() + elif not term: + self._handle.terminate() + term = True time.sleep(1) if not kill and time.time() - start_time > timeout: kill = True - if self._log.get('pg_name') and not self._log['pg_name'].closed: - self._log[pg_name].close() - if pg_name in self._processes: - del self._processes[pg_name] - def query(self, pg_name, query, fail_ok=False): + if self._log: + self._log.close() + + +class PatroniController(AbstractController): + PATRONI_CONFIG = '{}.yml' + """ starts and stops individual patronis""" + + def __init__(self, dcs, name, work_directory, output_dir, tags=None): + super(PatroniController, self).__init__('patroni_' + name, work_directory, output_dir) + self._data_dir = os.path.join(work_directory, 'data', name) + self._connstring = None + self._config = self._make_patroni_test_config(name, dcs, tags) + + self._conn = None + self._curs = None + + def write_label(self, content): + with open(os.path.join(self._data_dir, 'label'), 'w') as f: + f.write(content) + + def read_label(self): try: - cursor = self._cursor(pg_name) - cursor.execute(query) - return cursor - except psycopg2.Error: - if fail_ok: - return None - else: - raise + with open(os.path.join(self._data_dir, 'label'), 'r') as f: + return f.read().strip() + except IOError: + return None - def check_role_has_changed_to(self, pg_name, new_role, timeout=10): - bound_time = time.time() + timeout - recovery_status = False if new_role == 'primary' else True - role_has_changed = False - while not role_has_changed: - cur = self.query(pg_name, "SELECT pg_is_in_recovery()", fail_ok=True) - if cur: - row = cur.fetchone() - if row and len(row) > 0 and row[0] == recovery_status: - role_has_changed = True - if time.time() > bound_time: - break - time.sleep(1) - return role_has_changed + def _start(self): + return subprocess.Popen(['coverage', 'run', '--source=patroni', '-p', 'patroni.py', self._config], + stdout=self._log, stderr=subprocess.STDOUT, cwd=self._work_directory) - def stop_all(self): - for patroni in self._processes.copy(): - self.stop(patroni) + def _is_accessible(self): + return self.query("SELECT 1", fail_ok=True) is not None - def create_and_set_output_directory(self, feature_name): - feature_dir = os.path.join(self.patroni_path, "features", "output", - feature_name.replace(' ', '_')) - if os.path.exists(feature_dir): - shutil.rmtree(feature_dir) - os.makedirs(feature_dir) - self._output_dir = feature_dir - - def _is_running(self, pg_name): - return pg_name in 
self._processes and self._processes[pg_name].pid and (self._processes[pg_name].poll() is None) - - def _make_patroni_test_config(self, pg_name, tags=None): - patroni_config_name = PatroniController.PATRONI_CONFIG.format(pg_name) + def _make_patroni_test_config(self, name, dcs, tags): + patroni_config_name = self.PATRONI_CONFIG.format(name) patroni_config_path = os.path.join(self._output_dir, patroni_config_name) with open(patroni_config_name) as f: config = yaml.load(f) - postgresql = config['postgresql'] - postgresql['name'] = pg_name - postgresql['data_dir'] = 'data/{0}'.format(pg_name) - postgresql_params = postgresql['parameters'] - postgresql_params['logging_collector'] = 'on' - postgresql_params['log_destination'] = 'csvlog' - postgresql_params['log_directory'] = self._output_dir - postgresql_params['log_filename'] = '{0}.log'.format(pg_name) - postgresql_params['log_statement'] = 'all' - postgresql_params['log_min_messages'] = 'debug1' - postgresql_params['unix_socket_directories'] = '.' + + self._connstring = self._make_connstring(config) + + config['postgresql'].update({'name': name, 'data_dir': self._data_dir}) + config['postgresql']['parameters'].update({ + 'logging_collector': 'on', 'log_destination': 'csvlog', 'log_directory': self._output_dir, + 'log_filename': name + '.log', 'log_statement': 'all', 'log_min_messages': 'debug1'}) if tags: config['tags'] = tags + if dcs != 'etcd': + etcd = config.pop('etcd') + config['zookeeper'] = {'scope': etcd['scope'], 'session_timeout': etcd['ttl'], + 'reconnect_timeout': config['loop_wait']} + if dcs == 'exhibitor': + config['zookeeper']['exhibitor'] = {'hosts': ['127.0.0.1'], 'port': 8181} + else: + config['zookeeper']['hosts'] = ['127.0.0.1:2181'] + with open(patroni_config_path, 'w') as f: yaml.dump(config, f, default_flow_style=False) return patroni_config_path - def _make_connstring(self, pg_name): - if pg_name in self._connstring: - return self._connstring[pg_name] - try: - patroni_path = self.patroni_path - with open(os.path.join(patroni_path, PatroniController.PATRONI_CONFIG.format(pg_name)), 'r') as f: - config = yaml.load(f) - except IOError: - return None - connstring = config['postgresql']['connect_address'] - if ':' in connstring: - address, port = connstring.split(':') - else: - address = connstring - port = '5432' - user = "postgres" - dbname = "postgres" - self._connstring[pg_name] = "host={0} port={1} dbname={2} user={3}".format(address, port, dbname, user) - return self._connstring[pg_name] - - def _connection(self, pg_name): - if pg_name not in self._connections or self._connections[pg_name].closed: - conn = psycopg2.connect(self._make_connstring(pg_name)) - conn.autocommit = True - self._connections[pg_name] = conn - return self._connections[pg_name] - - def _cursor(self, pg_name): - if pg_name not in self._cursors or self._cursors[pg_name].closed: - cursor = self._connection(pg_name).cursor() - self._cursors[pg_name] = cursor - return self._cursors[pg_name] - - -class EtcdController(object): - - """ handles all etcd related tasks, used for the tests setup and cleanup """ - ETCD_VERSION_URL = 'http://127.0.0.1:2379/version' - ETCD_CLEANUP_URL = 'http://127.0.0.1:2379/v2/keys/service/batman?recursive=true' - - def __init__(self, log_directory): - self.handle = None - self.work_directory = None - self.log_directory = log_directory - self.log_file = None - self.pid = None - self.start_timeout = 5 - - def start(self): - """ start etcd if it's not already running """ - if self._is_running(): - return True - 
self.work_directory = tempfile.mkdtemp() - # etcd is running throughout the tests, no need to append to the log - output_dir = os.path.join(self.log_directory, "features", "output") - if not os.path.exists(output_dir): - os.makedirs(output_dir) - self.log_file = open(os.path.join(output_dir, 'etcd.log'), 'w') - self.handle =\ - subprocess.Popen(["etcd", "--debug", "--data-dir", self.work_directory], - stdout=self.log_file, stderr=subprocess.STDOUT) - start_time = time.time() - while (not self._is_running()): - if time.time() - start_time > self.start_timeout: - assert False, "Failed to start etcd" - time.sleep(1) - return True - @staticmethod - def query(key): - """ query etcd for a value of a given key """ - r = requests.get("http://127.0.0.1:2379/v2/keys/service/batman/{0}".format(key)) - if r.ok: - content = r.json() - if content: - return content.get('node', {}).get('value') - return None + def _make_connstring(config): + tmp = (config['postgresql']['connect_address'] + ':5432').split(':') + return 'host={0} port={1} dbname=postgres user=postgres'.format(*tmp[:2]) + + def _connection(self): + if not self._conn or self._conn.closed != 0: + self._conn = psycopg2.connect(self._connstring) + self._conn.autocommit = True + return self._conn + + def _cursor(self): + if not self._curs or self._curs.closed or self._curs.connection.closed != 0: + self._curs = self._connection().cursor() + return self._curs + + def query(self, query, fail_ok=False): + try: + cursor = self._cursor() + cursor.execute(query) + return cursor + except psycopg2.Error: + if not fail_ok: + raise + + def check_role_has_changed_to(self, new_role, timeout=10): + bound_time = time.time() + timeout + recovery_status = new_role != 'primary' + while time.time() < bound_time: + cur = self.query("SELECT pg_is_in_recovery()", fail_ok=True) + if cur: + row = cur.fetchone() + if row and row[0] == recovery_status: + return True + time.sleep(1) + return False + + +class AbstractDcsController(AbstractController): + + _CLUSTER_NODE = 'service/batman' + + def _is_accessible(self): + return self._is_running() def stop_and_remove_work_directory(self, timeout=15): - """ terminate etcd and wipe out the temp work directory, but only if we actually started it""" - kill = False - start_time = time.time() - while self._is_running() and self.handle: - if not kill: - self.handle.terminate() - else: - self.handle.kill() - time.sleep(1) - if not kill and time.time() - start_time > timeout: - kill = True - self.handle = None - if self.log_file and not self.log_file.closed: - self.log_file.close() - if self.work_directory: - shutil.rmtree(self.work_directory) - self.work_directory = None + """ terminate process and wipe out the temp work directory, but only if we actually started it""" + self.stop(timeout=timeout) + if self._work_directory: + shutil.rmtree(self._work_directory) - @staticmethod - def cleanup_service_tree(): + @abc.abstractmethod + def query(self, key): + """ query for a value of a given key """ + + @abc.abstractmethod + def cleanup_service_tree(self): """ clean all contents stored in the tree used for the tests """ - r = None + + +class EtcdController(AbstractDcsController): + + """ handles all etcd related tasks, used for the tests setup and cleanup """ + + def __init__(self, output_dir): + super(EtcdController, self).__init__('etcd', tempfile.mkdtemp(), output_dir) + self._client = etcd.Client() + + def _start(self): + return subprocess.Popen(["etcd", "--debug", "--data-dir", self._work_directory], + stdout=self._log, 
stderr=subprocess.STDOUT) + + def query(self, key): try: - r = requests.delete(EtcdController.ETCD_CLEANUP_URL) - if r and not r.ok: - assert False,\ - "request to cleanup the etcd contents was not successfull: status code {0}".format(r.status_code) - except requests.exceptions.RequestException as e: + return self._client.get('/{0}/{1}'.format(self._CLUSTER_NODE, key)).value + except etcd.EtcdKeyNotFound: + return None + + def cleanup_service_tree(self): + try: + self._client.delete('/' + self._CLUSTER_NODE, recursive=True) + except (etcd.EtcdKeyNotFound, etcd.EtcdConnectionFailed): + return + except Exception as e: assert False, "exception when cleaning up etcd contents: {0}".format(e) - @staticmethod - def _is_running(): + def _is_running(self): # if etcd is running, but we didn't start it try: - r = requests.get(EtcdController.ETCD_VERSION_URL) - running = (r and r.ok and b'etcdserver' in r.content) - except requests.ConnectionError: - running = False - return running + return bool(self._client.machines) + except Exception: + return False + + +class ZooKeeperController(AbstractDcsController): + + """ handles all zookeeper related tasks, used for the tests setup and cleanup """ + + def __init__(self, output_dir): + super(ZooKeeperController, self).__init__('zookeeper', None, output_dir) + self._client = kazoo.client.KazooClient() + + def _start(self): + pass # TODO: implement later + + def query(self, key): + try: + return self._client.get('/{0}/{1}'.format(self._CLUSTER_NODE, key))[0].decode('utf-8') + except kazoo.exceptions.NoNodeError: + return None + + def cleanup_service_tree(self): + try: + self._client.delete('/' + self._CLUSTER_NODE, recursive=True) + except (kazoo.exceptions.NoNodeError): + return + except Exception as e: + assert False, "exception when cleaning up zookeeper contents: {0}".format(e) + + def _is_running(self): + # if zookeeper is running, but we didn't start it + if self._client.connected: + return True + try: + return self._client.start(1) or True + except Exception: + return False + + +class PatroniPoolController(object): + + KNOWN_DCS = {'etcd': EtcdController, 'zookeeper': ZooKeeperController, 'exhibitor': ZooKeeperController} + + def __init__(self): + self._dcs = None + self._output_dir = None + self._patroni_path = None + self._processes = {} + self.create_and_set_output_directory('') + + @property + def patroni_path(self): + if self._patroni_path is None: + cwd = os.path.realpath(__file__) + while True: + cwd, entry = os.path.split(cwd) + if entry == 'features' or cwd == '/': + break + self._patroni_path = cwd + return self._patroni_path + + @property + def output_dir(self): + return self._output_dir + + def start(self, pg_name, max_wait_limit=20, tags=None): + if pg_name not in self._processes: + self._processes[pg_name] = PatroniController(self.dcs, pg_name, self.patroni_path, self._output_dir, tags) + self._processes[pg_name].start(max_wait_limit) + + def __getattr__(self, func): + if func not in ['stop', 'query', 'write_label', 'read_label', 'check_role_has_changed_to']: + raise AttributeError("PatroniPoolController instance has no attribute '{0}'".format(func)) + + def wrapper(pg_name, *args, **kwargs): + return getattr(self._processes[pg_name], func)(*args, **kwargs) + return wrapper + + def stop_all(self): + for ctl in self._processes.values(): + ctl.stop() + self._processes.clear() + + def create_and_set_output_directory(self, feature_name): + feature_dir = os.path.join(self.patroni_path, 'features/output', feature_name.replace(' ', '_')) + if 
os.path.exists(feature_dir): + shutil.rmtree(feature_dir) + os.makedirs(feature_dir) + self._output_dir = feature_dir + + @property + def dcs(self): + if self._dcs is None: + self._dcs = os.environ.get('DCS', 'etcd') + assert self._dcs in self.KNOWN_DCS, 'Unsupported dcs: ' + self.dcs + return self._dcs # actions to execute on start/stop of the tests and before running invidual features def before_all(context): - context.pctl = PatroniController() - context.etcd_ctl = EtcdController(context.pctl.patroni_path) - context.etcd_ctl.start() + context.pctl = PatroniPoolController() + context.dcs_ctl = context.pctl.KNOWN_DCS[context.pctl.dcs](context.pctl.output_dir) + context.dcs_ctl.start() try: - context.etcd_ctl.cleanup_service_tree() - except AssertionError: # after.all handlers won't be executed in before.all - context.etcd_ctl.stop_and_remove_work_directory() + context.dcs_ctl.cleanup_service_tree() + except AssertionError: # after_all handlers won't be executed in before_all + context.dcs_ctl.stop_and_remove_work_directory() raise def after_all(context): - context.etcd_ctl.stop_and_remove_work_directory() + context.dcs_ctl.stop_and_remove_work_directory() subprocess.call(['coverage', 'combine']) subprocess.call(['coverage', 'report']) @@ -302,4 +348,4 @@ def after_feature(context, feature): """ stop all Patronis, remove their data directory and cleanup the keys in etcd """ context.pctl.stop_all() shutil.rmtree(os.path.join(context.pctl.patroni_path, 'data')) - context.etcd_ctl.cleanup_service_tree() + context.dcs_ctl.cleanup_service_tree() diff --git a/features/patroni_api.feature b/features/patroni_api.feature index 3c408dfc..d3b3130d 100644 --- a/features/patroni_api.feature +++ b/features/patroni_api.feature @@ -31,7 +31,7 @@ Scenario: check API requests for the primary-replica pair Then I receive a response code 200 When I issue an empty POST request to http://127.0.0.1:8008/restart Then I receive a response code 200 - And postgres0 is a leader after 5 seconds + And postgres0 role is the primary after 5 seconds When I sleep for 10 seconds Then postgres1 role is the secondary after 15 seconds diff --git a/features/steps/basic_replication.py b/features/steps/basic_replication.py index 364de61f..b59a7639 100644 --- a/features/steps/basic_replication.py +++ b/features/steps/basic_replication.py @@ -41,9 +41,8 @@ def table_is_present_on(context, table_name, pg_name, max_replication_delay): @then('{pg_name:w} role is the {pg_role:w} after {max_promotion_timeout:d} seconds') def check_role(context, pg_name, pg_role, max_promotion_timeout): - if not context.pctl.check_role_has_changed_to(pg_name, pg_role, timeout=int(max_promotion_timeout)): - assert False,\ - "{0} role didn't change to {1} after {2} seconds".format(pg_name, pg_role, max_promotion_timeout) + assert context.pctl.check_role_has_changed_to(pg_name, pg_role, timeout=int(max_promotion_timeout)),\ + "{0} role didn't change to {1} after {2} seconds".format(pg_name, pg_role, max_promotion_timeout) @step('replication works from {master:w} to {replica:w} after {time_limit:d} seconds') diff --git a/features/steps/patroni_api.py b/features/steps/patroni_api.py index 0f802dee..d94086b5 100644 --- a/features/steps/patroni_api.py +++ b/features/steps/patroni_api.py @@ -29,10 +29,9 @@ register_type(url=parse_url, data=parse_data) @then('{name:w} is a leader after {time_limit:d} seconds') def is_a_leader(context, name, time_limit): max_time = time.time() + int(time_limit) - while (context.etcd_ctl.query("leader") != name): + while 
(context.dcs_ctl.query("leader") != name): time.sleep(1) - if time.time() > max_time: - assert False, "{0} is not a leader in etcd after {1} seconds".format(name, time_limit) + assert time.time() < max_time, "{0} is not a leader in dcs after {1} seconds".format(name, time_limit) @step('I sleep for {value:d} seconds') diff --git a/patroni/__init__.py b/patroni/__init__.py index 9de46c7f..88b5e312 100644 --- a/patroni/__init__.py +++ b/patroni/__init__.py @@ -8,7 +8,7 @@ from patroni.api import RestApiServer from patroni.etcd import Etcd from patroni.ha import Ha from patroni.postgresql import Postgresql -from patroni.utils import setup_signal_handlers, reap_children +from patroni.utils import reap_children, set_ignore_sigterm, setup_signal_handlers from patroni.zookeeper import ZooKeeper from .version import __version__ @@ -91,7 +91,7 @@ def main(): try: patroni.run() except KeyboardInterrupt: - pass + set_ignore_sigterm() finally: patroni.api.shutdown() patroni.postgresql.stop(checkpoint=False) diff --git a/patroni/utils.py b/patroni/utils.py index cb101758..0969d458 100644 --- a/patroni/utils.py +++ b/patroni/utils.py @@ -33,10 +33,14 @@ def calculate_ttl(expiration): return int((expiration - now).total_seconds()) -def sigterm_handler(signo, stack_frame): +def set_ignore_sigterm(value=True): global __ignore_sigterm + __ignore_sigterm = value + + +def sigterm_handler(signo, stack_frame): if not __ignore_sigterm: - __ignore_sigterm = True + set_ignore_sigterm() sys.exit() diff --git a/patroni/zookeeper.py b/patroni/zookeeper.py index 13faa4d3..78993814 100644 --- a/patroni/zookeeper.py +++ b/patroni/zookeeper.py @@ -83,12 +83,9 @@ class ZooKeeper(AbstractDCS): self.exhibitor = ExhibitorEnsembleProvider(exhibitor['hosts'], exhibitor['port'], poll_interval=interval) hosts = self.exhibitor.zookeeper_hosts - self._client = KazooClient(hosts=hosts, - timeout=(config.get('session_timeout') or 30), - command_retry={ - 'deadline': (config.get('reconnect_timeout') or 10), - 'max_delay': 1, - 'max_tries': -1}, + self._client = KazooClient(hosts=hosts, timeout=(config.get('session_timeout') or 30), + command_retry={'deadline': (config.get('reconnect_timeout') or 10), + 'max_delay': 1, 'max_tries': -1}, connection_retry={'max_delay': 1, 'max_tries': -1}) self._client.add_listener(self.session_listener) @@ -96,7 +93,7 @@ class ZooKeeper(AbstractDCS): self._fetch_cluster = True self._last_leader_operation = 0 - self._client.start(None) + self._client.start() def session_listener(self, state): if state in [KazooState.SUSPENDED, KazooState.LOST]: diff --git a/postgres0.yml b/postgres0.yml index 8b77259e..c02429b4 100644 --- a/postgres0.yml +++ b/postgres0.yml @@ -93,6 +93,7 @@ postgresql: max_replication_slots: 10 hot_standby: "on" wal_log_hints: "on" + unix_socket_directories: '.' tags: nofailover: False noloadbalance: False diff --git a/postgres1.yml b/postgres1.yml index c2d84ea0..1b3ef72a 100644 --- a/postgres1.yml +++ b/postgres1.yml @@ -94,6 +94,7 @@ postgresql: max_replication_slots: 10 hot_standby: "on" wal_log_hints: "on" + unix_socket_directories: '.' tags: nofailover: False noloadbalance: False diff --git a/postgres2.yml b/postgres2.yml index 33620823..db0d7c61 100644 --- a/postgres2.yml +++ b/postgres2.yml @@ -94,6 +94,7 @@ postgresql: max_replication_slots: 10 hot_standby: "on" wal_log_hints: "on" + unix_socket_directories: '.' 
tags: nofailover: False noloadbalance: False diff --git a/setup.py b/setup.py index a8936d7c..2050baff 100644 --- a/setup.py +++ b/setup.py @@ -4,13 +4,12 @@ Setup file for patroni """ -import sys -import os import inspect +import os +import sys -import setuptools from setuptools.command.test import test as TestCommand -from setuptools import setup +from setuptools import find_packages, setup if sys.version_info < (2, 7, 0): sys.stderr.write('FATAL: patroni needs to be run with Python 2.7+\n') @@ -138,7 +137,7 @@ def setup_package(): long_description=read('README.rst'), classifiers=CLASSIFIERS, test_suite='tests', - packages=setuptools.find_packages(exclude=['tests', 'tests.*']), + packages=find_packages(exclude=['tests', 'tests.*']), package_data={MAIN_PACKAGE: ["*.json"]}, install_requires=install_reqs, setup_requires=['flake8'], diff --git a/tests/test_patroni.py b/tests/test_patroni.py index fc806026..501d7cd6 100644 --- a/tests/test_patroni.py +++ b/tests/test_patroni.py @@ -45,7 +45,7 @@ class TestPatroni(unittest.TestCase): self.assertIsInstance(self.p.get_dcs('', {'zookeeper': {'scope': '', 'hosts': ''}}), ZooKeeper) self.assertRaises(Exception, self.p.get_dcs, '', {}) - @patch('time.sleep', Mock(side_effect=SleepException())) + @patch('time.sleep', Mock(side_effect=SleepException)) @patch.object(Etcd, 'delete_leader', Mock()) @patch.object(Client, 'machines') def test_patroni_main(self, mock_machines): @@ -53,7 +53,7 @@ class TestPatroni(unittest.TestCase): sys.argv = ['patroni.py', 'postgres0.yml'] mock_machines.__get__ = Mock(return_value=['http://remotehost:2379']) - with patch.object(Patroni, 'run', Mock(side_effect=SleepException())): + with patch.object(Patroni, 'run', Mock(side_effect=SleepException)): self.assertRaises(SleepException, _main) with patch.object(Patroni, 'run', Mock(side_effect=KeyboardInterrupt())): _main() @@ -66,12 +66,8 @@ class TestPatroni(unittest.TestCase): self.assertRaises(SleepException, _main) del os.environ[Patroni.PATRONI_CONFIG_VARIABLE] - @patch('time.sleep', Mock(side_effect=SleepException())) def test_run(self): - self.p.ha.dcs.watch = Mock(side_effect=SleepException()) - self.assertRaises(SleepException, self.p.run) - - self.p.ha.state_handler.is_leader = Mock(return_value=False) + self.p.ha.dcs.watch = Mock(side_effect=SleepException) self.p.api.start = Mock() self.assertRaises(SleepException, self.p.run) diff --git a/tests/test_utils.py b/tests/test_utils.py index 740fef03..af4cd861 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -2,7 +2,8 @@ import unittest from mock import Mock, patch from patroni.exceptions import PatroniException -from patroni.utils import Retry, RetryFailedError, reap_children, sigchld_handler, sigterm_handler, sleep +from patroni.utils import reap_children, Retry, RetryFailedError, set_ignore_sigterm,\ + sigchld_handler, sigterm_handler, sleep def time_sleep(_): @@ -12,6 +13,7 @@ def time_sleep(_): class TestUtils(unittest.TestCase): def test_sigterm_handler(self): + set_ignore_sigterm(False) self.assertRaises(SystemExit, sigterm_handler, None, None) @patch('time.sleep', Mock())
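
The subject line mentions making it easier to implement controllers for a new DCS such as Consul. With the AbstractDcsController introduced in features/environment.py above, such a controller only has to provide _start, query, cleanup_service_tree and _is_running, plus an entry in PatroniPoolController.KNOWN_DCS. What follows is a minimal, untested sketch of what that could look like; it is not part of this patch, and it assumes the python-consul client (consul.Consul, kv.get, kv.delete, status.leader) and a local agent started with "consul agent -dev".

import consul  # assumed third-party dependency (python-consul); not added by this patch
import subprocess


class ConsulController(AbstractDcsController):

    """ hypothetical consul controller, following the etcd/zookeeper controllers above """

    def __init__(self, output_dir):
        # like ZooKeeperController, no work directory: 'consul agent -dev' keeps state in memory
        super(ConsulController, self).__init__('consul', None, output_dir)
        self._client = consul.Consul()

    def _start(self):
        return subprocess.Popen(['consul', 'agent', '-dev'],
                                stdout=self._log, stderr=subprocess.STDOUT)

    def query(self, key):
        # consul KV keys are not prefixed with '/', so 'service/batman/<key>' is used as-is
        _, data = self._client.kv.get(self._CLUSTER_NODE + '/' + key)
        if data and data['Value'] is not None:
            return data['Value'].decode('utf-8')

    def cleanup_service_tree(self):
        try:
            self._client.kv.delete(self._CLUSTER_NODE, recurse=True)
        except Exception as e:
            assert False, "exception when cleaning up consul contents: {0}".format(e)

    def _is_running(self):
        # if consul is running, but we didn't start it
        try:
            return bool(self._client.status.leader())
        except Exception:
            return False

Registering it would then be a one-line change, adding 'consul': ConsulController to PatroniPoolController.KNOWN_DCS; Patroni itself would of course also need a Consul implementation of AbstractDCS before DCS=consul could pass the feature tests.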