From 3333e78500204159332e032e5d07727dfbb7129d Mon Sep 17 00:00:00 2001 From: Alexander Kukushkin Date: Mon, 21 Aug 2023 17:03:14 +0200 Subject: [PATCH] Factor out tags handling into a dedicated class (#2823) The same (almost) logic was used in three different places: 1. `Patroni` class 2. `Member` class 3. `_MemberStatus` class Now they all inherit newly intoduced `Tags` class. --- patroni/__main__.py | 46 ++++++--------------------- patroni/dcs/__init__.py | 23 +++++--------- patroni/ha.py | 58 ++++++++++++++++++++-------------- patroni/postgresql/sync.py | 2 +- patroni/tags.py | 64 ++++++++++++++++++++++++++++++++++++++ tests/test_api.py | 2 +- tests/test_ha.py | 8 +++-- tests/test_patroni.py | 2 +- 8 files changed, 124 insertions(+), 81 deletions(-) create mode 100644 patroni/tags.py diff --git a/patroni/__main__.py b/patroni/__main__.py index 49542f0c..2b318a67 100644 --- a/patroni/__main__.py +++ b/patroni/__main__.py @@ -13,6 +13,7 @@ from argparse import Namespace from typing import Any, Dict, Optional, TYPE_CHECKING from patroni.daemon import AbstractPatroniDaemon, abstract_main, get_base_arg_parser +from patroni.tags import Tags if TYPE_CHECKING: # pragma: no cover from .config import Config @@ -20,7 +21,7 @@ if TYPE_CHECKING: # pragma: no cover logger = logging.getLogger(__name__) -class Patroni(AbstractPatroniDaemon): +class Patroni(AbstractPatroniDaemon, Tags): """Implement ``patroni`` command daemon. :ivar version: Patroni version. @@ -30,7 +31,6 @@ class Patroni(AbstractPatroniDaemon): :ivar api: REST API server instance of this node. :ivar request: wrapper for performing HTTP requests. :ivar ha: HA handler. - :ivar tags: cache of custom tags configured for this node. :ivar next_run: time when to run the next HA loop cycle. :ivar scheduled_restart: when a restart has been scheduled to occur, if any. In that case, should contain two keys: * ``schedule``: timestamp when restart should occur; @@ -71,7 +71,7 @@ class Patroni(AbstractPatroniDaemon): self.api = RestApiServer(self, self.config['restapi']) self.ha = Ha(self) - self.tags = self.get_tags() + self._tags = self._get_tags() self.next_run = time.time() self.scheduled_restart: Dict[str, Any] = {} @@ -121,33 +121,12 @@ class Patroni(AbstractPatroniDaemon): except Exception: return - def get_tags(self) -> Dict[str, Any]: + def _get_tags(self) -> Dict[str, Any]: """Get tags configured for this node, if any. - Handle both predefined Patroni tags and custom defined tags. - - .. note:: - A custom tag is any tag added to the configuration ``tags`` section that is not one of ``clonefrom``, - ``nofailover``, ``noloadbalance`` or ``nosync``. - - For the Patroni predefined tags, the returning object will only contain them if they are enabled as they - all are boolean values that default to disabled. - - :returns: a dictionary of tags set for this node. The key is the tag name, and the value is the corresponding - tag value. + :returns: a dictionary of tags set for this node. """ - return {tag: value for tag, value in self.config.get('tags', {}).items() - if tag not in ('clonefrom', 'nofailover', 'noloadbalance', 'nosync') or value} - - @property - def nofailover(self) -> bool: - """``True`` if ``tags.nofailover`` configuration is enabled for this node, else ``False``.""" - return bool(self.tags.get('nofailover', False)) - - @property - def nosync(self) -> bool: - """``True`` if ``tags.nosync`` configuration is enabled for this node, else ``False``.""" - return bool(self.tags.get('nosync', False)) + return self._filter_tags(self.config.get('tags', {})) def reload_config(self, sighup: bool = False, local: Optional[bool] = False) -> None: """Apply new configuration values for ``patroni`` daemon. @@ -166,7 +145,7 @@ class Patroni(AbstractPatroniDaemon): try: super(Patroni, self).reload_config(sighup, local) if local: - self.tags = self.get_tags() + self._tags = self._get_tags() self.request.reload_config(self.config) if local or sighup and self.api.reload_local_certificate(): self.api.reload_config(self.config['restapi']) @@ -177,14 +156,9 @@ class Patroni(AbstractPatroniDaemon): logger.exception('Failed to reload config_file=%s', self.config.config_file) @property - def replicatefrom(self) -> Optional[str]: - """Value of ``tags.replicatefrom`` configuration, if any.""" - return self.tags.get('replicatefrom') - - @property - def noloadbalance(self) -> bool: - """``True`` if ``tags.noloadbalance`` configuration is enabled for this node, else ``False``.""" - return bool(self.tags.get('noloadbalance', False)) + def tags(self) -> Dict[str, Any]: + """Tags configured for this node, if any.""" + return self._tags def schedule_next_run(self) -> None: """Schedule the next run of the ``patroni`` daemon main loop. diff --git a/patroni/dcs/__init__.py b/patroni/dcs/__init__.py index b76e1bda..d28a59ce 100644 --- a/patroni/dcs/__init__.py +++ b/patroni/dcs/__init__.py @@ -23,6 +23,7 @@ import dateutil.parser from ..exceptions import PatroniFatalException from ..utils import deep_compare, uri +from ..tags import Tags if TYPE_CHECKING: # pragma: no cover from ..config import Config @@ -191,11 +192,11 @@ _Version = Union[int, str] _Session = Union[int, float, str, None] -class Member(NamedTuple('Member', - [('version', _Version), - ('name', str), - ('session', _Session), - ('data', Dict[str, Any])])): +class Member(Tags, NamedTuple('Member', + [('version', _Version), + ('name', str), + ('session', _Session), + ('data', Dict[str, Any])])): """Immutable object (namedtuple) which represents single member of PostgreSQL cluster. .. note:: @@ -316,20 +317,10 @@ class Member(NamedTuple('Member', """The ``tags`` value from :attr:`~Member.data` if defined, otherwise an empty dictionary.""" return self.data.get('tags', {}) - @property - def nofailover(self) -> bool: - """The value for ``nofailover`` in :attr:`Member`.tags`` if defined, otherwise ``False``.""" - return self.tags.get('nofailover', False) - - @property - def replicatefrom(self) -> Optional[str]: - """The value for ``replicatefrom`` in :attr:`Member`.tags`` if defined.""" - return self.tags.get('replicatefrom') - @property def clonefrom(self) -> bool: """``True`` if both ``clonefrom`` tag is ``True`` and a connection URL is defined.""" - return self.tags.get('clonefrom', False) and bool(self.conn_url) + return super().clonefrom and bool(self.conn_url) @property def state(self) -> str: diff --git a/patroni/ha.py b/patroni/ha.py index 28bd92a3..befa1ff9 100644 --- a/patroni/ha.py +++ b/patroni/ha.py @@ -20,31 +20,28 @@ from .postgresql.callback_executor import CallbackAction from .postgresql.misc import postgres_version_to_int from .postgresql.postmaster import PostmasterProcess from .postgresql.rewind import Rewind +from .tags import Tags from .utils import polling_loop, tzutc logger = logging.getLogger(__name__) -class _MemberStatus(NamedTuple): - """Node status distilled from API response: +class _MemberStatus(Tags, NamedTuple('_MemberStatus', + [('member', Member), + ('reachable', bool), + ('in_recovery', Optional[bool]), + ('wal_position', int), + ('data', Dict[str, Any])])): + """Node status distilled from API response. - member - dcs.Member object of the node - reachable - `!False` if the node is not reachable or is not responding with correct JSON - in_recovery - `!True` if pg_is_in_recovery() == true - dcs_last_seen - timestamp from JSON of last succesful communication with DCS - timeline - timeline value from JSON - wal_position - maximum value of `replayed_location` or `received_location` from JSON - tags - dictionary with values of different tags (i.e. nofailover) - watchdog_failed - indicates that watchdog is required by configuration but not available or failed + Consists of the following fields: + + :ivar member: :class:`~patroni.dcs.Member` object of the node. + :ivar reachable: ``False`` if the node is not reachable or is not responding with correct JSON. + :ivar in_recovery: ``False`` if the node is running as a primary (`if pg_is_in_recovery() == true`). + :ivar wal_position: maximum value of ``replayed_location`` or ``received_location`` from JSON. + :ivar data: the whole JSON response for future usage. """ - member: Member - reachable: bool - in_recovery: Optional[bool] - dcs_last_seen: int - timeline: int - wal_position: int - tags: Dict[str, Any] - watchdog_failed: bool @classmethod def from_api_response(cls, member: Member, json: Dict[str, Any]) -> '_MemberStatus': @@ -57,21 +54,34 @@ class _MemberStatus(NamedTuple): wal: Dict[str, Any] = json.get('wal') or json['xlog'] # abuse difference in primary/replica response format in_recovery = not (bool(wal.get('location')) or json.get('role') in ('master', 'primary')) - timeline = json.get('timeline', 0) - dcs_last_seen = json.get('dcs_last_seen', 0) lsn = int(in_recovery and max(wal.get('received_location', 0), wal.get('replayed_location', 0))) - return cls(member, True, in_recovery, dcs_last_seen, timeline, lsn, - json.get('tags', {}), json.get('watchdog_failed', False)) + return cls(member, True, in_recovery, lsn, json) + + @property + def tags(self) -> Dict[str, Any]: + """Dictionary with values of different tags (i.e. nofailover).""" + return self.data.get('tags', {}) + + @property + def timeline(self) -> int: + """Timeline value from JSON.""" + return self.data.get('timeline', 0) + + @property + def watchdog_failed(self) -> bool: + """Indicates that watchdog is required by configuration but not available or failed.""" + return self.data.get('watchdog_failed', False) @classmethod def unknown(cls, member: Member) -> '_MemberStatus': - return cls(member, False, None, 0, 0, 0, {}, False) + """Create a new class instance with empty or null values.""" + return cls(member, False, None, 0, {}) def failover_limitation(self) -> Optional[str]: """Returns reason why this node can't promote or None if everything is ok.""" if not self.reachable: return 'not reachable' - if self.tags.get('nofailover', False): + if self.nofailover: return 'not allowed to promote' if self.watchdog_failed: return 'not watchdog capable' diff --git a/patroni/postgresql/sync.py b/patroni/postgresql/sync.py index c8a6ccdc..9cff04e0 100644 --- a/patroni/postgresql/sync.py +++ b/patroni/postgresql/sync.py @@ -209,7 +209,7 @@ class _ReplicaList(List[_Replica]): # 2. can be mapped to a ``Member`` of the ``Cluster``: # a. ``Member`` doesn't have ``nosync`` tag set; # b. PostgreSQL on the member is known to be running and accepting client connections. - if member and row[sort_col] is not None and member.is_running and not member.tags.get('nosync', False): + if member and row[sort_col] is not None and member.is_running and not member.nosync: self.append(_Replica(row['pid'], row['application_name'], row['sync_state'], row[sort_col], bool(member.nofailover))) diff --git a/patroni/tags.py b/patroni/tags.py new file mode 100644 index 00000000..6b3a1984 --- /dev/null +++ b/patroni/tags.py @@ -0,0 +1,64 @@ +"""Tags handling.""" +import abc + +from typing import Any, Dict, Optional + + +class Tags(abc.ABC): + """An abstract class that encapsulates all the ``tags`` logic. + + Child classes that want to use provided facilities must implement ``tags`` abstract property. + """ + + @staticmethod + def _filter_tags(tags: Dict[str, Any]) -> Dict[str, Any]: + """Get tags configured for this node, if any. + + Handle both predefined Patroni tags and custom defined tags. + + .. note:: + A custom tag is any tag added to the configuration ``tags`` section that is not one of ``clonefrom``, + ``nofailover``, ``noloadbalance`` or ``nosync``. + + For the Patroni predefined tags, the returning object will only contain them if they are enabled as they + all are boolean values that default to disabled. + + :returns: a dictionary of tags set for this node. The key is the tag name, and the value is the corresponding + tag value. + """ + return {tag: value for tag, value in tags.items() + if tag not in ('clonefrom', 'nofailover', 'noloadbalance', 'nosync') or value} + + @property + @abc.abstractmethod + def tags(self) -> Dict[str, Any]: + """Configured tags. + + Must be implemented in a child class. + """ + raise NotImplementedError # pragma: no cover + + @property + def clonefrom(self) -> bool: + """``True`` if ``clonefrom`` tag is ``True``, else ``False``.""" + return self.tags.get('clonefrom', False) + + @property + def nofailover(self) -> bool: + """``True`` if ``nofailover`` is ``True``, else ``False``.""" + return bool(self.tags.get('nofailover', False)) + + @property + def noloadbalance(self) -> bool: + """``True`` if ``noloadbalance`` is ``True``, else ``False``.""" + return bool(self.tags.get('noloadbalance', False)) + + @property + def nosync(self) -> bool: + """``True`` if ``nosync`` is ``True``, else ``False``.""" + return bool(self.tags.get('nosync', False)) + + @property + def replicatefrom(self) -> Optional[str]: + """Value of ``replicatefrom`` tag, if any.""" + return self.tags.get('replicatefrom') diff --git a/tests/test_api.py b/tests/test_api.py index b1e56fdf..f433ca18 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -100,7 +100,7 @@ class MockHa(object): @staticmethod def fetch_nodes_statuses(members): - return [_MemberStatus(None, True, None, 0, 0, None, {}, False)] + return [_MemberStatus(None, True, None, 0, {})] @staticmethod def schedule_future_restart(data): diff --git a/tests/test_ha.py b/tests/test_ha.py index 6acb2e15..b35ebc52 100644 --- a/tests/test_ha.py +++ b/tests/test_ha.py @@ -99,7 +99,9 @@ def get_node_status(reachable=True, in_recovery=True, dcs_last_seen=0, tags = {} if nofailover: tags['nofailover'] = True - return _MemberStatus(e, reachable, in_recovery, dcs_last_seen, timeline, wal_position, tags, watchdog_failed) + return _MemberStatus(e, reachable, in_recovery, wal_position, + {'tags': tags, 'watchdog_failed': watchdog_failed, + 'dcs_last_seen': dcs_last_seen, 'timeline': timeline}) return fetch_node_status @@ -1294,14 +1296,16 @@ class TestHa(PostgresInit): mock_restart.assert_called_once() self.ha.dcs.get_cluster.assert_not_called() + @patch.object(Cluster, 'is_unlocked', Mock(return_value=False)) def test_enable_synchronous_mode(self): self.ha.is_synchronous_mode = true self.ha.has_lock = true self.p.name = 'leader' + self.p.sync_handler.current_state = Mock(return_value=(CaseInsensitiveSet(), CaseInsensitiveSet())) self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty()) with patch('patroni.ha.logger.info') as mock_logger: self.ha.run_cycle() - self.assertEqual(mock_logger.call_args[0][0], 'Enabled synchronous replication') + self.assertEqual(mock_logger.call_args_list[0][0][0], 'Enabled synchronous replication') self.ha.dcs.write_sync_state = Mock(return_value=None) with patch('patroni.ha.logger.warning') as mock_logger: self.ha.run_cycle() diff --git a/tests/test_patroni.py b/tests/test_patroni.py index 605b3227..0385731c 100644 --- a/tests/test_patroni.py +++ b/tests/test_patroni.py @@ -185,7 +185,7 @@ class TestPatroni(unittest.TestCase): def test_reload_config(self): self.p.reload_config() - self.p.get_tags = Mock(side_effect=Exception) + self.p._get_tags = Mock(side_effect=Exception) self.p.reload_config(local=True) def test_nosync(self):