mirror of
https://github.com/outbackdingo/patroni.git
synced 2026-01-27 18:20:05 +00:00
Refactor acceptance tests to make them work against ZooKeeper
and make it easier to implement controllers for new DCS, i.e. consul
This commit is contained in:
35
.travis.yml
35
.travis.yml
@@ -4,7 +4,7 @@ addons:
|
||||
postgresql: "9.5"
|
||||
env:
|
||||
global:
|
||||
- BOTO_CONFIG='' ETCDVERSION=2.2.5
|
||||
- BOTO_CONFIG='' ETCDVERSION=2.2.5 ZKVERSION=3.4.6
|
||||
matrix:
|
||||
- TEST_SUITE="python setup.py test"
|
||||
- TEST_SUITE="behave"
|
||||
@@ -13,20 +13,31 @@ python:
|
||||
- "3.4"
|
||||
- "3.5"
|
||||
install:
|
||||
- sudo /etc/init.d/postgresql stop
|
||||
- sudo apt-get -y remove --purge postgresql-9.1 postgresql-9.2 postgresql-9.3 postgresql-9.4
|
||||
- sudo apt-get -y autoremove
|
||||
- sudo apt-key adv --keyserver keys.gnupg.net --recv-keys 7FCC7D46ACCC4CF8
|
||||
- sudo sh -c 'echo "deb http://apt.postgresql.org/pub/repos/apt/ precise-pgdg main 9.5" >> /etc/apt/sources.list.d/postgresql.list'
|
||||
- sudo apt-get update
|
||||
- sudo apt-get -y install postgresql-9.5
|
||||
- sudo /etc/init.d/postgresql stop
|
||||
- |
|
||||
if [[ $TEST_SUITE == "behave" ]]; then
|
||||
sudo bash -c '
|
||||
/etc/init.d/postgresql stop
|
||||
apt-get -y remove --purge postgresql-9.1 postgresql-9.2 postgresql-9.3 postgresql-9.4
|
||||
apt-get -y autoremove
|
||||
apt-key adv --keyserver keys.gnupg.net --recv-keys 7FCC7D46ACCC4CF8
|
||||
echo "deb http://apt.postgresql.org/pub/repos/apt/ precise-pgdg main 9.5" >> /etc/apt/sources.list.d/postgresql.list
|
||||
apt-get update
|
||||
apt-get -y install postgresql-9.5
|
||||
/etc/init.d/postgresql stop'
|
||||
curl -L https://github.com/coreos/etcd/releases/download/v${ETCDVERSION}/etcd-v${ETCDVERSION}-linux-amd64.tar.gz | tar xz -C . --strip=1 --wildcards --no-anchored etcd
|
||||
curl -L http://www.apache.org/dist/zookeeper/zookeeper-${ZKVERSION}/zookeeper-${ZKVERSION}.tar.gz | tar xz
|
||||
mv zookeeper-${ZKVERSION}/conf/zoo_sample.cfg zookeeper-${ZKVERSION}/conf/zoo.cfg
|
||||
zookeeper-${ZKVERSION}/bin/zkServer.sh start
|
||||
while true; do
|
||||
echo -e 'HTTP/1.0 200 OK\nContent-Type: application/json\n\n{"servers":["127.0.0.1"],"port":2181}' \
|
||||
| nc -l 8181 &> /dev/null
|
||||
done&
|
||||
fi
|
||||
- pip install -r requirements.txt
|
||||
- curl -L https://github.com/coreos/etcd/releases/download/v${ETCDVERSION}/etcd-v${ETCDVERSION}-linux-amd64.tar.gz | tar xz -C . --strip=1 --wildcards --no-anchored etcd
|
||||
- pip install behave codacy-coverage coverage coveralls
|
||||
- pip install behave codacy-coverage coverage coveralls
|
||||
script:
|
||||
- PATH=.:$PATH $TEST_SUITE
|
||||
- python setup.py flake8
|
||||
- if [[ $TEST_SUITE == "behave" ]]; then PATH=.:$PATH DCS=exhibitor $TEST_SUITE; else python setup.py flake8; fi
|
||||
after_success:
|
||||
- coveralls
|
||||
- if [[ -f coverage.xml ]]; then python-codacy-coverage -r coverage.xml; fi
|
||||
|
||||
@@ -14,4 +14,4 @@ Feature: basic replication
|
||||
When I start postgres0
|
||||
Then postgres0 role is the secondary after 15 seconds
|
||||
When I add the table bar to postgres1
|
||||
Then table bar is present on postgres0 after 10 seconds
|
||||
Then table bar is present on postgres0 after 15 seconds
|
||||
|
||||
@@ -1,294 +1,340 @@
|
||||
import abc
|
||||
import etcd
|
||||
import kazoo.client
|
||||
import kazoo.exceptions
|
||||
import os
|
||||
import psycopg2
|
||||
import requests
|
||||
import shutil
|
||||
import six
|
||||
import subprocess
|
||||
import tempfile
|
||||
import time
|
||||
import yaml
|
||||
|
||||
|
||||
class PatroniController(object):
|
||||
PATRONI_CONFIG = '{}.yml'
|
||||
""" starts and stops individual patronis"""
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class AbstractController(object):
|
||||
|
||||
def __init__(self):
|
||||
self._output_dir = None
|
||||
self._patroni_path = None
|
||||
self._connections = {}
|
||||
self._config = {}
|
||||
self._connstring = {}
|
||||
self._cursors = {}
|
||||
self._log = {}
|
||||
self._processes = {}
|
||||
def __init__(self, name, work_directory, output_dir):
|
||||
self._name = name
|
||||
self._work_directory = work_directory
|
||||
self._output_dir = output_dir
|
||||
self._handle = None
|
||||
self._log = None
|
||||
|
||||
@property
|
||||
def patroni_path(self):
|
||||
if self._patroni_path is None:
|
||||
cwd = os.path.realpath(__file__)
|
||||
while True:
|
||||
path, entry = os.path.split(cwd)
|
||||
cwd = path
|
||||
if entry == 'features' or cwd == '/':
|
||||
break
|
||||
self._patroni_path = cwd
|
||||
return self._patroni_path
|
||||
def _has_started(self):
|
||||
return self._handle and self._handle.pid and self._handle.poll() is None
|
||||
|
||||
def data_dir(self, pg_name):
|
||||
return os.path.join(self.patroni_path, 'data', pg_name)
|
||||
def _is_running(self):
|
||||
return self._has_started()
|
||||
|
||||
def write_label(self, pg_name, content):
|
||||
with open(os.path.join(self.data_dir(pg_name), 'label'), 'w') as f:
|
||||
f.write(content)
|
||||
@abc.abstractmethod
|
||||
def _is_accessible(self):
|
||||
"""process is accessible for queries"""
|
||||
|
||||
def read_label(self, pg_name):
|
||||
content = None
|
||||
try:
|
||||
with open(os.path.join(self.data_dir(pg_name), 'label'), 'r') as f:
|
||||
content = f.read()
|
||||
except IOError:
|
||||
return None
|
||||
return content.strip()
|
||||
@abc.abstractmethod
|
||||
def _start(self):
|
||||
"""start process"""
|
||||
|
||||
def start(self, pg_name, max_wait_limit=20, tags=None):
|
||||
if not self._is_running(pg_name):
|
||||
if pg_name in self._processes:
|
||||
del self._processes[pg_name]
|
||||
cwd = self.patroni_path
|
||||
self._log[pg_name] = open(os.path.join(self._output_dir, 'patroni_{0}.log'.format(pg_name)), 'a')
|
||||
def start(self, max_wait_limit=5):
|
||||
if self._is_running():
|
||||
return True
|
||||
|
||||
self._config[pg_name] = self._make_patroni_test_config(pg_name, tags=tags)
|
||||
self._log = open(os.path.join(self._output_dir, self._name + '.log'), 'a')
|
||||
self._handle = self._start()
|
||||
|
||||
assert self._has_started(), "Process {0} is not running after being started".format(self._name)
|
||||
|
||||
p = subprocess.Popen(['coverage', 'run', '--branch', '--source=patroni', '-p', 'patroni.py', self._config[pg_name]],
|
||||
stdout=self._log[pg_name], stderr=subprocess.STDOUT, cwd=cwd)
|
||||
if not (p and p.pid and p.poll() is None):
|
||||
assert False, "PostgreSQL {0} is not running after being started".format(pg_name)
|
||||
self._processes[pg_name] = p
|
||||
# wait while patroni is available for queries, but not more than 10 seconds.
|
||||
for _ in range(max_wait_limit):
|
||||
if self.query(pg_name, "SELECT 1", fail_ok=True) is not None:
|
||||
if self._is_accessible():
|
||||
break
|
||||
time.sleep(1)
|
||||
else:
|
||||
assert False,\
|
||||
"Patroni instance is not available for queries after {0} seconds".format(max_wait_limit)
|
||||
"{0} instance is not available for queries after {1} seconds".format(self._name, max_wait_limit)
|
||||
|
||||
def stop(self, pg_name, kill=False, timeout=15):
|
||||
def stop(self, kill=False, timeout=15):
|
||||
term = False
|
||||
start_time = time.time()
|
||||
while self._is_running(pg_name):
|
||||
if not kill:
|
||||
self._processes[pg_name].terminate()
|
||||
else:
|
||||
self._processes[pg_name].kill()
|
||||
|
||||
while self._handle and self._is_running():
|
||||
if kill:
|
||||
self._handle.kill()
|
||||
elif not term:
|
||||
self._handle.terminate()
|
||||
term = True
|
||||
time.sleep(1)
|
||||
if not kill and time.time() - start_time > timeout:
|
||||
kill = True
|
||||
if self._log.get('pg_name') and not self._log['pg_name'].closed:
|
||||
self._log[pg_name].close()
|
||||
if pg_name in self._processes:
|
||||
del self._processes[pg_name]
|
||||
|
||||
def query(self, pg_name, query, fail_ok=False):
|
||||
if self._log:
|
||||
self._log.close()
|
||||
|
||||
|
||||
class PatroniController(AbstractController):
|
||||
PATRONI_CONFIG = '{}.yml'
|
||||
""" starts and stops individual patronis"""
|
||||
|
||||
def __init__(self, dcs, name, work_directory, output_dir, tags=None):
|
||||
super(PatroniController, self).__init__('patroni_' + name, work_directory, output_dir)
|
||||
self._data_dir = os.path.join(work_directory, 'data', name)
|
||||
self._connstring = None
|
||||
self._config = self._make_patroni_test_config(name, dcs, tags)
|
||||
|
||||
self._conn = None
|
||||
self._curs = None
|
||||
|
||||
def write_label(self, content):
|
||||
with open(os.path.join(self._data_dir, 'label'), 'w') as f:
|
||||
f.write(content)
|
||||
|
||||
def read_label(self):
|
||||
try:
|
||||
cursor = self._cursor(pg_name)
|
||||
cursor.execute(query)
|
||||
return cursor
|
||||
except psycopg2.Error:
|
||||
if fail_ok:
|
||||
return None
|
||||
else:
|
||||
raise
|
||||
with open(os.path.join(self._data_dir, 'label'), 'r') as f:
|
||||
return f.read().strip()
|
||||
except IOError:
|
||||
return None
|
||||
|
||||
def check_role_has_changed_to(self, pg_name, new_role, timeout=10):
|
||||
bound_time = time.time() + timeout
|
||||
recovery_status = False if new_role == 'primary' else True
|
||||
role_has_changed = False
|
||||
while not role_has_changed:
|
||||
cur = self.query(pg_name, "SELECT pg_is_in_recovery()", fail_ok=True)
|
||||
if cur:
|
||||
row = cur.fetchone()
|
||||
if row and len(row) > 0 and row[0] == recovery_status:
|
||||
role_has_changed = True
|
||||
if time.time() > bound_time:
|
||||
break
|
||||
time.sleep(1)
|
||||
return role_has_changed
|
||||
def _start(self):
|
||||
return subprocess.Popen(['coverage', 'run', '--source=patroni', '-p', 'patroni.py', self._config],
|
||||
stdout=self._log, stderr=subprocess.STDOUT, cwd=self._work_directory)
|
||||
|
||||
def stop_all(self):
|
||||
for patroni in self._processes.copy():
|
||||
self.stop(patroni)
|
||||
def _is_accessible(self):
|
||||
return self.query("SELECT 1", fail_ok=True) is not None
|
||||
|
||||
def create_and_set_output_directory(self, feature_name):
|
||||
feature_dir = os.path.join(self.patroni_path, "features", "output",
|
||||
feature_name.replace(' ', '_'))
|
||||
if os.path.exists(feature_dir):
|
||||
shutil.rmtree(feature_dir)
|
||||
os.makedirs(feature_dir)
|
||||
self._output_dir = feature_dir
|
||||
|
||||
def _is_running(self, pg_name):
|
||||
return pg_name in self._processes and self._processes[pg_name].pid and (self._processes[pg_name].poll() is None)
|
||||
|
||||
def _make_patroni_test_config(self, pg_name, tags=None):
|
||||
patroni_config_name = PatroniController.PATRONI_CONFIG.format(pg_name)
|
||||
def _make_patroni_test_config(self, name, dcs, tags):
|
||||
patroni_config_name = self.PATRONI_CONFIG.format(name)
|
||||
patroni_config_path = os.path.join(self._output_dir, patroni_config_name)
|
||||
|
||||
with open(patroni_config_name) as f:
|
||||
config = yaml.load(f)
|
||||
postgresql = config['postgresql']
|
||||
postgresql['name'] = pg_name
|
||||
postgresql['data_dir'] = 'data/{0}'.format(pg_name)
|
||||
postgresql_params = postgresql['parameters']
|
||||
postgresql_params['logging_collector'] = 'on'
|
||||
postgresql_params['log_destination'] = 'csvlog'
|
||||
postgresql_params['log_directory'] = self._output_dir
|
||||
postgresql_params['log_filename'] = '{0}.log'.format(pg_name)
|
||||
postgresql_params['log_statement'] = 'all'
|
||||
postgresql_params['log_min_messages'] = 'debug1'
|
||||
postgresql_params['unix_socket_directories'] = '.'
|
||||
|
||||
self._connstring = self._make_connstring(config)
|
||||
|
||||
config['postgresql'].update({'name': name, 'data_dir': self._data_dir})
|
||||
config['postgresql']['parameters'].update({
|
||||
'logging_collector': 'on', 'log_destination': 'csvlog', 'log_directory': self._output_dir,
|
||||
'log_filename': name + '.log', 'log_statement': 'all', 'log_min_messages': 'debug1'})
|
||||
|
||||
if tags:
|
||||
config['tags'] = tags
|
||||
|
||||
if dcs != 'etcd':
|
||||
etcd = config.pop('etcd')
|
||||
config['zookeeper'] = {'scope': etcd['scope'], 'session_timeout': etcd['ttl'],
|
||||
'reconnect_timeout': config['loop_wait']}
|
||||
if dcs == 'exhibitor':
|
||||
config['zookeeper']['exhibitor'] = {'hosts': ['127.0.0.1'], 'port': 8181}
|
||||
else:
|
||||
config['zookeeper']['hosts'] = ['127.0.0.1:2181']
|
||||
|
||||
with open(patroni_config_path, 'w') as f:
|
||||
yaml.dump(config, f, default_flow_style=False)
|
||||
|
||||
return patroni_config_path
|
||||
|
||||
def _make_connstring(self, pg_name):
|
||||
if pg_name in self._connstring:
|
||||
return self._connstring[pg_name]
|
||||
try:
|
||||
patroni_path = self.patroni_path
|
||||
with open(os.path.join(patroni_path, PatroniController.PATRONI_CONFIG.format(pg_name)), 'r') as f:
|
||||
config = yaml.load(f)
|
||||
except IOError:
|
||||
return None
|
||||
connstring = config['postgresql']['connect_address']
|
||||
if ':' in connstring:
|
||||
address, port = connstring.split(':')
|
||||
else:
|
||||
address = connstring
|
||||
port = '5432'
|
||||
user = "postgres"
|
||||
dbname = "postgres"
|
||||
self._connstring[pg_name] = "host={0} port={1} dbname={2} user={3}".format(address, port, dbname, user)
|
||||
return self._connstring[pg_name]
|
||||
|
||||
def _connection(self, pg_name):
|
||||
if pg_name not in self._connections or self._connections[pg_name].closed:
|
||||
conn = psycopg2.connect(self._make_connstring(pg_name))
|
||||
conn.autocommit = True
|
||||
self._connections[pg_name] = conn
|
||||
return self._connections[pg_name]
|
||||
|
||||
def _cursor(self, pg_name):
|
||||
if pg_name not in self._cursors or self._cursors[pg_name].closed:
|
||||
cursor = self._connection(pg_name).cursor()
|
||||
self._cursors[pg_name] = cursor
|
||||
return self._cursors[pg_name]
|
||||
|
||||
|
||||
class EtcdController(object):
|
||||
|
||||
""" handles all etcd related tasks, used for the tests setup and cleanup """
|
||||
ETCD_VERSION_URL = 'http://127.0.0.1:2379/version'
|
||||
ETCD_CLEANUP_URL = 'http://127.0.0.1:2379/v2/keys/service/batman?recursive=true'
|
||||
|
||||
def __init__(self, log_directory):
|
||||
self.handle = None
|
||||
self.work_directory = None
|
||||
self.log_directory = log_directory
|
||||
self.log_file = None
|
||||
self.pid = None
|
||||
self.start_timeout = 5
|
||||
|
||||
def start(self):
|
||||
""" start etcd if it's not already running """
|
||||
if self._is_running():
|
||||
return True
|
||||
self.work_directory = tempfile.mkdtemp()
|
||||
# etcd is running throughout the tests, no need to append to the log
|
||||
output_dir = os.path.join(self.log_directory, "features", "output")
|
||||
if not os.path.exists(output_dir):
|
||||
os.makedirs(output_dir)
|
||||
self.log_file = open(os.path.join(output_dir, 'etcd.log'), 'w')
|
||||
self.handle =\
|
||||
subprocess.Popen(["etcd", "--debug", "--data-dir", self.work_directory],
|
||||
stdout=self.log_file, stderr=subprocess.STDOUT)
|
||||
start_time = time.time()
|
||||
while (not self._is_running()):
|
||||
if time.time() - start_time > self.start_timeout:
|
||||
assert False, "Failed to start etcd"
|
||||
time.sleep(1)
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def query(key):
|
||||
""" query etcd for a value of a given key """
|
||||
r = requests.get("http://127.0.0.1:2379/v2/keys/service/batman/{0}".format(key))
|
||||
if r.ok:
|
||||
content = r.json()
|
||||
if content:
|
||||
return content.get('node', {}).get('value')
|
||||
return None
|
||||
def _make_connstring(config):
|
||||
tmp = (config['postgresql']['connect_address'] + ':5432').split(':')
|
||||
return 'host={0} port={1} dbname=postgres user=postgres'.format(*tmp[:2])
|
||||
|
||||
def _connection(self):
|
||||
if not self._conn or self._conn.closed != 0:
|
||||
self._conn = psycopg2.connect(self._connstring)
|
||||
self._conn.autocommit = True
|
||||
return self._conn
|
||||
|
||||
def _cursor(self):
|
||||
if not self._curs or self._curs.closed or self._curs.connection.closed != 0:
|
||||
self._curs = self._connection().cursor()
|
||||
return self._curs
|
||||
|
||||
def query(self, query, fail_ok=False):
|
||||
try:
|
||||
cursor = self._cursor()
|
||||
cursor.execute(query)
|
||||
return cursor
|
||||
except psycopg2.Error:
|
||||
if not fail_ok:
|
||||
raise
|
||||
|
||||
def check_role_has_changed_to(self, new_role, timeout=10):
|
||||
bound_time = time.time() + timeout
|
||||
recovery_status = new_role != 'primary'
|
||||
while time.time() < bound_time:
|
||||
cur = self.query("SELECT pg_is_in_recovery()", fail_ok=True)
|
||||
if cur:
|
||||
row = cur.fetchone()
|
||||
if row and row[0] == recovery_status:
|
||||
return True
|
||||
time.sleep(1)
|
||||
return False
|
||||
|
||||
|
||||
class AbstractDcsController(AbstractController):
|
||||
|
||||
_CLUSTER_NODE = 'service/batman'
|
||||
|
||||
def _is_accessible(self):
|
||||
return self._is_running()
|
||||
|
||||
def stop_and_remove_work_directory(self, timeout=15):
|
||||
""" terminate etcd and wipe out the temp work directory, but only if we actually started it"""
|
||||
kill = False
|
||||
start_time = time.time()
|
||||
while self._is_running() and self.handle:
|
||||
if not kill:
|
||||
self.handle.terminate()
|
||||
else:
|
||||
self.handle.kill()
|
||||
time.sleep(1)
|
||||
if not kill and time.time() - start_time > timeout:
|
||||
kill = True
|
||||
self.handle = None
|
||||
if self.log_file and not self.log_file.closed:
|
||||
self.log_file.close()
|
||||
if self.work_directory:
|
||||
shutil.rmtree(self.work_directory)
|
||||
self.work_directory = None
|
||||
""" terminate process and wipe out the temp work directory, but only if we actually started it"""
|
||||
self.stop(timeout=timeout)
|
||||
if self._work_directory:
|
||||
shutil.rmtree(self._work_directory)
|
||||
|
||||
@staticmethod
|
||||
def cleanup_service_tree():
|
||||
@abc.abstractmethod
|
||||
def query(self, key):
|
||||
""" query for a value of a given key """
|
||||
|
||||
@abc.abstractmethod
|
||||
def cleanup_service_tree(self):
|
||||
""" clean all contents stored in the tree used for the tests """
|
||||
r = None
|
||||
|
||||
|
||||
class EtcdController(AbstractDcsController):
|
||||
|
||||
""" handles all etcd related tasks, used for the tests setup and cleanup """
|
||||
|
||||
def __init__(self, output_dir):
|
||||
super(EtcdController, self).__init__('etcd', tempfile.mkdtemp(), output_dir)
|
||||
self._client = etcd.Client()
|
||||
|
||||
def _start(self):
|
||||
return subprocess.Popen(["etcd", "--debug", "--data-dir", self._work_directory],
|
||||
stdout=self._log, stderr=subprocess.STDOUT)
|
||||
|
||||
def query(self, key):
|
||||
try:
|
||||
r = requests.delete(EtcdController.ETCD_CLEANUP_URL)
|
||||
if r and not r.ok:
|
||||
assert False,\
|
||||
"request to cleanup the etcd contents was not successfull: status code {0}".format(r.status_code)
|
||||
except requests.exceptions.RequestException as e:
|
||||
return self._client.get('/{0}/{1}'.format(self._CLUSTER_NODE, key)).value
|
||||
except etcd.EtcdKeyNotFound:
|
||||
return None
|
||||
|
||||
def cleanup_service_tree(self):
|
||||
try:
|
||||
self._client.delete('/' + self._CLUSTER_NODE, recursive=True)
|
||||
except (etcd.EtcdKeyNotFound, etcd.EtcdConnectionFailed):
|
||||
return
|
||||
except Exception as e:
|
||||
assert False, "exception when cleaning up etcd contents: {0}".format(e)
|
||||
|
||||
@staticmethod
|
||||
def _is_running():
|
||||
def _is_running(self):
|
||||
# if etcd is running, but we didn't start it
|
||||
try:
|
||||
r = requests.get(EtcdController.ETCD_VERSION_URL)
|
||||
running = (r and r.ok and b'etcdserver' in r.content)
|
||||
except requests.ConnectionError:
|
||||
running = False
|
||||
return running
|
||||
return bool(self._client.machines)
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
class ZooKeeperController(AbstractDcsController):
|
||||
|
||||
""" handles all zookeeper related tasks, used for the tests setup and cleanup """
|
||||
|
||||
def __init__(self, output_dir):
|
||||
super(ZooKeeperController, self).__init__('zookeeper', None, output_dir)
|
||||
self._client = kazoo.client.KazooClient()
|
||||
|
||||
def _start(self):
|
||||
pass # TODO: implement later
|
||||
|
||||
def query(self, key):
|
||||
try:
|
||||
return self._client.get('/{0}/{1}'.format(self._CLUSTER_NODE, key))[0].decode('utf-8')
|
||||
except kazoo.exceptions.NoNodeError:
|
||||
return None
|
||||
|
||||
def cleanup_service_tree(self):
|
||||
try:
|
||||
self._client.delete('/' + self._CLUSTER_NODE, recursive=True)
|
||||
except (kazoo.exceptions.NoNodeError):
|
||||
return
|
||||
except Exception as e:
|
||||
assert False, "exception when cleaning up zookeeper contents: {0}".format(e)
|
||||
|
||||
def _is_running(self):
|
||||
# if zookeeper is running, but we didn't start it
|
||||
if self._client.connected:
|
||||
return True
|
||||
try:
|
||||
return self._client.start(1) or True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
class PatroniPoolController(object):
|
||||
|
||||
KNOWN_DCS = {'etcd': EtcdController, 'zookeeper': ZooKeeperController, 'exhibitor': ZooKeeperController}
|
||||
|
||||
def __init__(self):
|
||||
self._dcs = None
|
||||
self._output_dir = None
|
||||
self._patroni_path = None
|
||||
self._processes = {}
|
||||
self.create_and_set_output_directory('')
|
||||
|
||||
@property
|
||||
def patroni_path(self):
|
||||
if self._patroni_path is None:
|
||||
cwd = os.path.realpath(__file__)
|
||||
while True:
|
||||
cwd, entry = os.path.split(cwd)
|
||||
if entry == 'features' or cwd == '/':
|
||||
break
|
||||
self._patroni_path = cwd
|
||||
return self._patroni_path
|
||||
|
||||
@property
|
||||
def output_dir(self):
|
||||
return self._output_dir
|
||||
|
||||
def start(self, pg_name, max_wait_limit=20, tags=None):
|
||||
if pg_name not in self._processes:
|
||||
self._processes[pg_name] = PatroniController(self.dcs, pg_name, self.patroni_path, self._output_dir, tags)
|
||||
self._processes[pg_name].start(max_wait_limit)
|
||||
|
||||
def __getattr__(self, func):
|
||||
if func not in ['stop', 'query', 'write_label', 'read_label', 'check_role_has_changed_to']:
|
||||
raise AttributeError("PatroniPoolController instance has no attribute '{0}'".format(func))
|
||||
|
||||
def wrapper(pg_name, *args, **kwargs):
|
||||
return getattr(self._processes[pg_name], func)(*args, **kwargs)
|
||||
return wrapper
|
||||
|
||||
def stop_all(self):
|
||||
for ctl in self._processes.values():
|
||||
ctl.stop()
|
||||
self._processes.clear()
|
||||
|
||||
def create_and_set_output_directory(self, feature_name):
|
||||
feature_dir = os.path.join(self.patroni_path, 'features/output', feature_name.replace(' ', '_'))
|
||||
if os.path.exists(feature_dir):
|
||||
shutil.rmtree(feature_dir)
|
||||
os.makedirs(feature_dir)
|
||||
self._output_dir = feature_dir
|
||||
|
||||
@property
|
||||
def dcs(self):
|
||||
if self._dcs is None:
|
||||
self._dcs = os.environ.get('DCS', 'etcd')
|
||||
assert self._dcs in self.KNOWN_DCS, 'Unsupported dcs: ' + self.dcs
|
||||
return self._dcs
|
||||
|
||||
|
||||
# actions to execute on start/stop of the tests and before running invidual features
|
||||
def before_all(context):
|
||||
context.pctl = PatroniController()
|
||||
context.etcd_ctl = EtcdController(context.pctl.patroni_path)
|
||||
context.etcd_ctl.start()
|
||||
context.pctl = PatroniPoolController()
|
||||
context.dcs_ctl = context.pctl.KNOWN_DCS[context.pctl.dcs](context.pctl.output_dir)
|
||||
context.dcs_ctl.start()
|
||||
try:
|
||||
context.etcd_ctl.cleanup_service_tree()
|
||||
except AssertionError: # after.all handlers won't be executed in before.all
|
||||
context.etcd_ctl.stop_and_remove_work_directory()
|
||||
context.dcs_ctl.cleanup_service_tree()
|
||||
except AssertionError: # after_all handlers won't be executed in before_all
|
||||
context.dcs_ctl.stop_and_remove_work_directory()
|
||||
raise
|
||||
|
||||
|
||||
def after_all(context):
|
||||
context.etcd_ctl.stop_and_remove_work_directory()
|
||||
context.dcs_ctl.stop_and_remove_work_directory()
|
||||
subprocess.call(['coverage', 'combine'])
|
||||
subprocess.call(['coverage', 'report'])
|
||||
|
||||
@@ -302,4 +348,4 @@ def after_feature(context, feature):
|
||||
""" stop all Patronis, remove their data directory and cleanup the keys in etcd """
|
||||
context.pctl.stop_all()
|
||||
shutil.rmtree(os.path.join(context.pctl.patroni_path, 'data'))
|
||||
context.etcd_ctl.cleanup_service_tree()
|
||||
context.dcs_ctl.cleanup_service_tree()
|
||||
|
||||
@@ -31,7 +31,7 @@ Scenario: check API requests for the primary-replica pair
|
||||
Then I receive a response code 200
|
||||
When I issue an empty POST request to http://127.0.0.1:8008/restart
|
||||
Then I receive a response code 200
|
||||
And postgres0 is a leader after 5 seconds
|
||||
And postgres0 role is the primary after 5 seconds
|
||||
When I sleep for 10 seconds
|
||||
Then postgres1 role is the secondary after 15 seconds
|
||||
|
||||
|
||||
@@ -41,9 +41,8 @@ def table_is_present_on(context, table_name, pg_name, max_replication_delay):
|
||||
|
||||
@then('{pg_name:w} role is the {pg_role:w} after {max_promotion_timeout:d} seconds')
|
||||
def check_role(context, pg_name, pg_role, max_promotion_timeout):
|
||||
if not context.pctl.check_role_has_changed_to(pg_name, pg_role, timeout=int(max_promotion_timeout)):
|
||||
assert False,\
|
||||
"{0} role didn't change to {1} after {2} seconds".format(pg_name, pg_role, max_promotion_timeout)
|
||||
assert context.pctl.check_role_has_changed_to(pg_name, pg_role, timeout=int(max_promotion_timeout)),\
|
||||
"{0} role didn't change to {1} after {2} seconds".format(pg_name, pg_role, max_promotion_timeout)
|
||||
|
||||
|
||||
@step('replication works from {master:w} to {replica:w} after {time_limit:d} seconds')
|
||||
|
||||
@@ -29,10 +29,9 @@ register_type(url=parse_url, data=parse_data)
|
||||
@then('{name:w} is a leader after {time_limit:d} seconds')
|
||||
def is_a_leader(context, name, time_limit):
|
||||
max_time = time.time() + int(time_limit)
|
||||
while (context.etcd_ctl.query("leader") != name):
|
||||
while (context.dcs_ctl.query("leader") != name):
|
||||
time.sleep(1)
|
||||
if time.time() > max_time:
|
||||
assert False, "{0} is not a leader in etcd after {1} seconds".format(name, time_limit)
|
||||
assert time.time() < max_time, "{0} is not a leader in dcs after {1} seconds".format(name, time_limit)
|
||||
|
||||
|
||||
@step('I sleep for {value:d} seconds')
|
||||
|
||||
@@ -8,7 +8,7 @@ from patroni.api import RestApiServer
|
||||
from patroni.etcd import Etcd
|
||||
from patroni.ha import Ha
|
||||
from patroni.postgresql import Postgresql
|
||||
from patroni.utils import setup_signal_handlers, reap_children
|
||||
from patroni.utils import reap_children, set_ignore_sigterm, setup_signal_handlers
|
||||
from patroni.zookeeper import ZooKeeper
|
||||
from .version import __version__
|
||||
|
||||
@@ -91,7 +91,7 @@ def main():
|
||||
try:
|
||||
patroni.run()
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
set_ignore_sigterm()
|
||||
finally:
|
||||
patroni.api.shutdown()
|
||||
patroni.postgresql.stop(checkpoint=False)
|
||||
|
||||
@@ -33,10 +33,14 @@ def calculate_ttl(expiration):
|
||||
return int((expiration - now).total_seconds())
|
||||
|
||||
|
||||
def sigterm_handler(signo, stack_frame):
|
||||
def set_ignore_sigterm(value=True):
|
||||
global __ignore_sigterm
|
||||
__ignore_sigterm = value
|
||||
|
||||
|
||||
def sigterm_handler(signo, stack_frame):
|
||||
if not __ignore_sigterm:
|
||||
__ignore_sigterm = True
|
||||
set_ignore_sigterm()
|
||||
sys.exit()
|
||||
|
||||
|
||||
|
||||
@@ -83,12 +83,9 @@ class ZooKeeper(AbstractDCS):
|
||||
self.exhibitor = ExhibitorEnsembleProvider(exhibitor['hosts'], exhibitor['port'], poll_interval=interval)
|
||||
hosts = self.exhibitor.zookeeper_hosts
|
||||
|
||||
self._client = KazooClient(hosts=hosts,
|
||||
timeout=(config.get('session_timeout') or 30),
|
||||
command_retry={
|
||||
'deadline': (config.get('reconnect_timeout') or 10),
|
||||
'max_delay': 1,
|
||||
'max_tries': -1},
|
||||
self._client = KazooClient(hosts=hosts, timeout=(config.get('session_timeout') or 30),
|
||||
command_retry={'deadline': (config.get('reconnect_timeout') or 10),
|
||||
'max_delay': 1, 'max_tries': -1},
|
||||
connection_retry={'max_delay': 1, 'max_tries': -1})
|
||||
self._client.add_listener(self.session_listener)
|
||||
|
||||
@@ -96,7 +93,7 @@ class ZooKeeper(AbstractDCS):
|
||||
self._fetch_cluster = True
|
||||
self._last_leader_operation = 0
|
||||
|
||||
self._client.start(None)
|
||||
self._client.start()
|
||||
|
||||
def session_listener(self, state):
|
||||
if state in [KazooState.SUSPENDED, KazooState.LOST]:
|
||||
|
||||
@@ -93,6 +93,7 @@ postgresql:
|
||||
max_replication_slots: 10
|
||||
hot_standby: "on"
|
||||
wal_log_hints: "on"
|
||||
unix_socket_directories: '.'
|
||||
tags:
|
||||
nofailover: False
|
||||
noloadbalance: False
|
||||
|
||||
@@ -94,6 +94,7 @@ postgresql:
|
||||
max_replication_slots: 10
|
||||
hot_standby: "on"
|
||||
wal_log_hints: "on"
|
||||
unix_socket_directories: '.'
|
||||
tags:
|
||||
nofailover: False
|
||||
noloadbalance: False
|
||||
|
||||
@@ -94,6 +94,7 @@ postgresql:
|
||||
max_replication_slots: 10
|
||||
hot_standby: "on"
|
||||
wal_log_hints: "on"
|
||||
unix_socket_directories: '.'
|
||||
tags:
|
||||
nofailover: False
|
||||
noloadbalance: False
|
||||
|
||||
9
setup.py
9
setup.py
@@ -4,13 +4,12 @@
|
||||
Setup file for patroni
|
||||
"""
|
||||
|
||||
import sys
|
||||
import os
|
||||
import inspect
|
||||
import os
|
||||
import sys
|
||||
|
||||
import setuptools
|
||||
from setuptools.command.test import test as TestCommand
|
||||
from setuptools import setup
|
||||
from setuptools import find_packages, setup
|
||||
|
||||
if sys.version_info < (2, 7, 0):
|
||||
sys.stderr.write('FATAL: patroni needs to be run with Python 2.7+\n')
|
||||
@@ -138,7 +137,7 @@ def setup_package():
|
||||
long_description=read('README.rst'),
|
||||
classifiers=CLASSIFIERS,
|
||||
test_suite='tests',
|
||||
packages=setuptools.find_packages(exclude=['tests', 'tests.*']),
|
||||
packages=find_packages(exclude=['tests', 'tests.*']),
|
||||
package_data={MAIN_PACKAGE: ["*.json"]},
|
||||
install_requires=install_reqs,
|
||||
setup_requires=['flake8'],
|
||||
|
||||
@@ -45,7 +45,7 @@ class TestPatroni(unittest.TestCase):
|
||||
self.assertIsInstance(self.p.get_dcs('', {'zookeeper': {'scope': '', 'hosts': ''}}), ZooKeeper)
|
||||
self.assertRaises(Exception, self.p.get_dcs, '', {})
|
||||
|
||||
@patch('time.sleep', Mock(side_effect=SleepException()))
|
||||
@patch('time.sleep', Mock(side_effect=SleepException))
|
||||
@patch.object(Etcd, 'delete_leader', Mock())
|
||||
@patch.object(Client, 'machines')
|
||||
def test_patroni_main(self, mock_machines):
|
||||
@@ -53,7 +53,7 @@ class TestPatroni(unittest.TestCase):
|
||||
sys.argv = ['patroni.py', 'postgres0.yml']
|
||||
|
||||
mock_machines.__get__ = Mock(return_value=['http://remotehost:2379'])
|
||||
with patch.object(Patroni, 'run', Mock(side_effect=SleepException())):
|
||||
with patch.object(Patroni, 'run', Mock(side_effect=SleepException)):
|
||||
self.assertRaises(SleepException, _main)
|
||||
with patch.object(Patroni, 'run', Mock(side_effect=KeyboardInterrupt())):
|
||||
_main()
|
||||
@@ -66,12 +66,8 @@ class TestPatroni(unittest.TestCase):
|
||||
self.assertRaises(SleepException, _main)
|
||||
del os.environ[Patroni.PATRONI_CONFIG_VARIABLE]
|
||||
|
||||
@patch('time.sleep', Mock(side_effect=SleepException()))
|
||||
def test_run(self):
|
||||
self.p.ha.dcs.watch = Mock(side_effect=SleepException())
|
||||
self.assertRaises(SleepException, self.p.run)
|
||||
|
||||
self.p.ha.state_handler.is_leader = Mock(return_value=False)
|
||||
self.p.ha.dcs.watch = Mock(side_effect=SleepException)
|
||||
self.p.api.start = Mock()
|
||||
self.assertRaises(SleepException, self.p.run)
|
||||
|
||||
|
||||
@@ -2,7 +2,8 @@ import unittest
|
||||
|
||||
from mock import Mock, patch
|
||||
from patroni.exceptions import PatroniException
|
||||
from patroni.utils import Retry, RetryFailedError, reap_children, sigchld_handler, sigterm_handler, sleep
|
||||
from patroni.utils import reap_children, Retry, RetryFailedError, set_ignore_sigterm,\
|
||||
sigchld_handler, sigterm_handler, sleep
|
||||
|
||||
|
||||
def time_sleep(_):
|
||||
@@ -12,6 +13,7 @@ def time_sleep(_):
|
||||
class TestUtils(unittest.TestCase):
|
||||
|
||||
def test_sigterm_handler(self):
|
||||
set_ignore_sigterm(False)
|
||||
self.assertRaises(SystemExit, sigterm_handler, None, None)
|
||||
|
||||
@patch('time.sleep', Mock())
|
||||
|
||||
Reference in New Issue
Block a user