From cb3071adfbffb540f24ce67370ffc8599a7207de Mon Sep 17 00:00:00 2001 From: Alexander Kukushkin Date: Thu, 6 Jan 2022 10:20:31 +0100 Subject: [PATCH] Annual cleanup (#2159) - Simplify setup.py: remove unneeded features and get rid of deprecation warnings - Compatibility with Python 3.10: handle `threading.Event.isSet()` deprecation - Make sure setup.py could run without `six`: move Patroni class and main function to the `__main__.py`. The `__init__.py` will have only a few functions used by the Patroni class and from the setup.py --- patroni.py | 2 +- patroni/__init__.py | 176 --------------------------------------- patroni/__main__.py | 179 +++++++++++++++++++++++++++++++++++++++- patroni/dcs/__init__.py | 2 +- patroni/dcs/raft.py | 2 +- setup.py | 81 +++++------------- tests/test_patroni.py | 5 +- tests/test_raft.py | 2 +- tests/test_zookeeper.py | 2 +- 9 files changed, 205 insertions(+), 246 deletions(-) diff --git a/patroni.py b/patroni.py index f34ef133..d463dcae 100755 --- a/patroni.py +++ b/patroni.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -from patroni import main +from patroni.__main__ import main if __name__ == '__main__': diff --git a/patroni/__init__.py b/patroni/__init__.py index 46aac101..f6a101cc 100644 --- a/patroni/__init__.py +++ b/patroni/__init__.py @@ -1,145 +1,10 @@ -import logging -import os -import signal import sys -import time - -from .daemon import AbstractPatroniDaemon, abstract_main -from .version import __version__ - -logger = logging.getLogger(__name__) PATRONI_ENV_PREFIX = 'PATRONI_' KUBERNETES_ENV_PREFIX = 'KUBERNETES_' MIN_PSYCOPG2 = (2, 5, 4) -class Patroni(AbstractPatroniDaemon): - - def __init__(self, config): - from patroni.api import RestApiServer - from patroni.dcs import get_dcs - from patroni.ha import Ha - from patroni.postgresql import Postgresql - from patroni.request import PatroniRequest - from patroni.watchdog import Watchdog - - super(Patroni, self).__init__(config) - - self.version = __version__ - self.dcs = get_dcs(self.config) - self.watchdog = Watchdog(self.config) - self.load_dynamic_configuration() - - self.postgresql = Postgresql(self.config['postgresql']) - self.api = RestApiServer(self, self.config['restapi']) - self.request = PatroniRequest(self.config, True) - self.ha = Ha(self) - - self.tags = self.get_tags() - self.next_run = time.time() - self.scheduled_restart = {} - - def load_dynamic_configuration(self): - from patroni.exceptions import DCSError - while True: - try: - cluster = self.dcs.get_cluster() - if cluster and cluster.config and cluster.config.data: - if self.config.set_dynamic_configuration(cluster.config): - self.dcs.reload_config(self.config) - self.watchdog.reload_config(self.config) - elif not self.config.dynamic_configuration and 'bootstrap' in self.config: - if self.config.set_dynamic_configuration(self.config['bootstrap']['dcs']): - self.dcs.reload_config(self.config) - break - except DCSError: - logger.warning('Can not get cluster from dcs') - time.sleep(5) - - def get_tags(self): - return {tag: value for tag, value in self.config.get('tags', {}).items() - if tag not in ('clonefrom', 'nofailover', 'noloadbalance', 'nosync') or value} - - @property - def nofailover(self): - return bool(self.tags.get('nofailover', False)) - - @property - def nosync(self): - return bool(self.tags.get('nosync', False)) - - def reload_config(self, sighup=False, local=False): - try: - super(Patroni, self).reload_config(sighup, local) - if local: - self.tags = self.get_tags() - self.request.reload_config(self.config) - if local or sighup and self.api.reload_local_certificate(): - self.api.reload_config(self.config['restapi']) - self.watchdog.reload_config(self.config) - self.postgresql.reload_config(self.config['postgresql'], sighup) - self.dcs.reload_config(self.config) - except Exception: - logger.exception('Failed to reload config_file=%s', self.config.config_file) - - @property - def replicatefrom(self): - return self.tags.get('replicatefrom') - - @property - def noloadbalance(self): - return bool(self.tags.get('noloadbalance', False)) - - def schedule_next_run(self): - self.next_run += self.dcs.loop_wait - current_time = time.time() - nap_time = self.next_run - current_time - if nap_time <= 0: - self.next_run = current_time - # Release the GIL so we don't starve anyone waiting on async_executor lock - time.sleep(0.001) - # Warn user that Patroni is not keeping up - logger.warning("Loop time exceeded, rescheduling immediately.") - elif self.ha.watch(nap_time): - self.next_run = time.time() - - def run(self): - self.api.start() - self.next_run = time.time() - super(Patroni, self).run() - - def _run_cycle(self): - logger.info(self.ha.run_cycle()) - - if self.dcs.cluster and self.dcs.cluster.config and self.dcs.cluster.config.data \ - and self.config.set_dynamic_configuration(self.dcs.cluster.config): - self.reload_config() - - if self.postgresql.role != 'uninitialized': - self.config.save_cache() - - self.schedule_next_run() - - def _shutdown(self): - try: - self.api.shutdown() - except Exception: - logger.exception('Exception during RestApi.shutdown') - try: - self.ha.shutdown() - except Exception: - logger.exception('Exception during Ha.shutdown') - - -def patroni_main(): - from multiprocessing import freeze_support - from patroni.validator import schema - - freeze_support() - abstract_main(Patroni, schema) - - def fatal(string, *args): sys.stderr.write('FATAL: ' + string.format(*args) + '\n') sys.exit(1) @@ -174,44 +39,3 @@ def check_psycopg(_min_psycopg2=MIN_PSYCOPG2, _parse_version=parse_version): if version_str: error += ', but only psycopg2=={0} is available'.format(version_str) fatal(error) - - -def main(): - if os.getpid() != 1: - check_psycopg() - return patroni_main() - - # Patroni started with PID=1, it looks like we are in the container - pid = 0 - - # Looks like we are in a docker, so we will act like init - def sigchld_handler(signo, stack_frame): - try: - while True: - ret = os.waitpid(-1, os.WNOHANG) - if ret == (0, 0): - break - elif ret[0] != pid: - logger.info('Reaped pid=%s, exit status=%s', *ret) - except OSError: - pass - - def passtochild(signo, stack_frame): - if pid: - os.kill(pid, signo) - - if os.name != 'nt': - signal.signal(signal.SIGCHLD, sigchld_handler) - signal.signal(signal.SIGHUP, passtochild) - signal.signal(signal.SIGQUIT, passtochild) - signal.signal(signal.SIGUSR1, passtochild) - signal.signal(signal.SIGUSR2, passtochild) - signal.signal(signal.SIGINT, passtochild) - signal.signal(signal.SIGABRT, passtochild) - signal.signal(signal.SIGTERM, passtochild) - - import multiprocessing - patroni = multiprocessing.Process(target=patroni_main) - patroni.start() - pid = patroni.pid - patroni.join() diff --git a/patroni/__main__.py b/patroni/__main__.py index 3abcbfc3..bf800eff 100644 --- a/patroni/__main__.py +++ b/patroni/__main__.py @@ -1,4 +1,181 @@ -from patroni import main +import logging +import os +import signal +import time + +from .daemon import AbstractPatroniDaemon, abstract_main + +logger = logging.getLogger(__name__) + + +class Patroni(AbstractPatroniDaemon): + + def __init__(self, config): + from .api import RestApiServer + from .dcs import get_dcs + from .ha import Ha + from .postgresql import Postgresql + from .request import PatroniRequest + from .version import __version__ + from .watchdog import Watchdog + + super(Patroni, self).__init__(config) + + self.version = __version__ + self.dcs = get_dcs(self.config) + self.watchdog = Watchdog(self.config) + self.load_dynamic_configuration() + + self.postgresql = Postgresql(self.config['postgresql']) + self.api = RestApiServer(self, self.config['restapi']) + self.request = PatroniRequest(self.config, True) + self.ha = Ha(self) + + self.tags = self.get_tags() + self.next_run = time.time() + self.scheduled_restart = {} + + def load_dynamic_configuration(self): + from patroni.exceptions import DCSError + while True: + try: + cluster = self.dcs.get_cluster() + if cluster and cluster.config and cluster.config.data: + if self.config.set_dynamic_configuration(cluster.config): + self.dcs.reload_config(self.config) + self.watchdog.reload_config(self.config) + elif not self.config.dynamic_configuration and 'bootstrap' in self.config: + if self.config.set_dynamic_configuration(self.config['bootstrap']['dcs']): + self.dcs.reload_config(self.config) + break + except DCSError: + logger.warning('Can not get cluster from dcs') + time.sleep(5) + + def get_tags(self): + return {tag: value for tag, value in self.config.get('tags', {}).items() + if tag not in ('clonefrom', 'nofailover', 'noloadbalance', 'nosync') or value} + + @property + def nofailover(self): + return bool(self.tags.get('nofailover', False)) + + @property + def nosync(self): + return bool(self.tags.get('nosync', False)) + + def reload_config(self, sighup=False, local=False): + try: + super(Patroni, self).reload_config(sighup, local) + if local: + self.tags = self.get_tags() + self.request.reload_config(self.config) + if local or sighup and self.api.reload_local_certificate(): + self.api.reload_config(self.config['restapi']) + self.watchdog.reload_config(self.config) + self.postgresql.reload_config(self.config['postgresql'], sighup) + self.dcs.reload_config(self.config) + except Exception: + logger.exception('Failed to reload config_file=%s', self.config.config_file) + + @property + def replicatefrom(self): + return self.tags.get('replicatefrom') + + @property + def noloadbalance(self): + return bool(self.tags.get('noloadbalance', False)) + + def schedule_next_run(self): + self.next_run += self.dcs.loop_wait + current_time = time.time() + nap_time = self.next_run - current_time + if nap_time <= 0: + self.next_run = current_time + # Release the GIL so we don't starve anyone waiting on async_executor lock + time.sleep(0.001) + # Warn user that Patroni is not keeping up + logger.warning("Loop time exceeded, rescheduling immediately.") + elif self.ha.watch(nap_time): + self.next_run = time.time() + + def run(self): + self.api.start() + self.next_run = time.time() + super(Patroni, self).run() + + def _run_cycle(self): + logger.info(self.ha.run_cycle()) + + if self.dcs.cluster and self.dcs.cluster.config and self.dcs.cluster.config.data \ + and self.config.set_dynamic_configuration(self.dcs.cluster.config): + self.reload_config() + + if self.postgresql.role != 'uninitialized': + self.config.save_cache() + + self.schedule_next_run() + + def _shutdown(self): + try: + self.api.shutdown() + except Exception: + logger.exception('Exception during RestApi.shutdown') + try: + self.ha.shutdown() + except Exception: + logger.exception('Exception during Ha.shutdown') + + +def patroni_main(): + from multiprocessing import freeze_support + from patroni.validator import schema + + freeze_support() + abstract_main(Patroni, schema) + + +def main(): + if os.getpid() != 1: + from . import check_psycopg + + check_psycopg() + return patroni_main() + + # Patroni started with PID=1, it looks like we are in the container + pid = 0 + + # Looks like we are in a docker, so we will act like init + def sigchld_handler(signo, stack_frame): + try: + while True: + ret = os.waitpid(-1, os.WNOHANG) + if ret == (0, 0): + break + elif ret[0] != pid: + logger.info('Reaped pid=%s, exit status=%s', *ret) + except OSError: + pass + + def passtochild(signo, stack_frame): + if pid: + os.kill(pid, signo) + + if os.name != 'nt': + signal.signal(signal.SIGCHLD, sigchld_handler) + signal.signal(signal.SIGHUP, passtochild) + signal.signal(signal.SIGQUIT, passtochild) + signal.signal(signal.SIGUSR1, passtochild) + signal.signal(signal.SIGUSR2, passtochild) + signal.signal(signal.SIGINT, passtochild) + signal.signal(signal.SIGABRT, passtochild) + signal.signal(signal.SIGTERM, passtochild) + + import multiprocessing + patroni = multiprocessing.Process(target=patroni_main) + patroni.start() + pid = patroni.pid + patroni.join() if __name__ == '__main__': diff --git a/patroni/dcs/__init__.py b/patroni/dcs/__init__.py index cdb4854e..08fa7d8f 100644 --- a/patroni/dcs/__init__.py +++ b/patroni/dcs/__init__.py @@ -937,4 +937,4 @@ class AbstractDCS(object): :returns: `!True` if you would like to reschedule the next run of ha cycle""" self.event.wait(timeout) - return self.event.isSet() + return self.event.is_set() diff --git a/patroni/dcs/raft.py b/patroni/dcs/raft.py index fed6d5df..b4ace8c9 100644 --- a/patroni/dcs/raft.py +++ b/patroni/dcs/raft.py @@ -271,7 +271,7 @@ class Raft(AbstractDCS): while True: ready_event.wait(5) - if ready_event.isSet() or self._sync_obj.applied_local_log: + if ready_event.is_set() or self._sync_obj.applied_local_log: break else: logger.info('waiting on raft') diff --git a/setup.py b/setup.py index 1fbf760e..121c8a82 100644 --- a/setup.py +++ b/setup.py @@ -27,7 +27,6 @@ EXTRAS_REQUIRE = {'aws': ['boto'], 'etcd': ['python-etcd'], 'etcd3': ['python-et 'consul': ['python-consul'], 'exhibitor': ['kazoo'], 'zookeeper': ['kazoo'], 'kubernetes': [], 'raft': ['pysyncobj', 'cryptography']} COVERAGE_XML = True -COVERAGE_HTML = False # Add here all kinds of additional classifiers as defined under # https://pypi.python.org/pypi?%3Aaction=list_classifiers @@ -50,29 +49,29 @@ CLASSIFIERS = [ 'Programming Language :: Python :: 3.7', 'Programming Language :: Python :: 3.8', 'Programming Language :: Python :: 3.9', + 'Programming Language :: Python :: 3.10', 'Programming Language :: Python :: Implementation :: CPython', ] -CONSOLE_SCRIPTS = ['patroni = patroni:main', +CONSOLE_SCRIPTS = ['patroni = patroni.__main__:main', 'patronictl = patroni.ctl:ctl', 'patroni_raft_controller = patroni.raft_controller:main', "patroni_wale_restore = patroni.scripts.wale_restore:main", "patroni_aws = patroni.scripts.aws:main"] -class Flake8(Command): - +class _Command(Command): user_options = [] def initialize_options(self): - from flake8.main import application - - self.flake8 = application.Application() - self.flake8.initialize([]) + pass def finalize_options(self): pass + +class Flake8(_Command): + def package_files(self): seen_package_directories = () directories = self.distribution.package_dir or {} @@ -94,64 +93,29 @@ class Flake8(Command): return [package for package in self.package_files()] + ['tests', 'setup.py'] def run(self): - self.flake8.run_checks(self.targets()) - self.flake8.formatter.start() - self.flake8.report_errors() - self.flake8.report_statistics() - self.flake8.report_benchmarks() - self.flake8.formatter.stop() - try: - self.flake8.exit() - except SystemExit as e: - # Cause system exit only if exit code is not zero (terminates - # other possibly remaining/pending setuptools commands). - if e.code: - raise + from flake8.main import application + + logging.getLogger().setLevel(logging.ERROR) + flake8 = application.Application() + flake8.run(self.targets()) + flake8.exit() -class PyTest(Command): +class PyTest(_Command): - user_options = [('cov=', None, 'Run coverage'), ('cov-xml=', None, 'Generate junit xml report'), - ('cov-html=', None, 'Generate junit html report')] - - def initialize_options(self): - self.cov = [] - self.cov_xml = False - self.cov_html = False - - def finalize_options(self): - if self.cov_xml or self.cov_html: - self.cov = ['--cov', MAIN_PACKAGE, '--cov-report', 'term-missing'] - if self.cov_xml: - self.cov.extend(['--cov-report', 'xml']) - if self.cov_html: - self.cov.extend(['--cov-report', 'html']) - - def run_tests(self): + def run(self): try: import pytest except Exception: raise RuntimeError('py.test is not installed, run: pip install pytest') args = ['--verbose', 'tests', '--doctest-modules', MAIN_PACKAGE] +\ - ['-s' if logging.getLogger().getEffectiveLevel() < logging.WARNING else '--capture=fd'] - if self.cov: - args += self.cov + ['-s' if logging.getLogger().getEffectiveLevel() < logging.WARNING else '--capture=fd'] +\ + ['--cov', MAIN_PACKAGE, '--cov-report', 'term-missing', '--cov-report', 'xml'] errno = pytest.main(args=args) sys.exit(errno) - def run(self): - from pkg_resources import evaluate_marker - - requirements = set(self.distribution.install_requires + ['mock>=2.0.0', 'pytest-cov', 'pytest']) - for k, v in self.distribution.extras_require.items(): - if not k.startswith(':') or evaluate_marker(k[1:]): - requirements.update(v) - - self.distribution.fetch_build_eggs(list(requirements)) - self.run_tests() - def read(fname): with open(os.path.join(__location__, fname)) as fd: @@ -183,12 +147,6 @@ def setup_package(version): if not extra: install_requires.append(r) - command_options = {'test': {}} - if COVERAGE_XML: - command_options['test']['cov_xml'] = 'setup.py', True - if COVERAGE_HTML: - command_options['test']['cov_html'] = 'setup.py', True - setup( name=NAME, version=version, @@ -205,9 +163,7 @@ def setup_package(version): python_requires='>=2.7', install_requires=install_requires, extras_require=EXTRAS_REQUIRE, - setup_requires='flake8', cmdclass=cmdclass, - command_options=command_options, entry_points={'console_scripts': CONSOLE_SCRIPTS}, ) @@ -215,7 +171,8 @@ def setup_package(version): if __name__ == '__main__': old_modules = sys.modules.copy() try: - from patroni import check_psycopg, fatal, __version__ + from patroni import check_psycopg, fatal + from patroni.version import __version__ finally: sys.modules.clear() sys.modules.update(old_modules) diff --git a/tests/test_patroni.py b/tests/test_patroni.py index 5801085e..74821704 100644 --- a/tests/test_patroni.py +++ b/tests/test_patroni.py @@ -13,7 +13,8 @@ from patroni.dcs.etcd import AbstractEtcdClientWithFailover from patroni.exceptions import DCSError from patroni.postgresql import Postgresql from patroni.postgresql.config import ConfigHandler -from patroni import Patroni, main as _main, patroni_main, check_psycopg +from patroni import check_psycopg +from patroni.__main__ import Patroni, main as _main, patroni_main from six.moves import BaseHTTPServer, builtins from threading import Thread @@ -97,7 +98,7 @@ class TestPatroni(unittest.TestCase): @patch('os.getpid') @patch('multiprocessing.Process') - @patch('patroni.patroni_main', Mock()) + @patch('patroni.__main__.patroni_main', Mock()) def test_patroni_main(self, mock_process, mock_getpid): mock_getpid.return_value = 2 _main() diff --git a/tests/test_raft.py b/tests/test_raft.py index a80a668d..08085299 100644 --- a/tests/test_raft.py +++ b/tests/test_raft.py @@ -157,6 +157,6 @@ class TestRaft(unittest.TestCase): @patch('threading.Event') def test_init(self, mock_event, mock_kvstore): mock_kvstore.return_value.applied_local_log = False - mock_event.return_value.isSet.side_effect = [False, True] + mock_event.return_value.is_set.side_effect = [False, True] self.assertIsNotNone(Raft({'ttl': 30, 'scope': 'test', 'name': 'pg', 'patronictl': True, 'self_addr': '1', 'data_dir': self._TMP})) diff --git a/tests/test_zookeeper.py b/tests/test_zookeeper.py index c809ee0f..0f3213c9 100644 --- a/tests/test_zookeeper.py +++ b/tests/test_zookeeper.py @@ -245,7 +245,7 @@ class TestZooKeeper(unittest.TestCase): def test_watch(self): self.zk.watch(None, 0) - self.zk.event.isSet = Mock(return_value=True) + self.zk.event.is_set = Mock(return_value=True) self.zk._fetch_status = False self.zk.watch(None, 0)