Annual cleanup (#2159)

-  Simplify setup.py: remove unneeded features and get rid of deprecation warnings
-  Compatibility with Python 3.10: handle the `threading.Event.isSet()` deprecation (a short illustration follows the commit metadata below)
-  Make sure setup.py can run without `six`: move the Patroni class and the main function to `__main__.py`; `__init__.py` keeps only a few functions used by the Patroni class and by setup.py
Alexander Kukushkin authored on 2022-01-06 10:20:31 +01:00 (committed by GitHub)
parent bf354aeebd
commit cb3071adfb
9 changed files with 205 additions and 246 deletions
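For context on the Python 3.10 item above: `threading.Event.isSet()` is the old camelCase alias that Python 3.10 deprecates in favour of `is_set()`; both read the same internal flag, so the changes in this commit are a pure rename. A minimal illustration (generic Python, not Patroni code):

    import threading

    event = threading.Event()
    event.set()

    # Deprecated spelling; emits a DeprecationWarning on Python 3.10+:
    #   event.isSet()
    # Portable spelling, available on every Python version Patroni supports (>=2.7):
    print(event.is_set())  # True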

View File

@@ -1,5 +1,5 @@
#!/usr/bin/env python
-from patroni import main
+from patroni.__main__ import main
if __name__ == '__main__':

View File

@@ -1,145 +1,10 @@
import logging
import os
import signal
import sys
import time
from .daemon import AbstractPatroniDaemon, abstract_main
from .version import __version__
logger = logging.getLogger(__name__)
PATRONI_ENV_PREFIX = 'PATRONI_'
KUBERNETES_ENV_PREFIX = 'KUBERNETES_'
MIN_PSYCOPG2 = (2, 5, 4)
class Patroni(AbstractPatroniDaemon):
def __init__(self, config):
from patroni.api import RestApiServer
from patroni.dcs import get_dcs
from patroni.ha import Ha
from patroni.postgresql import Postgresql
from patroni.request import PatroniRequest
from patroni.watchdog import Watchdog
super(Patroni, self).__init__(config)
self.version = __version__
self.dcs = get_dcs(self.config)
self.watchdog = Watchdog(self.config)
self.load_dynamic_configuration()
self.postgresql = Postgresql(self.config['postgresql'])
self.api = RestApiServer(self, self.config['restapi'])
self.request = PatroniRequest(self.config, True)
self.ha = Ha(self)
self.tags = self.get_tags()
self.next_run = time.time()
self.scheduled_restart = {}
def load_dynamic_configuration(self):
from patroni.exceptions import DCSError
while True:
try:
cluster = self.dcs.get_cluster()
if cluster and cluster.config and cluster.config.data:
if self.config.set_dynamic_configuration(cluster.config):
self.dcs.reload_config(self.config)
self.watchdog.reload_config(self.config)
elif not self.config.dynamic_configuration and 'bootstrap' in self.config:
if self.config.set_dynamic_configuration(self.config['bootstrap']['dcs']):
self.dcs.reload_config(self.config)
break
except DCSError:
logger.warning('Can not get cluster from dcs')
time.sleep(5)
def get_tags(self):
return {tag: value for tag, value in self.config.get('tags', {}).items()
if tag not in ('clonefrom', 'nofailover', 'noloadbalance', 'nosync') or value}
@property
def nofailover(self):
return bool(self.tags.get('nofailover', False))
@property
def nosync(self):
return bool(self.tags.get('nosync', False))
def reload_config(self, sighup=False, local=False):
try:
super(Patroni, self).reload_config(sighup, local)
if local:
self.tags = self.get_tags()
self.request.reload_config(self.config)
if local or sighup and self.api.reload_local_certificate():
self.api.reload_config(self.config['restapi'])
self.watchdog.reload_config(self.config)
self.postgresql.reload_config(self.config['postgresql'], sighup)
self.dcs.reload_config(self.config)
except Exception:
logger.exception('Failed to reload config_file=%s', self.config.config_file)
@property
def replicatefrom(self):
return self.tags.get('replicatefrom')
@property
def noloadbalance(self):
return bool(self.tags.get('noloadbalance', False))
def schedule_next_run(self):
self.next_run += self.dcs.loop_wait
current_time = time.time()
nap_time = self.next_run - current_time
if nap_time <= 0:
self.next_run = current_time
# Release the GIL so we don't starve anyone waiting on async_executor lock
time.sleep(0.001)
# Warn user that Patroni is not keeping up
logger.warning("Loop time exceeded, rescheduling immediately.")
elif self.ha.watch(nap_time):
self.next_run = time.time()
def run(self):
self.api.start()
self.next_run = time.time()
super(Patroni, self).run()
def _run_cycle(self):
logger.info(self.ha.run_cycle())
if self.dcs.cluster and self.dcs.cluster.config and self.dcs.cluster.config.data \
and self.config.set_dynamic_configuration(self.dcs.cluster.config):
self.reload_config()
if self.postgresql.role != 'uninitialized':
self.config.save_cache()
self.schedule_next_run()
def _shutdown(self):
try:
self.api.shutdown()
except Exception:
logger.exception('Exception during RestApi.shutdown')
try:
self.ha.shutdown()
except Exception:
logger.exception('Exception during Ha.shutdown')
def patroni_main():
from multiprocessing import freeze_support
from patroni.validator import schema
freeze_support()
abstract_main(Patroni, schema)
def fatal(string, *args):
sys.stderr.write('FATAL: ' + string.format(*args) + '\n')
sys.exit(1)
@@ -174,44 +39,3 @@ def check_psycopg(_min_psycopg2=MIN_PSYCOPG2, _parse_version=parse_version):
if version_str:
error += ', but only psycopg2=={0} is available'.format(version_str)
fatal(error)
def main():
if os.getpid() != 1:
check_psycopg()
return patroni_main()
# Patroni started with PID=1, it looks like we are in the container
pid = 0
# Looks like we are in a docker, so we will act like init
def sigchld_handler(signo, stack_frame):
try:
while True:
ret = os.waitpid(-1, os.WNOHANG)
if ret == (0, 0):
break
elif ret[0] != pid:
logger.info('Reaped pid=%s, exit status=%s', *ret)
except OSError:
pass
def passtochild(signo, stack_frame):
if pid:
os.kill(pid, signo)
if os.name != 'nt':
signal.signal(signal.SIGCHLD, sigchld_handler)
signal.signal(signal.SIGHUP, passtochild)
signal.signal(signal.SIGQUIT, passtochild)
signal.signal(signal.SIGUSR1, passtochild)
signal.signal(signal.SIGUSR2, passtochild)
signal.signal(signal.SIGINT, passtochild)
signal.signal(signal.SIGABRT, passtochild)
signal.signal(signal.SIGTERM, passtochild)
import multiprocessing
patroni = multiprocessing.Process(target=patroni_main)
patroni.start()
pid = patroni.pid
patroni.join()

View File

@@ -1,4 +1,181 @@
-from patroni import main
import logging
import os
import signal
import time
from .daemon import AbstractPatroniDaemon, abstract_main
logger = logging.getLogger(__name__)
class Patroni(AbstractPatroniDaemon):
def __init__(self, config):
from .api import RestApiServer
from .dcs import get_dcs
from .ha import Ha
from .postgresql import Postgresql
from .request import PatroniRequest
from .version import __version__
from .watchdog import Watchdog
super(Patroni, self).__init__(config)
self.version = __version__
self.dcs = get_dcs(self.config)
self.watchdog = Watchdog(self.config)
self.load_dynamic_configuration()
self.postgresql = Postgresql(self.config['postgresql'])
self.api = RestApiServer(self, self.config['restapi'])
self.request = PatroniRequest(self.config, True)
self.ha = Ha(self)
self.tags = self.get_tags()
self.next_run = time.time()
self.scheduled_restart = {}
def load_dynamic_configuration(self):
from patroni.exceptions import DCSError
while True:
try:
cluster = self.dcs.get_cluster()
if cluster and cluster.config and cluster.config.data:
if self.config.set_dynamic_configuration(cluster.config):
self.dcs.reload_config(self.config)
self.watchdog.reload_config(self.config)
elif not self.config.dynamic_configuration and 'bootstrap' in self.config:
if self.config.set_dynamic_configuration(self.config['bootstrap']['dcs']):
self.dcs.reload_config(self.config)
break
except DCSError:
logger.warning('Can not get cluster from dcs')
time.sleep(5)
def get_tags(self):
return {tag: value for tag, value in self.config.get('tags', {}).items()
if tag not in ('clonefrom', 'nofailover', 'noloadbalance', 'nosync') or value}
@property
def nofailover(self):
return bool(self.tags.get('nofailover', False))
@property
def nosync(self):
return bool(self.tags.get('nosync', False))
def reload_config(self, sighup=False, local=False):
try:
super(Patroni, self).reload_config(sighup, local)
if local:
self.tags = self.get_tags()
self.request.reload_config(self.config)
if local or sighup and self.api.reload_local_certificate():
self.api.reload_config(self.config['restapi'])
self.watchdog.reload_config(self.config)
self.postgresql.reload_config(self.config['postgresql'], sighup)
self.dcs.reload_config(self.config)
except Exception:
logger.exception('Failed to reload config_file=%s', self.config.config_file)
@property
def replicatefrom(self):
return self.tags.get('replicatefrom')
@property
def noloadbalance(self):
return bool(self.tags.get('noloadbalance', False))
def schedule_next_run(self):
self.next_run += self.dcs.loop_wait
current_time = time.time()
nap_time = self.next_run - current_time
if nap_time <= 0:
self.next_run = current_time
# Release the GIL so we don't starve anyone waiting on async_executor lock
time.sleep(0.001)
# Warn user that Patroni is not keeping up
logger.warning("Loop time exceeded, rescheduling immediately.")
elif self.ha.watch(nap_time):
self.next_run = time.time()
def run(self):
self.api.start()
self.next_run = time.time()
super(Patroni, self).run()
def _run_cycle(self):
logger.info(self.ha.run_cycle())
if self.dcs.cluster and self.dcs.cluster.config and self.dcs.cluster.config.data \
and self.config.set_dynamic_configuration(self.dcs.cluster.config):
self.reload_config()
if self.postgresql.role != 'uninitialized':
self.config.save_cache()
self.schedule_next_run()
def _shutdown(self):
try:
self.api.shutdown()
except Exception:
logger.exception('Exception during RestApi.shutdown')
try:
self.ha.shutdown()
except Exception:
logger.exception('Exception during Ha.shutdown')
def patroni_main():
from multiprocessing import freeze_support
from patroni.validator import schema
freeze_support()
abstract_main(Patroni, schema)
def main():
if os.getpid() != 1:
from . import check_psycopg
check_psycopg()
return patroni_main()
# Patroni started with PID=1, it looks like we are in the container
pid = 0
# Looks like we are in a docker, so we will act like init
def sigchld_handler(signo, stack_frame):
try:
while True:
ret = os.waitpid(-1, os.WNOHANG)
if ret == (0, 0):
break
elif ret[0] != pid:
logger.info('Reaped pid=%s, exit status=%s', *ret)
except OSError:
pass
def passtochild(signo, stack_frame):
if pid:
os.kill(pid, signo)
if os.name != 'nt':
signal.signal(signal.SIGCHLD, sigchld_handler)
signal.signal(signal.SIGHUP, passtochild)
signal.signal(signal.SIGQUIT, passtochild)
signal.signal(signal.SIGUSR1, passtochild)
signal.signal(signal.SIGUSR2, passtochild)
signal.signal(signal.SIGINT, passtochild)
signal.signal(signal.SIGABRT, passtochild)
signal.signal(signal.SIGTERM, passtochild)
import multiprocessing
patroni = multiprocessing.Process(target=patroni_main)
patroni.start()
pid = patroni.pid
patroni.join()
if __name__ == '__main__':
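
With the entry point moved into `patroni/__main__.py`, the installed console script and `python -m patroni` both resolve to the same function. A quick illustrative check, assuming Patroni and its dependencies are installed (this snippet is not part of the commit):

    # Both invocations end up in patroni.__main__.main():
    #   patroni <path-to-config.yml>          (console script registered in setup.py)
    #   python -m patroni <path-to-config.yml>
    from patroni.__main__ import main

    print(callable(main))  # True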

View File

@@ -937,4 +937,4 @@ class AbstractDCS(object):
:returns: `!True` if you would like to reschedule the next run of ha cycle"""
self.event.wait(timeout)
-return self.event.isSet()
+return self.event.is_set()

View File

@@ -271,7 +271,7 @@ class Raft(AbstractDCS):
while True:
ready_event.wait(5)
-if ready_event.isSet() or self._sync_obj.applied_local_log:
+if ready_event.is_set() or self._sync_obj.applied_local_log:
break
else:
logger.info('waiting on raft')

View File

@@ -27,7 +27,6 @@ EXTRAS_REQUIRE = {'aws': ['boto'], 'etcd': ['python-etcd'], 'etcd3': ['python-et
'consul': ['python-consul'], 'exhibitor': ['kazoo'], 'zookeeper': ['kazoo'],
'kubernetes': [], 'raft': ['pysyncobj', 'cryptography']}
COVERAGE_XML = True
COVERAGE_HTML = False
# Add here all kinds of additional classifiers as defined under
# https://pypi.python.org/pypi?%3Aaction=list_classifiers
@@ -50,29 +49,29 @@ CLASSIFIERS = [
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
+'Programming Language :: Python :: 3.10',
'Programming Language :: Python :: Implementation :: CPython',
]
-CONSOLE_SCRIPTS = ['patroni = patroni:main',
+CONSOLE_SCRIPTS = ['patroni = patroni.__main__:main',
'patronictl = patroni.ctl:ctl',
'patroni_raft_controller = patroni.raft_controller:main',
"patroni_wale_restore = patroni.scripts.wale_restore:main",
"patroni_aws = patroni.scripts.aws:main"]
-class Flake8(Command):
+class _Command(Command):
user_options = []
def initialize_options(self):
from flake8.main import application
self.flake8 = application.Application()
self.flake8.initialize([])
pass
def finalize_options(self):
pass
class Flake8(_Command):
def package_files(self):
seen_package_directories = ()
directories = self.distribution.package_dir or {}
@@ -94,64 +93,29 @@ class Flake8(Command):
return [package for package in self.package_files()] + ['tests', 'setup.py']
def run(self):
self.flake8.run_checks(self.targets())
self.flake8.formatter.start()
self.flake8.report_errors()
self.flake8.report_statistics()
self.flake8.report_benchmarks()
self.flake8.formatter.stop()
try:
self.flake8.exit()
except SystemExit as e:
# Cause system exit only if exit code is not zero (terminates
# other possibly remaining/pending setuptools commands).
if e.code:
raise
from flake8.main import application
logging.getLogger().setLevel(logging.ERROR)
flake8 = application.Application()
flake8.run(self.targets())
flake8.exit()
-class PyTest(Command):
+class PyTest(_Command):
user_options = [('cov=', None, 'Run coverage'), ('cov-xml=', None, 'Generate junit xml report'),
('cov-html=', None, 'Generate junit html report')]
def initialize_options(self):
self.cov = []
self.cov_xml = False
self.cov_html = False
def finalize_options(self):
if self.cov_xml or self.cov_html:
self.cov = ['--cov', MAIN_PACKAGE, '--cov-report', 'term-missing']
if self.cov_xml:
self.cov.extend(['--cov-report', 'xml'])
if self.cov_html:
self.cov.extend(['--cov-report', 'html'])
-def run_tests(self):
+def run(self):
try:
import pytest
except Exception:
raise RuntimeError('py.test is not installed, run: pip install pytest')
args = ['--verbose', 'tests', '--doctest-modules', MAIN_PACKAGE] +\
['-s' if logging.getLogger().getEffectiveLevel() < logging.WARNING else '--capture=fd']
if self.cov:
args += self.cov
['-s' if logging.getLogger().getEffectiveLevel() < logging.WARNING else '--capture=fd'] +\
['--cov', MAIN_PACKAGE, '--cov-report', 'term-missing', '--cov-report', 'xml']
errno = pytest.main(args=args)
sys.exit(errno)
def run(self):
from pkg_resources import evaluate_marker
requirements = set(self.distribution.install_requires + ['mock>=2.0.0', 'pytest-cov', 'pytest'])
for k, v in self.distribution.extras_require.items():
if not k.startswith(':') or evaluate_marker(k[1:]):
requirements.update(v)
self.distribution.fetch_build_eggs(list(requirements))
self.run_tests()
def read(fname):
with open(os.path.join(__location__, fname)) as fd:
@@ -183,12 +147,6 @@ def setup_package(version):
if not extra:
install_requires.append(r)
command_options = {'test': {}}
if COVERAGE_XML:
command_options['test']['cov_xml'] = 'setup.py', True
if COVERAGE_HTML:
command_options['test']['cov_html'] = 'setup.py', True
setup(
name=NAME,
version=version,
@@ -205,9 +163,7 @@ def setup_package(version):
python_requires='>=2.7',
install_requires=install_requires,
extras_require=EXTRAS_REQUIRE,
setup_requires='flake8',
cmdclass=cmdclass,
command_options=command_options,
entry_points={'console_scripts': CONSOLE_SCRIPTS},
)
@@ -215,7 +171,8 @@ def setup_package(version):
if __name__ == '__main__':
old_modules = sys.modules.copy()
try:
-from patroni import check_psycopg, fatal, __version__
+from patroni import check_psycopg, fatal
+from patroni.version import __version__
finally:
sys.modules.clear()
sys.modules.update(old_modules)

View File

@@ -13,7 +13,8 @@ from patroni.dcs.etcd import AbstractEtcdClientWithFailover
from patroni.exceptions import DCSError
from patroni.postgresql import Postgresql
from patroni.postgresql.config import ConfigHandler
-from patroni import Patroni, main as _main, patroni_main, check_psycopg
+from patroni import check_psycopg
+from patroni.__main__ import Patroni, main as _main, patroni_main
from six.moves import BaseHTTPServer, builtins
from threading import Thread
@@ -97,7 +98,7 @@ class TestPatroni(unittest.TestCase):
@patch('os.getpid')
@patch('multiprocessing.Process')
-@patch('patroni.patroni_main', Mock())
+@patch('patroni.__main__.patroni_main', Mock())
def test_patroni_main(self, mock_process, mock_getpid):
mock_getpid.return_value = 2
_main()

View File

@@ -157,6 +157,6 @@ class TestRaft(unittest.TestCase):
@patch('threading.Event')
def test_init(self, mock_event, mock_kvstore):
mock_kvstore.return_value.applied_local_log = False
-mock_event.return_value.isSet.side_effect = [False, True]
+mock_event.return_value.is_set.side_effect = [False, True]
self.assertIsNotNone(Raft({'ttl': 30, 'scope': 'test', 'name': 'pg', 'patronictl': True,
'self_addr': '1', 'data_dir': self._TMP}))

View File

@@ -245,7 +245,7 @@ class TestZooKeeper(unittest.TestCase):
def test_watch(self):
self.zk.watch(None, 0)
-self.zk.event.isSet = Mock(return_value=True)
+self.zk.event.is_set = Mock(return_value=True)
self.zk._fetch_status = False
self.zk.watch(None, 0)