mirror of
https://github.com/outbackdingo/patroni.git
synced 2026-01-27 10:20:10 +00:00
@@ -1,3 +1,8 @@
|
||||
"""Patroni main entry point.
|
||||
|
||||
Implement ``patroni`` main daemon and expose its entry point.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import signal
|
||||
@@ -16,8 +21,33 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Patroni(AbstractPatroniDaemon):
|
||||
"""Implement ``patroni`` command daemon.
|
||||
|
||||
:ivar version: Patroni version.
|
||||
:ivar dcs: DCS object.
|
||||
:ivar watchdog: watchdog handler, if configured to use watchdog.
|
||||
:ivar postgresql: managed Postgres instance.
|
||||
:ivar api: REST API server instance of this node.
|
||||
:ivar request: wrapper for performing HTTP requests.
|
||||
:ivar ha: HA handler.
|
||||
:ivar tags: cache of custom tags configured for this node.
|
||||
:ivar next_run: time when to run the next HA loop cycle.
|
||||
:ivar scheduled_restart: when a restart has been scheduled to occur, if any. In that case, should contain two keys:
|
||||
* ``schedule``: timestamp when restart should occur;
|
||||
* ``postmaster_start_time``: timestamp when Postgres was last started.
|
||||
"""
|
||||
|
||||
def __init__(self, config: 'Config') -> None:
|
||||
"""Create a :class:`Patroni` instance with the given *config*.
|
||||
|
||||
Get a connection to the DCS, configure watchdog (if required), set up Patroni interface with Postgres, configure
|
||||
the HA loop and bring the REST API up.
|
||||
|
||||
.. note::
|
||||
Expected to be instantiated and run through :func:`~patroni.daemon.abstract_main`.
|
||||
|
||||
:param config: Patroni configuration.
|
||||
"""
|
||||
from patroni.api import RestApiServer
|
||||
from patroni.dcs import get_dcs
|
||||
from patroni.ha import Ha
|
||||
@@ -46,6 +76,17 @@ class Patroni(AbstractPatroniDaemon):
|
||||
self.scheduled_restart: Dict[str, Any] = {}
|
||||
|
||||
def load_dynamic_configuration(self) -> None:
|
||||
"""Load Patroni dynamic configuration.
|
||||
|
||||
Load dynamic configuration from the DCS, if `/config` key is available in the DCS, otherwise fall back to
|
||||
``bootstrap.dcs`` section from the configuration file.
|
||||
|
||||
If the DCS connection fails returning the exception :class:`~patroni.exceptions.DCSError` an attempt will be
|
||||
remade every 5 seconds.
|
||||
|
||||
.. note::
|
||||
This method is called only once, at the time when Patroni is started.
|
||||
"""
|
||||
from patroni.exceptions import DCSError
|
||||
while True:
|
||||
try:
|
||||
@@ -81,18 +122,47 @@ class Patroni(AbstractPatroniDaemon):
|
||||
return
|
||||
|
||||
def get_tags(self) -> Dict[str, Any]:
|
||||
"""Get tags configured for this node, if any.
|
||||
|
||||
Handle both predefined Patroni tags and custom defined tags.
|
||||
|
||||
.. note::
|
||||
A custom tag is any tag added to the configuration ``tags`` section that is not one of ``clonefrom``,
|
||||
``nofailover``, ``noloadbalance`` or ``nosync``.
|
||||
|
||||
For the Patroni predefined tags, the returning object will only contain them if they are enabled as they
|
||||
all are boolean values that default to disabled.
|
||||
|
||||
:returns: a dictionary of tags set for this node. The key is the tag name, and the value is the corresponding
|
||||
tag value.
|
||||
"""
|
||||
return {tag: value for tag, value in self.config.get('tags', {}).items()
|
||||
if tag not in ('clonefrom', 'nofailover', 'noloadbalance', 'nosync') or value}
|
||||
|
||||
@property
|
||||
def nofailover(self) -> bool:
|
||||
"""``True`` if ``tags.nofailover`` configuration is enabled for this node, else ``False``."""
|
||||
return bool(self.tags.get('nofailover', False))
|
||||
|
||||
@property
|
||||
def nosync(self) -> bool:
|
||||
"""``True`` if ``tags.nosync`` configuration is enabled for this node, else ``False``."""
|
||||
return bool(self.tags.get('nosync', False))
|
||||
|
||||
def reload_config(self, sighup: bool = False, local: Optional[bool] = False) -> None:
|
||||
"""Apply new configuration values for ``patroni`` daemon.
|
||||
|
||||
Reload:
|
||||
* Cached tags;
|
||||
* Request wrapper configuration;
|
||||
* REST API configuration;
|
||||
* Watchdog configuration;
|
||||
* Postgres configuration;
|
||||
* DCS configuration.
|
||||
|
||||
:param sighup: if it is related to a SIGHUP signal.
|
||||
:param local: if there has been changes to the local configuration file.
|
||||
"""
|
||||
try:
|
||||
super(Patroni, self).reload_config(sighup, local)
|
||||
if local:
|
||||
@@ -107,14 +177,21 @@ class Patroni(AbstractPatroniDaemon):
|
||||
logger.exception('Failed to reload config_file=%s', self.config.config_file)
|
||||
|
||||
@property
|
||||
def replicatefrom(self):
|
||||
def replicatefrom(self) -> Optional[str]:
|
||||
"""Value of ``tags.replicatefrom`` configuration, if any."""
|
||||
return self.tags.get('replicatefrom')
|
||||
|
||||
@property
|
||||
def noloadbalance(self):
|
||||
def noloadbalance(self) -> bool:
|
||||
"""``True`` if ``tags.noloadbalance`` configuration is enabled for this node, else ``False``."""
|
||||
return bool(self.tags.get('noloadbalance', False))
|
||||
|
||||
def schedule_next_run(self) -> None:
|
||||
"""Schedule the next run of the ``patroni`` daemon main loop.
|
||||
|
||||
Next run is scheduled based on previous run plus value of ``loop_wait`` configuration from DCS. If that has
|
||||
already been exceeded, run the next cycle immediately.
|
||||
"""
|
||||
self.next_run += self.dcs.loop_wait
|
||||
current_time = time.time()
|
||||
nap_time = self.next_run - current_time
|
||||
@@ -128,11 +205,21 @@ class Patroni(AbstractPatroniDaemon):
|
||||
self.next_run = time.time()
|
||||
|
||||
def run(self) -> None:
|
||||
"""Run ``patroni`` daemon process main loop.
|
||||
|
||||
Start the REST API and keep running HA cycles every ``loop_wait`` seconds.
|
||||
"""
|
||||
self.api.start()
|
||||
self.next_run = time.time()
|
||||
super(Patroni, self).run()
|
||||
|
||||
def _run_cycle(self) -> None:
|
||||
"""Run a cycle of the ``patroni`` daemon main loop.
|
||||
|
||||
Run an HA cycle and schedule the next cycle run. If any dynamic configuration change request is detected, apply
|
||||
the change and cache the new dynamic configuration values in ``patroni.dynamic.json`` file under Postgres data
|
||||
directory.
|
||||
"""
|
||||
logger.info(self.ha.run_cycle())
|
||||
|
||||
if self.dcs.cluster and self.dcs.cluster.config and self.dcs.cluster.config.data \
|
||||
@@ -145,6 +232,10 @@ class Patroni(AbstractPatroniDaemon):
|
||||
self.schedule_next_run()
|
||||
|
||||
def _shutdown(self) -> None:
|
||||
"""Perform shutdown of ``patroni`` daemon process.
|
||||
|
||||
Shut down the REST API and the HA handler.
|
||||
"""
|
||||
try:
|
||||
self.api.shutdown()
|
||||
except Exception:
|
||||
@@ -156,13 +247,34 @@ class Patroni(AbstractPatroniDaemon):
|
||||
|
||||
|
||||
def patroni_main(configfile: str) -> None:
|
||||
"""Configure and start ``patroni`` main daemon process.
|
||||
|
||||
:param configfile: path to Patroni configuration file.
|
||||
"""
|
||||
from multiprocessing import freeze_support
|
||||
|
||||
# Windows executables created by PyInstaller are frozen, thus we need to enable frozen support for
|
||||
# :mod:`multiprocessing` to avoid :class:`RuntimeError` exceptions.
|
||||
freeze_support()
|
||||
abstract_main(Patroni, configfile)
|
||||
|
||||
|
||||
def process_arguments() -> Namespace:
|
||||
"""Process command-line arguments.
|
||||
|
||||
Create a basic command-line parser through :func:`~patroni.daemon.get_base_arg_parser`, extend its capabilities by
|
||||
adding these flags and parse command-line arguments.:
|
||||
|
||||
* ``--validate-config`` -- used to validate the Patroni configuration file
|
||||
* ``--generate-config`` -- used to generate Patroni configuration from a running PostgreSQL instance
|
||||
* ``--generate-sample-config`` -- used to generate a sample Patroni configuration
|
||||
|
||||
.. note::
|
||||
If running with ``--generate-config``, ``--generate-sample-config`` or ``--validate-flag`` will exit
|
||||
after generating or validating configuration.
|
||||
|
||||
:returns: parsed arguments, if not running with ``--validate-config`` flag.
|
||||
"""
|
||||
from patroni.config_generator import generate_config
|
||||
|
||||
parser = get_base_arg_parser()
|
||||
@@ -196,6 +308,16 @@ def process_arguments() -> Namespace:
|
||||
|
||||
|
||||
def main() -> None:
|
||||
"""Main entrypoint of :mod:`patroni.__main__`.
|
||||
|
||||
Process command-line arguments, ensure :mod:`psycopg2` (or :mod:`psycopg`) attendee the pre-requisites and start
|
||||
``patroni`` daemon process.
|
||||
|
||||
.. note::
|
||||
If running through a Docker container, make the main process take care of init process duties and run
|
||||
``patroni`` daemon as another process. In that case relevant signals received by the main process and forwarded
|
||||
to ``patroni`` daemon process.
|
||||
"""
|
||||
from patroni import check_psycopg
|
||||
|
||||
args = process_arguments()
|
||||
@@ -211,7 +333,13 @@ def main() -> None:
|
||||
|
||||
# Looks like we are in a docker, so we will act like init
|
||||
def sigchld_handler(signo: int, stack_frame: Optional[FrameType]) -> None:
|
||||
"""Handle ``SIGCHLD`` received by main process from ``patroni`` daemon when the daemon terminates.
|
||||
|
||||
:param signo: signal number.
|
||||
:param stack_frame: current stack frame.
|
||||
"""
|
||||
try:
|
||||
# log exit code of all children processes, and break loop when there is none left
|
||||
while True:
|
||||
ret = os.waitpid(-1, os.WNOHANG)
|
||||
if ret == (0, 0):
|
||||
@@ -221,7 +349,12 @@ def main() -> None:
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
def passtochild(signo: int, stack_frame: Optional[FrameType]):
|
||||
def passtochild(signo: int, stack_frame: Optional[FrameType]) -> None:
|
||||
"""Forward a signal *signo* from main process to child process.
|
||||
|
||||
:param signo: signal number.
|
||||
:param stack_frame: current stack frame.
|
||||
"""
|
||||
if pid:
|
||||
os.kill(pid, signo)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user