mirror of
https://github.com/outbackdingo/patroni.git
synced 2026-01-27 10:20:10 +00:00
Factor out tags handling into a dedicated class (#2823)
The same (almost) logic was used in three different places: 1. `Patroni` class 2. `Member` class 3. `_MemberStatus` class Now they all inherit newly intoduced `Tags` class.
This commit is contained in:
committed by
GitHub
parent
0ab4bc9d27
commit
3333e78500
@@ -13,6 +13,7 @@ from argparse import Namespace
|
||||
from typing import Any, Dict, Optional, TYPE_CHECKING
|
||||
|
||||
from patroni.daemon import AbstractPatroniDaemon, abstract_main, get_base_arg_parser
|
||||
from patroni.tags import Tags
|
||||
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
from .config import Config
|
||||
@@ -20,7 +21,7 @@ if TYPE_CHECKING: # pragma: no cover
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Patroni(AbstractPatroniDaemon):
|
||||
class Patroni(AbstractPatroniDaemon, Tags):
|
||||
"""Implement ``patroni`` command daemon.
|
||||
|
||||
:ivar version: Patroni version.
|
||||
@@ -30,7 +31,6 @@ class Patroni(AbstractPatroniDaemon):
|
||||
:ivar api: REST API server instance of this node.
|
||||
:ivar request: wrapper for performing HTTP requests.
|
||||
:ivar ha: HA handler.
|
||||
:ivar tags: cache of custom tags configured for this node.
|
||||
:ivar next_run: time when to run the next HA loop cycle.
|
||||
:ivar scheduled_restart: when a restart has been scheduled to occur, if any. In that case, should contain two keys:
|
||||
* ``schedule``: timestamp when restart should occur;
|
||||
@@ -71,7 +71,7 @@ class Patroni(AbstractPatroniDaemon):
|
||||
self.api = RestApiServer(self, self.config['restapi'])
|
||||
self.ha = Ha(self)
|
||||
|
||||
self.tags = self.get_tags()
|
||||
self._tags = self._get_tags()
|
||||
self.next_run = time.time()
|
||||
self.scheduled_restart: Dict[str, Any] = {}
|
||||
|
||||
@@ -121,33 +121,12 @@ class Patroni(AbstractPatroniDaemon):
|
||||
except Exception:
|
||||
return
|
||||
|
||||
def get_tags(self) -> Dict[str, Any]:
|
||||
def _get_tags(self) -> Dict[str, Any]:
|
||||
"""Get tags configured for this node, if any.
|
||||
|
||||
Handle both predefined Patroni tags and custom defined tags.
|
||||
|
||||
.. note::
|
||||
A custom tag is any tag added to the configuration ``tags`` section that is not one of ``clonefrom``,
|
||||
``nofailover``, ``noloadbalance`` or ``nosync``.
|
||||
|
||||
For the Patroni predefined tags, the returning object will only contain them if they are enabled as they
|
||||
all are boolean values that default to disabled.
|
||||
|
||||
:returns: a dictionary of tags set for this node. The key is the tag name, and the value is the corresponding
|
||||
tag value.
|
||||
:returns: a dictionary of tags set for this node.
|
||||
"""
|
||||
return {tag: value for tag, value in self.config.get('tags', {}).items()
|
||||
if tag not in ('clonefrom', 'nofailover', 'noloadbalance', 'nosync') or value}
|
||||
|
||||
@property
|
||||
def nofailover(self) -> bool:
|
||||
"""``True`` if ``tags.nofailover`` configuration is enabled for this node, else ``False``."""
|
||||
return bool(self.tags.get('nofailover', False))
|
||||
|
||||
@property
|
||||
def nosync(self) -> bool:
|
||||
"""``True`` if ``tags.nosync`` configuration is enabled for this node, else ``False``."""
|
||||
return bool(self.tags.get('nosync', False))
|
||||
return self._filter_tags(self.config.get('tags', {}))
|
||||
|
||||
def reload_config(self, sighup: bool = False, local: Optional[bool] = False) -> None:
|
||||
"""Apply new configuration values for ``patroni`` daemon.
|
||||
@@ -166,7 +145,7 @@ class Patroni(AbstractPatroniDaemon):
|
||||
try:
|
||||
super(Patroni, self).reload_config(sighup, local)
|
||||
if local:
|
||||
self.tags = self.get_tags()
|
||||
self._tags = self._get_tags()
|
||||
self.request.reload_config(self.config)
|
||||
if local or sighup and self.api.reload_local_certificate():
|
||||
self.api.reload_config(self.config['restapi'])
|
||||
@@ -177,14 +156,9 @@ class Patroni(AbstractPatroniDaemon):
|
||||
logger.exception('Failed to reload config_file=%s', self.config.config_file)
|
||||
|
||||
@property
|
||||
def replicatefrom(self) -> Optional[str]:
|
||||
"""Value of ``tags.replicatefrom`` configuration, if any."""
|
||||
return self.tags.get('replicatefrom')
|
||||
|
||||
@property
|
||||
def noloadbalance(self) -> bool:
|
||||
"""``True`` if ``tags.noloadbalance`` configuration is enabled for this node, else ``False``."""
|
||||
return bool(self.tags.get('noloadbalance', False))
|
||||
def tags(self) -> Dict[str, Any]:
|
||||
"""Tags configured for this node, if any."""
|
||||
return self._tags
|
||||
|
||||
def schedule_next_run(self) -> None:
|
||||
"""Schedule the next run of the ``patroni`` daemon main loop.
|
||||
|
||||
@@ -23,6 +23,7 @@ import dateutil.parser
|
||||
|
||||
from ..exceptions import PatroniFatalException
|
||||
from ..utils import deep_compare, uri
|
||||
from ..tags import Tags
|
||||
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
from ..config import Config
|
||||
@@ -191,11 +192,11 @@ _Version = Union[int, str]
|
||||
_Session = Union[int, float, str, None]
|
||||
|
||||
|
||||
class Member(NamedTuple('Member',
|
||||
[('version', _Version),
|
||||
('name', str),
|
||||
('session', _Session),
|
||||
('data', Dict[str, Any])])):
|
||||
class Member(Tags, NamedTuple('Member',
|
||||
[('version', _Version),
|
||||
('name', str),
|
||||
('session', _Session),
|
||||
('data', Dict[str, Any])])):
|
||||
"""Immutable object (namedtuple) which represents single member of PostgreSQL cluster.
|
||||
|
||||
.. note::
|
||||
@@ -316,20 +317,10 @@ class Member(NamedTuple('Member',
|
||||
"""The ``tags`` value from :attr:`~Member.data` if defined, otherwise an empty dictionary."""
|
||||
return self.data.get('tags', {})
|
||||
|
||||
@property
|
||||
def nofailover(self) -> bool:
|
||||
"""The value for ``nofailover`` in :attr:`Member`.tags`` if defined, otherwise ``False``."""
|
||||
return self.tags.get('nofailover', False)
|
||||
|
||||
@property
|
||||
def replicatefrom(self) -> Optional[str]:
|
||||
"""The value for ``replicatefrom`` in :attr:`Member`.tags`` if defined."""
|
||||
return self.tags.get('replicatefrom')
|
||||
|
||||
@property
|
||||
def clonefrom(self) -> bool:
|
||||
"""``True`` if both ``clonefrom`` tag is ``True`` and a connection URL is defined."""
|
||||
return self.tags.get('clonefrom', False) and bool(self.conn_url)
|
||||
return super().clonefrom and bool(self.conn_url)
|
||||
|
||||
@property
|
||||
def state(self) -> str:
|
||||
|
||||
@@ -20,31 +20,28 @@ from .postgresql.callback_executor import CallbackAction
|
||||
from .postgresql.misc import postgres_version_to_int
|
||||
from .postgresql.postmaster import PostmasterProcess
|
||||
from .postgresql.rewind import Rewind
|
||||
from .tags import Tags
|
||||
from .utils import polling_loop, tzutc
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class _MemberStatus(NamedTuple):
|
||||
"""Node status distilled from API response:
|
||||
class _MemberStatus(Tags, NamedTuple('_MemberStatus',
|
||||
[('member', Member),
|
||||
('reachable', bool),
|
||||
('in_recovery', Optional[bool]),
|
||||
('wal_position', int),
|
||||
('data', Dict[str, Any])])):
|
||||
"""Node status distilled from API response.
|
||||
|
||||
member - dcs.Member object of the node
|
||||
reachable - `!False` if the node is not reachable or is not responding with correct JSON
|
||||
in_recovery - `!True` if pg_is_in_recovery() == true
|
||||
dcs_last_seen - timestamp from JSON of last succesful communication with DCS
|
||||
timeline - timeline value from JSON
|
||||
wal_position - maximum value of `replayed_location` or `received_location` from JSON
|
||||
tags - dictionary with values of different tags (i.e. nofailover)
|
||||
watchdog_failed - indicates that watchdog is required by configuration but not available or failed
|
||||
Consists of the following fields:
|
||||
|
||||
:ivar member: :class:`~patroni.dcs.Member` object of the node.
|
||||
:ivar reachable: ``False`` if the node is not reachable or is not responding with correct JSON.
|
||||
:ivar in_recovery: ``False`` if the node is running as a primary (`if pg_is_in_recovery() == true`).
|
||||
:ivar wal_position: maximum value of ``replayed_location`` or ``received_location`` from JSON.
|
||||
:ivar data: the whole JSON response for future usage.
|
||||
"""
|
||||
member: Member
|
||||
reachable: bool
|
||||
in_recovery: Optional[bool]
|
||||
dcs_last_seen: int
|
||||
timeline: int
|
||||
wal_position: int
|
||||
tags: Dict[str, Any]
|
||||
watchdog_failed: bool
|
||||
|
||||
@classmethod
|
||||
def from_api_response(cls, member: Member, json: Dict[str, Any]) -> '_MemberStatus':
|
||||
@@ -57,21 +54,34 @@ class _MemberStatus(NamedTuple):
|
||||
wal: Dict[str, Any] = json.get('wal') or json['xlog']
|
||||
# abuse difference in primary/replica response format
|
||||
in_recovery = not (bool(wal.get('location')) or json.get('role') in ('master', 'primary'))
|
||||
timeline = json.get('timeline', 0)
|
||||
dcs_last_seen = json.get('dcs_last_seen', 0)
|
||||
lsn = int(in_recovery and max(wal.get('received_location', 0), wal.get('replayed_location', 0)))
|
||||
return cls(member, True, in_recovery, dcs_last_seen, timeline, lsn,
|
||||
json.get('tags', {}), json.get('watchdog_failed', False))
|
||||
return cls(member, True, in_recovery, lsn, json)
|
||||
|
||||
@property
|
||||
def tags(self) -> Dict[str, Any]:
|
||||
"""Dictionary with values of different tags (i.e. nofailover)."""
|
||||
return self.data.get('tags', {})
|
||||
|
||||
@property
|
||||
def timeline(self) -> int:
|
||||
"""Timeline value from JSON."""
|
||||
return self.data.get('timeline', 0)
|
||||
|
||||
@property
|
||||
def watchdog_failed(self) -> bool:
|
||||
"""Indicates that watchdog is required by configuration but not available or failed."""
|
||||
return self.data.get('watchdog_failed', False)
|
||||
|
||||
@classmethod
|
||||
def unknown(cls, member: Member) -> '_MemberStatus':
|
||||
return cls(member, False, None, 0, 0, 0, {}, False)
|
||||
"""Create a new class instance with empty or null values."""
|
||||
return cls(member, False, None, 0, {})
|
||||
|
||||
def failover_limitation(self) -> Optional[str]:
|
||||
"""Returns reason why this node can't promote or None if everything is ok."""
|
||||
if not self.reachable:
|
||||
return 'not reachable'
|
||||
if self.tags.get('nofailover', False):
|
||||
if self.nofailover:
|
||||
return 'not allowed to promote'
|
||||
if self.watchdog_failed:
|
||||
return 'not watchdog capable'
|
||||
|
||||
@@ -209,7 +209,7 @@ class _ReplicaList(List[_Replica]):
|
||||
# 2. can be mapped to a ``Member`` of the ``Cluster``:
|
||||
# a. ``Member`` doesn't have ``nosync`` tag set;
|
||||
# b. PostgreSQL on the member is known to be running and accepting client connections.
|
||||
if member and row[sort_col] is not None and member.is_running and not member.tags.get('nosync', False):
|
||||
if member and row[sort_col] is not None and member.is_running and not member.nosync:
|
||||
self.append(_Replica(row['pid'], row['application_name'],
|
||||
row['sync_state'], row[sort_col], bool(member.nofailover)))
|
||||
|
||||
|
||||
64
patroni/tags.py
Normal file
64
patroni/tags.py
Normal file
@@ -0,0 +1,64 @@
|
||||
"""Tags handling."""
|
||||
import abc
|
||||
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
|
||||
class Tags(abc.ABC):
|
||||
"""An abstract class that encapsulates all the ``tags`` logic.
|
||||
|
||||
Child classes that want to use provided facilities must implement ``tags`` abstract property.
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def _filter_tags(tags: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Get tags configured for this node, if any.
|
||||
|
||||
Handle both predefined Patroni tags and custom defined tags.
|
||||
|
||||
.. note::
|
||||
A custom tag is any tag added to the configuration ``tags`` section that is not one of ``clonefrom``,
|
||||
``nofailover``, ``noloadbalance`` or ``nosync``.
|
||||
|
||||
For the Patroni predefined tags, the returning object will only contain them if they are enabled as they
|
||||
all are boolean values that default to disabled.
|
||||
|
||||
:returns: a dictionary of tags set for this node. The key is the tag name, and the value is the corresponding
|
||||
tag value.
|
||||
"""
|
||||
return {tag: value for tag, value in tags.items()
|
||||
if tag not in ('clonefrom', 'nofailover', 'noloadbalance', 'nosync') or value}
|
||||
|
||||
@property
|
||||
@abc.abstractmethod
|
||||
def tags(self) -> Dict[str, Any]:
|
||||
"""Configured tags.
|
||||
|
||||
Must be implemented in a child class.
|
||||
"""
|
||||
raise NotImplementedError # pragma: no cover
|
||||
|
||||
@property
|
||||
def clonefrom(self) -> bool:
|
||||
"""``True`` if ``clonefrom`` tag is ``True``, else ``False``."""
|
||||
return self.tags.get('clonefrom', False)
|
||||
|
||||
@property
|
||||
def nofailover(self) -> bool:
|
||||
"""``True`` if ``nofailover`` is ``True``, else ``False``."""
|
||||
return bool(self.tags.get('nofailover', False))
|
||||
|
||||
@property
|
||||
def noloadbalance(self) -> bool:
|
||||
"""``True`` if ``noloadbalance`` is ``True``, else ``False``."""
|
||||
return bool(self.tags.get('noloadbalance', False))
|
||||
|
||||
@property
|
||||
def nosync(self) -> bool:
|
||||
"""``True`` if ``nosync`` is ``True``, else ``False``."""
|
||||
return bool(self.tags.get('nosync', False))
|
||||
|
||||
@property
|
||||
def replicatefrom(self) -> Optional[str]:
|
||||
"""Value of ``replicatefrom`` tag, if any."""
|
||||
return self.tags.get('replicatefrom')
|
||||
@@ -100,7 +100,7 @@ class MockHa(object):
|
||||
|
||||
@staticmethod
|
||||
def fetch_nodes_statuses(members):
|
||||
return [_MemberStatus(None, True, None, 0, 0, None, {}, False)]
|
||||
return [_MemberStatus(None, True, None, 0, {})]
|
||||
|
||||
@staticmethod
|
||||
def schedule_future_restart(data):
|
||||
|
||||
@@ -99,7 +99,9 @@ def get_node_status(reachable=True, in_recovery=True, dcs_last_seen=0,
|
||||
tags = {}
|
||||
if nofailover:
|
||||
tags['nofailover'] = True
|
||||
return _MemberStatus(e, reachable, in_recovery, dcs_last_seen, timeline, wal_position, tags, watchdog_failed)
|
||||
return _MemberStatus(e, reachable, in_recovery, wal_position,
|
||||
{'tags': tags, 'watchdog_failed': watchdog_failed,
|
||||
'dcs_last_seen': dcs_last_seen, 'timeline': timeline})
|
||||
return fetch_node_status
|
||||
|
||||
|
||||
@@ -1294,14 +1296,16 @@ class TestHa(PostgresInit):
|
||||
mock_restart.assert_called_once()
|
||||
self.ha.dcs.get_cluster.assert_not_called()
|
||||
|
||||
@patch.object(Cluster, 'is_unlocked', Mock(return_value=False))
|
||||
def test_enable_synchronous_mode(self):
|
||||
self.ha.is_synchronous_mode = true
|
||||
self.ha.has_lock = true
|
||||
self.p.name = 'leader'
|
||||
self.p.sync_handler.current_state = Mock(return_value=(CaseInsensitiveSet(), CaseInsensitiveSet()))
|
||||
self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
|
||||
with patch('patroni.ha.logger.info') as mock_logger:
|
||||
self.ha.run_cycle()
|
||||
self.assertEqual(mock_logger.call_args[0][0], 'Enabled synchronous replication')
|
||||
self.assertEqual(mock_logger.call_args_list[0][0][0], 'Enabled synchronous replication')
|
||||
self.ha.dcs.write_sync_state = Mock(return_value=None)
|
||||
with patch('patroni.ha.logger.warning') as mock_logger:
|
||||
self.ha.run_cycle()
|
||||
|
||||
@@ -185,7 +185,7 @@ class TestPatroni(unittest.TestCase):
|
||||
|
||||
def test_reload_config(self):
|
||||
self.p.reload_config()
|
||||
self.p.get_tags = Mock(side_effect=Exception)
|
||||
self.p._get_tags = Mock(side_effect=Exception)
|
||||
self.p.reload_config(local=True)
|
||||
|
||||
def test_nosync(self):
|
||||
|
||||
Reference in New Issue
Block a user