diff --git a/patroni/__main__.py b/patroni/__main__.py index e9dcd28a..49542f0c 100644 --- a/patroni/__main__.py +++ b/patroni/__main__.py @@ -1,3 +1,8 @@ +"""Patroni main entry point. + +Implement ``patroni`` main daemon and expose its entry point. +""" + import logging import os import signal @@ -16,8 +21,33 @@ logger = logging.getLogger(__name__) class Patroni(AbstractPatroniDaemon): + """Implement ``patroni`` command daemon. + + :ivar version: Patroni version. + :ivar dcs: DCS object. + :ivar watchdog: watchdog handler, if configured to use watchdog. + :ivar postgresql: managed Postgres instance. + :ivar api: REST API server instance of this node. + :ivar request: wrapper for performing HTTP requests. + :ivar ha: HA handler. + :ivar tags: cache of custom tags configured for this node. + :ivar next_run: time when to run the next HA loop cycle. + :ivar scheduled_restart: when a restart has been scheduled to occur, if any. In that case, should contain two keys: + * ``schedule``: timestamp when restart should occur; + * ``postmaster_start_time``: timestamp when Postgres was last started. + """ def __init__(self, config: 'Config') -> None: + """Create a :class:`Patroni` instance with the given *config*. + + Get a connection to the DCS, configure watchdog (if required), set up Patroni interface with Postgres, configure + the HA loop and bring the REST API up. + + .. note:: + Expected to be instantiated and run through :func:`~patroni.daemon.abstract_main`. + + :param config: Patroni configuration. + """ from patroni.api import RestApiServer from patroni.dcs import get_dcs from patroni.ha import Ha @@ -46,6 +76,17 @@ class Patroni(AbstractPatroniDaemon): self.scheduled_restart: Dict[str, Any] = {} def load_dynamic_configuration(self) -> None: + """Load Patroni dynamic configuration. + + Load dynamic configuration from the DCS, if `/config` key is available in the DCS, otherwise fall back to + ``bootstrap.dcs`` section from the configuration file. + + If the DCS connection fails returning the exception :class:`~patroni.exceptions.DCSError` an attempt will be + remade every 5 seconds. + + .. note:: + This method is called only once, at the time when Patroni is started. + """ from patroni.exceptions import DCSError while True: try: @@ -81,18 +122,47 @@ class Patroni(AbstractPatroniDaemon): return def get_tags(self) -> Dict[str, Any]: + """Get tags configured for this node, if any. + + Handle both predefined Patroni tags and custom defined tags. + + .. note:: + A custom tag is any tag added to the configuration ``tags`` section that is not one of ``clonefrom``, + ``nofailover``, ``noloadbalance`` or ``nosync``. + + For the Patroni predefined tags, the returning object will only contain them if they are enabled as they + all are boolean values that default to disabled. + + :returns: a dictionary of tags set for this node. The key is the tag name, and the value is the corresponding + tag value. + """ return {tag: value for tag, value in self.config.get('tags', {}).items() if tag not in ('clonefrom', 'nofailover', 'noloadbalance', 'nosync') or value} @property def nofailover(self) -> bool: + """``True`` if ``tags.nofailover`` configuration is enabled for this node, else ``False``.""" return bool(self.tags.get('nofailover', False)) @property def nosync(self) -> bool: + """``True`` if ``tags.nosync`` configuration is enabled for this node, else ``False``.""" return bool(self.tags.get('nosync', False)) def reload_config(self, sighup: bool = False, local: Optional[bool] = False) -> None: + """Apply new configuration values for ``patroni`` daemon. + + Reload: + * Cached tags; + * Request wrapper configuration; + * REST API configuration; + * Watchdog configuration; + * Postgres configuration; + * DCS configuration. + + :param sighup: if it is related to a SIGHUP signal. + :param local: if there has been changes to the local configuration file. + """ try: super(Patroni, self).reload_config(sighup, local) if local: @@ -107,14 +177,21 @@ class Patroni(AbstractPatroniDaemon): logger.exception('Failed to reload config_file=%s', self.config.config_file) @property - def replicatefrom(self): + def replicatefrom(self) -> Optional[str]: + """Value of ``tags.replicatefrom`` configuration, if any.""" return self.tags.get('replicatefrom') @property - def noloadbalance(self): + def noloadbalance(self) -> bool: + """``True`` if ``tags.noloadbalance`` configuration is enabled for this node, else ``False``.""" return bool(self.tags.get('noloadbalance', False)) def schedule_next_run(self) -> None: + """Schedule the next run of the ``patroni`` daemon main loop. + + Next run is scheduled based on previous run plus value of ``loop_wait`` configuration from DCS. If that has + already been exceeded, run the next cycle immediately. + """ self.next_run += self.dcs.loop_wait current_time = time.time() nap_time = self.next_run - current_time @@ -128,11 +205,21 @@ class Patroni(AbstractPatroniDaemon): self.next_run = time.time() def run(self) -> None: + """Run ``patroni`` daemon process main loop. + + Start the REST API and keep running HA cycles every ``loop_wait`` seconds. + """ self.api.start() self.next_run = time.time() super(Patroni, self).run() def _run_cycle(self) -> None: + """Run a cycle of the ``patroni`` daemon main loop. + + Run an HA cycle and schedule the next cycle run. If any dynamic configuration change request is detected, apply + the change and cache the new dynamic configuration values in ``patroni.dynamic.json`` file under Postgres data + directory. + """ logger.info(self.ha.run_cycle()) if self.dcs.cluster and self.dcs.cluster.config and self.dcs.cluster.config.data \ @@ -145,6 +232,10 @@ class Patroni(AbstractPatroniDaemon): self.schedule_next_run() def _shutdown(self) -> None: + """Perform shutdown of ``patroni`` daemon process. + + Shut down the REST API and the HA handler. + """ try: self.api.shutdown() except Exception: @@ -156,13 +247,34 @@ class Patroni(AbstractPatroniDaemon): def patroni_main(configfile: str) -> None: + """Configure and start ``patroni`` main daemon process. + + :param configfile: path to Patroni configuration file. + """ from multiprocessing import freeze_support + # Windows executables created by PyInstaller are frozen, thus we need to enable frozen support for + # :mod:`multiprocessing` to avoid :class:`RuntimeError` exceptions. freeze_support() abstract_main(Patroni, configfile) def process_arguments() -> Namespace: + """Process command-line arguments. + + Create a basic command-line parser through :func:`~patroni.daemon.get_base_arg_parser`, extend its capabilities by + adding these flags and parse command-line arguments.: + + * ``--validate-config`` -- used to validate the Patroni configuration file + * ``--generate-config`` -- used to generate Patroni configuration from a running PostgreSQL instance + * ``--generate-sample-config`` -- used to generate a sample Patroni configuration + + .. note:: + If running with ``--generate-config``, ``--generate-sample-config`` or ``--validate-flag`` will exit + after generating or validating configuration. + + :returns: parsed arguments, if not running with ``--validate-config`` flag. + """ from patroni.config_generator import generate_config parser = get_base_arg_parser() @@ -196,6 +308,16 @@ def process_arguments() -> Namespace: def main() -> None: + """Main entrypoint of :mod:`patroni.__main__`. + + Process command-line arguments, ensure :mod:`psycopg2` (or :mod:`psycopg`) attendee the pre-requisites and start + ``patroni`` daemon process. + + .. note:: + If running through a Docker container, make the main process take care of init process duties and run + ``patroni`` daemon as another process. In that case relevant signals received by the main process and forwarded + to ``patroni`` daemon process. + """ from patroni import check_psycopg args = process_arguments() @@ -211,7 +333,13 @@ def main() -> None: # Looks like we are in a docker, so we will act like init def sigchld_handler(signo: int, stack_frame: Optional[FrameType]) -> None: + """Handle ``SIGCHLD`` received by main process from ``patroni`` daemon when the daemon terminates. + + :param signo: signal number. + :param stack_frame: current stack frame. + """ try: + # log exit code of all children processes, and break loop when there is none left while True: ret = os.waitpid(-1, os.WNOHANG) if ret == (0, 0): @@ -221,7 +349,12 @@ def main() -> None: except OSError: pass - def passtochild(signo: int, stack_frame: Optional[FrameType]): + def passtochild(signo: int, stack_frame: Optional[FrameType]) -> None: + """Forward a signal *signo* from main process to child process. + + :param signo: signal number. + :param stack_frame: current stack frame. + """ if pid: os.kill(pid, signo)