Configurable retention of members replication slots (#3108)
A long-standing problem of Patroni that strikes many people is that it removes the replication slot for a member whose key has expired from DCS. As a result, when the replica comes back from scheduled maintenance the required WAL segments may already be absent, and it cannot continue streaming without pulling files from the archive. With PostgreSQL 16 and newer there is another problem: a logical slot on a standby node can be invalidated if the physical replication slot on the primary was removed (and `pg_catalog` vacuumed). The most problematic environment is Kubernetes, where the slot is removed nearly instantly when the member Pod is deleted.

So far, one of the recommended solutions was to configure permanent physical slots with names that match member names, so that the replication slots are never removed. It works, but depending on the environment it might be non-trivial to implement (for example, when members may change their names).

This PR implements support for the `member_slots_ttl` global configuration parameter, which controls how long member replication slots are kept after the member key disappears from DCS. The default value is `30min`. The feature is supported only on PostgreSQL 11 and newer, because we want to retain slots not only on the leader node but on all nodes that could potentially become the new leader, and those slots have to be moved forward using the `pg_replication_slot_advance()` function. One can disable the feature and get back to the old behavior by setting `member_slots_ttl` to `0`.
Committed via GitHub. Parent: 8c5ab4c07d · Commit: 6d65aa311a
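For illustration, a minimal sketch of tuning the new parameter at runtime through the Patroni REST API (the address and port are assumptions mirroring the behave scenarios further down; `patronictl edit-config` works equally well):

.. code:: python

    # Hypothetical example, not part of this PR: shows how member_slots_ttl is changed.
    import requests

    # keep replication slots of absent members for one hour instead of the default 30min
    requests.patch('http://127.0.0.1:8008/config', json={'member_slots_ttl': '60min'})

    # setting it to 0 restores the old behavior: the slot is dropped as soon as the member key expires
    requests.patch('http://127.0.0.1:8008/config', json={'member_slots_ttl': 0})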
@@ -56,6 +56,7 @@ In order to change the dynamic configuration you can use either :ref:`patronictl
- **archive\_cleanup\_command**: cleanup command for standby leader
- **recovery\_min\_apply\_delay**: how long to wait before actually applying WAL records on a standby leader
- **member_slots_ttl**: retention time of physical replication slots for replicas when they are shut down. Default value: `30min`. Set it to `0` if you want to keep the old behavior (when the member key expires from DCS, the slot is immediately removed). The feature only works with PostgreSQL 11 and newer.
- **slots**: define permanent replication slots. These slots will be preserved during switchover/failover. Permanent slots that don't exist will be created by Patroni. With PostgreSQL 11 onwards permanent physical slots are created on all nodes and their position is advanced every **loop_wait** seconds. For PostgreSQL versions older than 11 permanent physical replication slots are maintained only on the current primary. The logical slots are copied from the primary to a standby with restart, and after that their position is advanced every **loop_wait** seconds (if necessary). Copying of logical slot files is performed via a ``libpq`` connection, using either rewind or superuser credentials (see **postgresql.authentication** section). There is always a chance that the logical slot position on the replica is a bit behind the former primary, therefore the application should be prepared to receive some messages a second time after a failover. The easiest way of doing so is tracking ``confirmed_flush_lsn``. Enabling permanent replication slots requires **postgresql.use_slots** to be set to ``true``. If there are permanent logical replication slots defined, Patroni will automatically enable ``hot_standby_feedback``. Since the failover of logical replication slots is unsafe on PostgreSQL 9.6 and older and PostgreSQL version 10 is missing some important functions, the feature only works with PostgreSQL 11+.
- **my\_slot\_name**: the name of the permanent replication slot. If the permanent slot name matches the name of the current node it will not be created on this node. If you add a permanent physical replication slot whose name matches the name of a Patroni member, Patroni will ensure that the slot that was created is not removed even if the corresponding member becomes unresponsive, a situation which would normally result in the slot's removal by Patroni. Although this can be useful in some situations, such as when you want replication slots used by members to persist during temporary failures or when importing existing members to a new Patroni cluster (see :ref:`Convert a Standalone to a Patroni Cluster <existing_data>` for details), the operator should take care that such name clashes do not remain in the DCS once the slot is no longer required, because of their effect on the normal functioning of Patroni.
@@ -92,7 +93,7 @@ Note: **slots** is a hashmap while **ignore_slots** is an array. For example:
type: physical
...

Note: if cluster topology is static (fixed number of nodes that never change their names) you can configure permanent physical replication slots with names corresponding to names of nodes to avoid recycling of WAL files while replica is temporary down:
Note: When running PostgreSQL v11 or newer Patroni maintains physical replication slots on all nodes that could potentially become a leader, so that replica nodes keep WAL segments reserved if they are potentially required by other nodes. In case the node is absent and its member key in DCS gets expired, the corresponding replication slot is dropped after ``member_slots_ttl`` (default value is `30min`). You can increase or decrease retention based on your needs. Alternatively, if your cluster topology is static (fixed number of nodes that never change their names) you can configure permanent physical replication slots with names corresponding to the names of the nodes to avoid slots removal and recycling of WAL files while replica is temporarily down:

.. code:: YAML
@@ -108,7 +109,7 @@ Note: if cluster topology is static (fixed number of nodes that never change the
.. warning::
Permanent replication slots are synchronized only from the ``primary``/``standby_leader`` to replica nodes. That means, applications are supposed to be using them only from the leader node. Using them on replica nodes will cause indefinite growth of ``pg_wal`` on all other nodes in the cluster.
An exception to that rule are permanent physical slots that match the Patroni member names, if you happen to configure any. Those will be synchronized among all nodes as they are used for replication among them.
An exception to that rule are physical slots that match the Patroni member names (created and maintained by Patroni). Those will be synchronized among all nodes as they are used for replication among them.

.. warning::
@@ -181,7 +181,10 @@ What is the difference between ``etcd`` and ``etcd3`` in Patroni configuration?
* API version 2 will be completely removed on Etcd v3.6.

I have ``use_slots`` enabled in my Patroni configuration, but when a cluster member goes offline for some time, the replication slot used by that member is dropped on the upstream node. What can I do to avoid that issue?
You can configure a permanent physical replication slot for the members.
There are two options:

1. You can tune ``member_slots_ttl`` (default value ``30min``, available since Patroni ``4.0.0`` and PostgreSQL 11 onwards) and replication slots for absent members will not be removed when the member's downtime is shorter than the configured threshold.
2. You can configure permanent physical replication slots for the members, as sketched below.

Since Patroni ``3.2.0`` it is possible to have member slots as permanent slots managed by Patroni.
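A minimal sketch of option 2 (addresses, port and member names are assumptions taken from the test scenarios in this PR): declaring permanent physical slots whose names match the member names keeps them from being dropped regardless of ``member_slots_ttl``.

.. code:: python

    # Hypothetical example of declaring member-named permanent physical slots via the REST API.
    import requests

    requests.patch('http://127.0.0.1:8008/config',
                   json={'slots': {'postgres1': 0, 'postgres2': 0}})  # the same {"name": 0} shorthand used by the behave scenarios in this PR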
@@ -86,7 +86,7 @@ Feature: dcs failsafe mode
@dcs-failsafe
@slot-advance
Scenario: make sure permanent slots exist on replicas
Given I issue a PATCH request to http://127.0.0.1:8009/config with {"slots":{"postgres2":0,"dcs_slot_0":null,"dcs_slot_2":{"type":"logical","database":"postgres","plugin":"test_decoding"}}}
Given I issue a PATCH request to http://127.0.0.1:8009/config with {"slots":{"dcs_slot_0":null,"dcs_slot_2":{"type":"logical","database":"postgres","plugin":"test_decoding"}}}
Then logical slot dcs_slot_2 is in sync between postgres1 and postgres0 after 20 seconds
And logical slot dcs_slot_2 is in sync between postgres1 and postgres2 after 20 seconds
When I get all changes from physical slot dcs_slot_1 on postgres1
@@ -16,3 +16,11 @@ Scenario: check permanent logical replication slots are not copied
Then "members/postgres2" key in DCS has replication_state=streaming after 10 seconds
And postgres1 does not have a replication slot named test_logical
And postgres2 does not have a replication slot named test_logical

@slot-advance
Scenario: check that slots are written to the /status key
Given "status" key in DCS has postgres0 in slots
And "status" key in DCS has postgres2 in slots
And "status" key in DCS has test_logical in slots
And "status" key in DCS has test_logical in slots
And "status" key in DCS does not have postgres1 in slots
@@ -3,7 +3,7 @@ Feature: permanent slots
Given I start postgres0
Then postgres0 is a leader after 10 seconds
And there is a non empty initialize key in DCS after 15 seconds
When I issue a PATCH request to http://127.0.0.1:8008/config with {"slots":{"test_physical":0,"postgres0":0,"postgres1":0,"postgres3":0},"postgresql":{"parameters":{"wal_level":"logical"}}}
When I issue a PATCH request to http://127.0.0.1:8008/config with {"slots":{"test_physical":0,"postgres3":0},"postgresql":{"parameters":{"wal_level":"logical"}}}
Then I receive a response code 200
And Response on GET http://127.0.0.1:8008/config contains slots after 10 seconds
When I start postgres1
@@ -34,12 +34,14 @@ Feature: permanent slots
Scenario: check permanent physical slots that match with member names
Given postgres0 has a physical replication slot named postgres3 after 2 seconds
And postgres1 has a physical replication slot named postgres0 after 2 seconds
And postgres1 has a physical replication slot named postgres2 after 2 seconds
And postgres1 has a physical replication slot named postgres3 after 2 seconds
And postgres2 has a physical replication slot named postgres0 after 2 seconds
And postgres2 has a physical replication slot named postgres3 after 2 seconds
And postgres2 has a physical replication slot named postgres1 after 2 seconds
And postgres1 does not have a replication slot named postgres2
And postgres3 does not have a replication slot named postgres2
And postgres3 has a physical replication slot named postgres0 after 2 seconds
And postgres3 has a physical replication slot named postgres1 after 2 seconds
And postgres3 has a physical replication slot named postgres2 after 2 seconds

@slot-advance
Scenario: check that permanent slots are advanced on replicas
@@ -53,19 +55,25 @@ Feature: permanent slots
And Logical slot test_logical is in sync between postgres0 and postgres3 after 10 seconds
And Physical slot test_physical is in sync between postgres0 and postgres3 after 10 seconds
And Physical slot postgres1 is in sync between postgres0 and postgres2 after 10 seconds
And Physical slot postgres1 is in sync between postgres0 and postgres3 after 10 seconds
And Physical slot postgres3 is in sync between postgres2 and postgres0 after 20 seconds
And Physical slot postgres3 is in sync between postgres2 and postgres1 after 10 seconds
And postgres1 does not have a replication slot named postgres2
And postgres3 does not have a replication slot named postgres2

@slot-advance
Scenario: check that only permanent slots are written to the /status key
Scenario: check that permanent slots and member slots are written to the /status key
Given "status" key in DCS has test_physical in slots
And "status" key in DCS has postgres0 in slots
And "status" key in DCS has postgres1 in slots
And "status" key in DCS does not have postgres2 in slots
And "status" key in DCS has postgres2 in slots
And "status" key in DCS has postgres3 in slots

@slot-advance
Scenario: check that only non-permanent member slots are written to the retain_slots in /status key
And "status" key in DCS has postgres0 in retain_slots
And "status" key in DCS has postgres1 in retain_slots
And "status" key in DCS has postgres2 in retain_slots
And "status" key in DCS does not have postgres3 in retain_slots

Scenario: check permanent physical replication slot after failover
Given I shut down postgres3
And I shut down postgres2
@@ -751,9 +751,11 @@ class Status(NamedTuple):
:ivar last_lsn: :class:`int` object containing position of last known leader LSN.
:ivar slots: state of permanent replication slots on the primary in the format: ``{"slot_name": int}``.
:ivar retain_slots: list of physical replication slots for members that exist in the cluster.
"""
last_lsn: int
slots: Optional[Dict[str, int]]
retain_slots: List[str]

@staticmethod
def empty() -> 'Status':
@@ -761,20 +763,20 @@ class Status(NamedTuple):
:returns: empty :class:`Status` object.
"""
return Status(0, None)
return Status(0, None, [])

def is_empty(self):
"""Validate definition of all attributes of this :class:`Status` instance.

:returns: ``True`` if all attributes of the current :class:`Status` are unpopulated.
"""
return self.last_lsn == 0 and self.slots is None
return self.last_lsn == 0 and self.slots is None and not self.retain_slots

@staticmethod
def from_node(value: Union[str, Dict[str, Any], None]) -> 'Status':
"""Factory method to parse *value* as :class:`Status` object.

:param value: JSON serialized string
:param value: JSON serialized string or :class:`dict` object.

:returns: constructed :class:`Status` object.
"""
@@ -785,7 +787,7 @@ class Status(NamedTuple):
return Status.empty()

if isinstance(value, int): # legacy
return Status(value, None)
return Status(value, None, [])

if not isinstance(value, dict):
return Status.empty()
@@ -804,7 +806,16 @@ class Status(NamedTuple):
if not isinstance(slots, dict):
slots = None

return Status(last_lsn, slots)
retain_slots: Union[str, List[str], None] = value.get('retain_slots')
if isinstance(retain_slots, str):
try:
retain_slots = json.loads(retain_slots)
except Exception:
retain_slots = []
if not isinstance(retain_slots, list):
retain_slots = []

return Status(last_lsn, slots, retain_slots)


class Cluster(NamedTuple('Cluster',
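For illustration (not part of the diff), this is how the extended factory is expected to behave, assuming the parsing logic shown in this hunk:

.. code:: python

    from patroni.dcs import Status

    status = Status.from_node('{"optime": 100, "slots": {"ls": 12345}, '
                              '"retain_slots": ["postgresql0", "postgresql1"]}')
    assert status.last_lsn == 100
    assert status.slots == {'ls': 12345}
    assert status.retain_slots == ['postgresql0', 'postgresql1']

    # legacy payloads and garbage still degrade gracefully
    assert Status.from_node('2164261704').last_lsn == 2164261704
    assert Status.from_node('{') == Status.empty()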
@@ -880,7 +891,8 @@ class Cluster(NamedTuple('Cluster',
>>> assert bool(cluster) is False

>>> cluster = Cluster(None, None, None, Status(0, None), [1, 2, 3], None, SyncState.empty(), None, None, {})
>>> status = Status(0, None, [])
>>> cluster = Cluster(None, None, None, status, [1, 2, 3], None, SyncState.empty(), None, None, {})
>>> len(cluster)
1
@@ -984,7 +996,7 @@ class Cluster(NamedTuple('Cluster',
return ret

@property
def __permanent_physical_slots(self) -> Dict[str, Any]:
def permanent_physical_slots(self) -> Dict[str, Any]:
"""Dictionary of permanent ``physical`` replication slots."""
return {name: value for name, value in self.__permanent_slots.items() if self.is_physical_slot(value)}
@@ -1011,7 +1023,8 @@ class Cluster(NamedTuple('Cluster',
name = member.name if isinstance(member, Member) else postgresql.name
role = role or postgresql.role

slots: Dict[str, Dict[str, str]] = self._get_members_slots(name, role)
slots: Dict[str, Dict[str, Any]] = self._get_members_slots(name, role,
member.nofailover, postgresql.can_advance_slots)
permanent_slots: Dict[str, Any] = self._get_permanent_slots(postgresql, member, role)

disabled_permanent_logical_slots: List[str] = self._merge_permanent_slots(
@@ -1096,12 +1109,13 @@ class Cluster(NamedTuple('Cluster',
return {}

if global_config.is_standby_cluster or self.get_slot_name_on_primary(postgresql.name, tags) is None:
return self.__permanent_physical_slots if postgresql.can_advance_slots or role == 'standby_leader' else {}
return self.permanent_physical_slots if postgresql.can_advance_slots or role == 'standby_leader' else {}

return self.__permanent_slots if postgresql.can_advance_slots or role in ('master', 'primary') \
else self.__permanent_logical_slots

def _get_members_slots(self, name: str, role: str) -> Dict[str, Dict[str, str]]:
def _get_members_slots(self, name: str, role: str, nofailover: bool,
can_advance_slots: bool) -> Dict[str, Dict[str, Any]]:
"""Get physical replication slots configuration for members sourcing from this node.

If the ``replicatefrom`` tag is set on the member - we should not create the replication slot for it on
@@ -1118,9 +1132,10 @@ class Cluster(NamedTuple('Cluster',
* Conflicting slot names between members are found

:param name: name of this node.
:param role: role of this node, if this is a ``primary`` or ``standby_leader`` return list of members
replicating from this node. If not then return a list of members replicating as cascaded
replicas from this node.
:param role: role of this node, ``primary``, ``standby_leader``, or ``replica``.
:param nofailover: ``True`` if this node is tagged to not be a failover candidate, ``False`` otherwise.
:param can_advance_slots: ``True`` if ``pg_replication_slot_advance()`` function is available,
``False`` otherwise.

:returns: dictionary of physical replication slots that should exist on a given node.
"""
@@ -1131,15 +1146,34 @@ class Cluster(NamedTuple('Cluster',
# also exclude members with disabled WAL streaming
members = filter(lambda m: m.name != name and not m.nostream, self.members)

if role in ('master', 'primary', 'standby_leader'):
if can_advance_slots and global_config.member_slots_ttl > 0:
# if the node does only cascading and can't become the leader, we
# only want to have slots for members that could connect to it.
members = [m for m in members if not nofailover or m.replicatefrom == name]
elif role in ('master', 'primary', 'standby_leader'): # PostgreSQL is older than 11
# on the leader we want to have slots only for the nodes that are supposed to be replicating from it.
members = [m for m in members if m.replicatefrom is None
or m.replicatefrom == name or not self.has_member(m.replicatefrom)]
else:
# only manage slots for replicas that replicate from this one, except for the leader among them
members = [m for m in members if m.replicatefrom == name and m.name != self.leader_name]

slots = {slot_name_from_member_name(m.name): {'type': 'physical'} for m in members}
if len(slots) < len(members):
slots: Dict[str, int] = self.slots
ret: Dict[str, Dict[str, Any]] = {}
for member in members:
slot_name = slot_name_from_member_name(member.name)
lsn = slots.get(slot_name, 0)
if member.replicatefrom:
# `/status` key is maintained by the leader, but `member` may be connected to some other node.
# In that case, the slot in the leader is inactive and doesn't advance, so we use the LSN
# reported by the member to advance replication slot LSN.
# `max` is only a fallback so we take the LSN from the slot when there is no feedback from the member.
lsn = max(member.lsn or 0, lsn)
ret[slot_name] = {'type': 'physical', 'lsn': lsn}
ret.update({slot: {'type': 'physical'} for slot in self.status.retain_slots
if slot not in ret and slot != name})

if len(ret) < len(members):
# Find which names are conflicting for a nicer error message
slot_conflicts: Dict[str, List[str]] = defaultdict(list)
for member in members:
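As a small aside (not part of the diff), the member-name to slot-name mapping used here is what makes such conflicts possible in the first place: member names that differ only in characters that are invalid in slot names collapse to the same slot.

.. code:: python

    from patroni.dcs import slot_name_from_member_name

    # member names from the behave scenarios above map to valid PostgreSQL slot names
    assert slot_name_from_member_name('postgres0') == 'postgres0'
    assert slot_name_from_member_name('test-3') == 'test_3'

    # hypothetical clash: these two members would share one replication slot
    assert slot_name_from_member_name('node-1') == slot_name_from_member_name('node_1')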
@@ -1147,7 +1181,7 @@ class Cluster(NamedTuple('Cluster',
logger.error("Following cluster members share a replication slot name: %s",
"; ".join(f"{', '.join(v)} map to {k}"
for k, v in slot_conflicts.items() if len(v) > 1))
return slots
return ret

def has_permanent_slots(self, postgresql: 'Postgresql', member: Tags) -> bool:
"""Check if our node has permanent replication slots configured.
@@ -1155,26 +1189,31 @@ class Cluster(NamedTuple('Cluster',
:param postgresql: reference to :class:`Postgresql` object.
:param member: reference to an object implementing :class:`Tags` interface for
the node that we are checking permanent logical replication slots for.

:returns: ``True`` if there are permanent replication slots configured, otherwise ``False``.
"""
role = 'replica'
members_slots: Dict[str, Dict[str, str]] = self._get_members_slots(postgresql.name, role)
members_slots: Dict[str, Dict[str, str]] = self._get_members_slots(postgresql.name, role,
member.nofailover,
postgresql.can_advance_slots)
permanent_slots: Dict[str, Any] = self._get_permanent_slots(postgresql, member, role)
slots = deepcopy(members_slots)
self._merge_permanent_slots(slots, permanent_slots, postgresql.name, postgresql.can_advance_slots)
return len(slots) > len(members_slots) or any(self.is_physical_slot(v) for v in permanent_slots.values())

def filter_permanent_slots(self, postgresql: 'Postgresql', slots: Dict[str, int]) -> Dict[str, int]:
def maybe_filter_permanent_slots(self, postgresql: 'Postgresql', slots: Dict[str, int]) -> Dict[str, int]:
"""Filter out all non-permanent slots from provided *slots* dict.

.. note::
In case retention of replication slots for members is enabled we will not do
any filtering, because we need to publish LSN values for member replication slots,
so that other nodes can use them to advance LSN, like they do for permanent slots.

:param postgresql: reference to :class:`Postgresql` object.
:param slots: slot names with LSN values.

:returns: a :class:`dict` object that contains only slots that are known to be permanent.
"""
if not postgresql.can_advance_slots:
return {} # for legacy PostgreSQL we don't support permanent slots on standby nodes
if global_config.member_slots_ttl > 0:
return slots

permanent_slots: Dict[str, Any] = self._get_permanent_slots(postgresql, RemoteMember('', {}), 'replica')
members_slots = {slot_name_from_member_name(m.name) for m in self.members}
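A simplified standalone sketch (an assumption, not the exact implementation) of the decision the renamed method makes:

.. code:: python

    def slots_to_publish(can_advance_slots, member_slots_ttl, slots, permanent_names):
        """Roughly what maybe_filter_permanent_slots decides (simplified sketch)."""
        if not can_advance_slots:
            return {}                      # legacy PostgreSQL: nothing is published
        if member_slots_ttl > 0:
            return slots                   # retention enabled: member slot LSNs are published too
        return {name: lsn for name, lsn in slots.items() if name in permanent_names}

    assert slots_to_publish(True, 1800, {'ls': 1, 'postgres1': 2}, {'ls'}) == {'ls': 1, 'postgres1': 2}
    assert slots_to_publish(True, 0, {'ls': 1, 'postgres1': 2}, {'ls'}) == {'ls': 1}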
@@ -1412,7 +1451,8 @@ class AbstractDCS(abc.ABC):
self._cluster_thread_lock = Lock()
self._last_lsn: int = 0
self._last_seen: int = 0
self._last_status: Dict[str, Any] = {}
self._last_status: Dict[str, Any] = {'retain_slots': []}
self._last_retain_slots: Dict[str, float] = {}
self._last_failsafe: Optional[Dict[str, str]] = {}
self.event = Event()
@@ -1638,7 +1678,7 @@ class AbstractDCS(abc.ABC):
self._cluster_valid_till = time.time() + self.ttl

self._last_seen = int(time.time())
self._last_status = {self._OPTIME: cluster.status.last_lsn}
self._last_status = {self._OPTIME: cluster.status.last_lsn, 'retain_slots': cluster.status.retain_slots}
if cluster.status.slots:
self._last_status['slots'] = cluster.status.slots
self._last_failsafe = cluster.failsafe
@@ -1698,6 +1738,14 @@ class AbstractDCS(abc.ABC):
:param value: JSON serializable dictionary with current WAL LSN and ``confirmed_flush_lsn`` of permanent slots.
"""
# This method is always called with the ``optime`` key, the rest of the keys are optional.
# In case we know old values (stored in self._last_status), we will copy them over.
for name in ('slots', 'retain_slots'):
if name not in value and self._last_status.get(name):
value[name] = self._last_status[name]
# if the key is present, but the value is None, we will not write such a pair.
value = {k: v for k, v in value.items() if v is not None}

if not deep_compare(self._last_status, value) and self._write_status(json.dumps(value, separators=(',', ':'))):
self._last_status = value
cluster = self.cluster
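A worked example of the carry-over logic above, using plain dicts:

.. code:: python

    last_status = {'optime': 100, 'slots': {'ls': 10}, 'retain_slots': ['postgresql1']}
    value = {'optime': 200, 'slots': None}               # what the caller passed this time

    for name in ('slots', 'retain_slots'):
        if name not in value and last_status.get(name):
            value[name] = last_status[name]              # copy missing keys from the previous write
    value = {k: v for k, v in value.items() if v is not None}

    assert value == {'optime': 200, 'retain_slots': ['postgresql1']}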
@@ -1729,6 +1777,53 @@ class AbstractDCS(abc.ABC):
"""Stored value of :attr:`~AbstractDCS._last_failsafe`."""
return self._last_failsafe

def _build_retain_slots(self, cluster: Cluster, slots: Optional[Dict[str, int]]) -> Optional[List[str]]:
"""Handle retention policy of physical replication slots for cluster members.

When the member key is missing we want to keep its replication slot for a while, so that WAL segments
will not already be absent when it comes back online. This is solved by storing the list of
replication slots representing members in the ``retain_slots`` field of the ``/status`` key.

This method handles the retention policy by keeping the list of such replication slots in memory
and removing names that were last observed longer than ``member_slots_ttl`` ago.

:param cluster: :class:`Cluster` object with information about the current cluster state.
:param slots: slot names with LSN values that exist on the leader node and consist
of slots for cluster members and permanent replication slots.

:returns: the list of replication slots to be written to ``/status`` key or ``None``.
"""
timestamp = time.time()

# DCS is a source of truth, therefore we take missing values from there
self._last_retain_slots.update({name: timestamp for name in self._last_status['retain_slots']
if (not slots or name not in slots) and name not in self._last_retain_slots})

if slots: # if slots is not empty it implies we are running v11+
members: Set[str] = set()
found_self = False
for member in cluster.members:
found_self = member.name == self._name
if not member.nostream:
members.add(slot_name_from_member_name(member.name))
if not found_self:
# It could be that the member key for our node is not in DCS and we can't check tags.nostream.
# In this case our name will falsely appear in `retain_slots`, but only temporarily.
members.add(slot_name_from_member_name(self._name))

permanent_slots = cluster.permanent_physical_slots
# we want to have in ``retain_slots`` only non-permanent member slots
self._last_retain_slots.update({name: timestamp for name in slots
if name in members and name not in permanent_slots})
# retention
for name, value in list(self._last_retain_slots.items()):
if value + global_config.member_slots_ttl <= timestamp:
logger.info("Replication slot '%s' for absent cluster member is expired after %d sec.",
name, global_config.member_slots_ttl)
del self._last_retain_slots[name]

return list(sorted(self._last_retain_slots.keys())) or None
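A simplified standalone sketch of the retention bookkeeping implemented above (the real method also consults DCS, member tags and permanent slots):

.. code:: python

    import time

    def prune_retained(last_seen, member_slots_ttl):
        """Drop slot names that have not been seen for longer than member_slots_ttl seconds."""
        now = time.time()
        for name, seen_at in list(last_seen.items()):
            if seen_at + member_slots_ttl <= now:
                del last_seen[name]
        return sorted(last_seen)

    last_seen = {'postgres1': time.time(), 'postgres2': time.time() - 3600}
    assert prune_retained(last_seen, 1800) == ['postgres1']   # postgres2 expired after 30 minutes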
@abc.abstractmethod
def _update_leader(self, leader: Leader) -> bool:
"""Update ``leader`` key (or session) ttl.
@@ -1763,9 +1858,8 @@ class AbstractDCS(abc.ABC):
assert isinstance(cluster.leader, Leader)
ret = self._update_leader(cluster.leader)
if ret and last_lsn:
status: Dict[str, Any] = {self._OPTIME: last_lsn}
if slots:
status['slots'] = slots
status: Dict[str, Any] = {self._OPTIME: last_lsn, 'slots': slots or None,
'retain_slots': self._build_retain_slots(cluster, slots)}
self.write_status(status)

if ret and failsafe is not None:
@@ -1244,6 +1244,8 @@ class Kubernetes(AbstractDCS):
if last_lsn:
annotations[self._OPTIME] = str(last_lsn)
annotations['slots'] = json.dumps(slots, separators=(',', ':')) if slots else None
retain_slots = self._build_retain_slots(cluster, slots)
annotations['retain_slots'] = json.dumps(retain_slots) if retain_slots else None

if failsafe is not None:
annotations[self._FAILSAFE] = json.dumps(failsafe, separators=(',', ':')) if failsafe else None
@@ -135,16 +135,18 @@ class GlobalConfig(types.ModuleType):
return isinstance(config, dict) and\
bool(config.get('host') or config.get('port') or config.get('restore_command'))

def get_int(self, name: str, default: int = 0) -> int:
def get_int(self, name: str, default: int = 0, base_unit: Optional[str] = None) -> int:
"""Gets the current value of *name* from the global configuration and tries to return it as :class:`int`.

:param name: name of the parameter.
:param default: default value if *name* is not in the configuration or invalid.
:param base_unit: an optional base unit to convert value of *name* parameter to.
Not used if the value does not contain a unit.

:returns: currently configured value of *name* from the global configuration or *default* if it is not set or
invalid.
"""
ret = parse_int(self.get(name))
ret = parse_int(self.get(name), base_unit)
return default if ret is None else ret

@property
@@ -231,5 +233,13 @@ class GlobalConfig(types.ModuleType):
or self.get('slots')
or EMPTY_DICT.copy())

@property
def member_slots_ttl(self) -> int:
"""Currently configured value of ``member_slots_ttl`` from the global configuration converted to seconds.

Assume ``1800`` if it is not set or invalid.
"""
return self.get_int('member_slots_ttl', 1800, base_unit='s')


sys.modules[__name__] = GlobalConfig()
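For illustration, the unit conversion this property relies on (``parse_int`` is the existing helper from ``patroni.utils``; the exact behavior is assumed from its use here):

.. code:: python

    from patroni.utils import parse_int

    assert parse_int('30min', 's') == 1800   # the default retention, converted to seconds
    assert parse_int('10s', 's') == 10
    assert parse_int(900, 's') == 900        # plain numbers are taken as already being in seconds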
@@ -14,7 +14,7 @@ from . import global_config, psycopg
from .__main__ import Patroni
from .async_executor import AsyncExecutor, CriticalTask
from .collections import CaseInsensitiveSet
from .dcs import AbstractDCS, Cluster, Leader, Member, RemoteMember, slot_name_from_member_name, Status, SyncState
from .dcs import AbstractDCS, Cluster, Leader, Member, RemoteMember, Status, SyncState
from .exceptions import DCSError, PatroniFatalException, PostgresConnectionException
from .postgresql.callback_executor import CallbackAction
from .postgresql.misc import postgres_version_to_int
@@ -173,7 +173,7 @@ class Failsafe(object):
leader = self.leader
if leader:
# We rely on the strict order of fields in the namedtuple
status = Status(cluster.status.last_lsn, leader.member.data['slots'])
status = Status(cluster.status[0], leader.member.data['slots'], *cluster.status[2:])
cluster = Cluster(*cluster[0:2], leader, status, *cluster[4:])
# To advance LSN of replication slots on the primary for nodes that are doing cascading
# replication from other nodes we need to update `xlog_location` on respective members.
@@ -383,9 +383,7 @@ class Ha(object):
if update_status:
try:
last_lsn = self._last_wal_lsn = self.state_handler.last_operation()
slots = self.cluster.filter_permanent_slots(
self.state_handler,
{**self.state_handler.slots(), slot_name_from_member_name(self.state_handler.name): last_lsn})
slots = self.cluster.maybe_filter_permanent_slots(self.state_handler, self.state_handler.slots())
except Exception:
logger.exception('Exception when called state_handler.last_operation()')
try:
@@ -1221,12 +1219,10 @@ class Ha(object):
'api_url': self.patroni.api.connection_string,
}
try:
data['slots'] = {
**self.state_handler.slots(),
slot_name_from_member_name(self.state_handler.name): self._last_wal_lsn
}
data['slots'] = self.state_handler.slots()
except Exception:
logger.exception('Exception when called state_handler.slots()')

members = [RemoteMember(name, {'api_url': url})
for name, url in failsafe.items() if name != self.state_handler.name]
if not members: # A single node cluster
@@ -18,7 +18,7 @@ from psutil import TimeoutExpired
from .. import global_config, psycopg
from ..async_executor import CriticalTask
from ..collections import CaseInsensitiveDict, EMPTY_DICT
from ..dcs import Cluster, Leader, Member
from ..dcs import Cluster, Leader, Member, slot_name_from_member_name
from ..exceptions import PostgresConnectionException
from ..tags import Tags
from ..utils import data_directory_is_empty, parse_int, polling_loop, Retry, RetryFailedError
@@ -112,7 +112,7 @@ class Postgresql(object):
self._state_entry_timestamp = 0

self._cluster_info_state = {}
self._has_permanent_slots = True
self._should_query_slots = True
self._enforce_hot_standby_feedback = False
self._cached_replica_timeline = None
@@ -231,7 +231,7 @@ class Postgresql(object):
"plugin, catalog_xmin, pg_catalog.pg_wal_lsn_diff(confirmed_flush_lsn, '0/0')::bigint"
" AS confirmed_flush_lsn, pg_catalog.pg_wal_lsn_diff(restart_lsn, '0/0')::bigint"
" AS restart_lsn FROM pg_catalog.pg_get_replication_slots()) AS s)"
if self._has_permanent_slots and self.can_advance_slots else "NULL") + extra
if self._should_query_slots and self.can_advance_slots else "NULL") + extra
extra = (", CASE WHEN latest_end_lsn IS NULL THEN NULL ELSE received_tli END,"
" slot_name, conninfo, status, {0} FROM pg_catalog.pg_stat_get_wal_receiver()").format(extra)
if self.role == 'standby_leader':
@@ -460,7 +460,7 @@ class Postgresql(object):
# to have a logical slot or in case if it is the cascading replica.
self.set_enforce_hot_standby_feedback(not global_config.is_standby_cluster and self.can_advance_slots
and cluster.should_enforce_hot_standby_feedback(self, tags))
self._has_permanent_slots = cluster.has_permanent_slots(self, tags)
self._should_query_slots = global_config.member_slots_ttl > 0 or cluster.has_permanent_slots(self, tags)

def _cluster_info_state_get(self, name: str) -> Optional[Any]:
if not self._cluster_info_state:
@@ -471,7 +471,7 @@ class Postgresql(object):
'received_tli', 'slot_name', 'conninfo', 'receiver_state',
'restore_command', 'slots', 'synchronous_commit',
'synchronous_standby_names', 'pg_stat_replication'], result))
if self._has_permanent_slots and self.can_advance_slots:
if self._should_query_slots and self.can_advance_slots:
cluster_info_state['slots'] =\
self.slots_handler.process_permanent_slots(cluster_info_state['slots'])
self._cluster_info_state = cluster_info_state
@@ -492,7 +492,19 @@ class Postgresql(object):
return self._cluster_info_state_get('received_location')

def slots(self) -> Dict[str, int]:
return self._cluster_info_state_get('slots') or {}
"""Get replication slots state.

.. note::
Since this method is supposed to be used only by the leader and only to publish the state of
replication slots to DCS so that other nodes can advance LSN on respective replication slots,
we are also adding our own name to the list. All slots that shouldn't be published to DCS
will later be filtered out by the :meth:`~Cluster.maybe_filter_permanent_slots` method.

:returns: A :class:`dict` object with replication slot names and LSNs as absolute values.
"""
return {**(self._cluster_info_state_get('slots') or {}),
slot_name_from_member_name(self.name): self.last_operation()} \
if self.can_advance_slots else {}

def primary_slot_name(self) -> Optional[str]:
return self._cluster_info_state_get('slot_name')
@@ -1028,6 +1028,7 @@ schema = Schema({
Optional("retry_timeout"): IntValidator(min=3, raise_assert=True),
Optional("maximum_lag_on_failover"): IntValidator(min=0, raise_assert=True),
Optional("maximum_lag_on_syncnode"): IntValidator(min=-1, raise_assert=True),
Optional('member_slots_ttl'): IntValidator(min=0, base_unit='s', raise_assert=True),
Optional("postgresql"): {
Optional("parameters"): {
Optional("max_connections"): IntValidator(1, 262143, raise_assert=True),
@@ -46,8 +46,8 @@ def kv_get(self, key, **kwargs):
'ModifyIndex': 6429, 'Value': b'{"leader": "leader", "sync_standby": null}'},
{'CreateIndex': 1085, 'Flags': 0, 'Key': key + 'failsafe', 'LockIndex': 0,
'ModifyIndex': 6429, 'Value': b'{'},
{'CreateIndex': 1085, 'Flags': 0, 'Key': key + 'status', 'LockIndex': 0,
'ModifyIndex': 6429, 'Value': b'{"optime":4496294792, "slots":{"ls":12345}}'}])
{'CreateIndex': 1085, 'Flags': 0, 'Key': key + 'status', 'LockIndex': 0, 'ModifyIndex': 6429,
'Value': b'{"optime":4496294792,"slots":{"ls":12345},"retain_slots":["postgresql0","postgresql1"]}'}])
if key == 'service/good/':
return good_cls
if key == 'service/broken/':
@@ -77,7 +77,8 @@ def etcd_read(self, key, **kwargs):
"modifiedIndex": 20730, "createdIndex": 20730}],
"modifiedIndex": 1581, "createdIndex": 1581},
{"key": "/service/batman5/failsafe", "value": '{', "modifiedIndex": 1582, "createdIndex": 1582},
{"key": "/service/batman5/status", "value": '{"optime":2164261704,"slots":{"ls":12345}}',
{"key": "/service/batman5/status",
"value": '{"optime":2164261704,"slots":{"ls":12345},"retain_slots":["postgresql0","postgresql1"]}',
"modifiedIndex": 1582, "createdIndex": 1582}], "modifiedIndex": 1581, "createdIndex": 1581}}
if key == '/service/legacy/':
response['node']['nodes'].pop()
@@ -225,7 +225,8 @@ class TestEtcd3(BaseTestEtcd3):
"header": {"revision": "1"},
"kvs": [
{"key": base64_encode('/patroni/test/status'),
"value": base64_encode('{"optime":1234567,"slots":{"ls":12345}}'), "mod_revision": '1'}
"value": base64_encode('{"optime":1234567,"slots":{"ls":12345},"retain_slots": ["foo"]}'),
"mod_revision": '1'}
]
})
self.assertIsInstance(self.etcd3.get_cluster(), Cluster)
@@ -42,8 +42,8 @@ def get_cluster(initialize, leader, members, failover, sync, cluster_config=None
t = datetime.datetime.now().isoformat()
history = TimelineHistory(1, '[[1,67197376,"no recovery target specified","' + t + '","foo"]]',
[(1, 67197376, 'no recovery target specified', t, 'foo')])
cluster_config = cluster_config or ClusterConfig(1, {'check_timeline': True}, 1)
return Cluster(initialize, cluster_config, leader, Status(10, None), members, failover, sync, history, failsafe)
cluster_config = cluster_config or ClusterConfig(1, {'check_timeline': True, 'member_slots_ttl': 0}, 1)
return Cluster(initialize, cluster_config, leader, Status(10, None, []), members, failover, sync, history, failsafe)


def get_cluster_not_initialized_without_leader(cluster_config=None):
@@ -26,14 +26,16 @@ def mock_list_namespaced_config_map(*args, **kwargs):
'annotations': {'initialize': '123', 'config': '{}'}}
items = [k8s_client.V1ConfigMap(metadata=k8s_client.V1ObjectMeta(**metadata))]
metadata.update({'name': 'test-leader',
'annotations': {'optime': '1234x', 'leader': 'p-0', 'ttl': '30s', 'slots': '{', 'failsafe': '{'}})
'annotations': {'optime': '1234x', 'leader': 'p-0', 'ttl': '30s',
'slots': '{', 'retain_slots': '{', 'failsafe': '{'}})
items.append(k8s_client.V1ConfigMap(metadata=k8s_client.V1ObjectMeta(**metadata)))
metadata.update({'name': 'test-failover', 'annotations': {'leader': 'p-0'}})
items.append(k8s_client.V1ConfigMap(metadata=k8s_client.V1ObjectMeta(**metadata)))
metadata.update({'name': 'test-sync', 'annotations': {'leader': 'p-0'}})
items.append(k8s_client.V1ConfigMap(metadata=k8s_client.V1ObjectMeta(**metadata)))
metadata.update({'name': 'test-0-leader', 'labels': {k8s_group_label: '0'},
'annotations': {'optime': '1234x', 'leader': 'p-0', 'ttl': '30s', 'slots': '{', 'failsafe': '{'}})
'annotations': {'optime': '1234x', 'leader': 'p-0', 'ttl': '30s',
'slots': '{', 'retain_slots': '{', 'failsafe': '{'}})
items.append(k8s_client.V1ConfigMap(metadata=k8s_client.V1ObjectMeta(**metadata)))
metadata.update({'name': 'test-0-config', 'labels': {k8s_group_label: '0'},
'annotations': {'initialize': '123', 'config': '{}'}})
@@ -421,7 +423,7 @@ class TestKubernetesEndpoints(BaseTestKubernetes):
mock_patch.side_effect = RetryFailedError('')
self.assertRaises(KubernetesError, self.k.update_leader, cluster, '123')
mock_patch.side_effect = k8s_client.rest.ApiException(409, '')
with patch('time.time', Mock(side_effect=[0, 100, 200, 0, 0, 0, 0, 100, 200])):
with patch('time.time', Mock(side_effect=[0, 0, 100, 200, 0, 0, 0, 0, 0, 100, 200])):
self.assertFalse(self.k.update_leader(cluster, '123'))
self.assertFalse(self.k.update_leader(cluster, '123'))
self.assertFalse(self.k.update_leader(cluster, '123'))
@@ -152,7 +152,8 @@ class TestRaft(unittest.TestCase):
self.assertIsInstance(cluster, Cluster)
self.assertIsInstance(cluster.workers[1], Cluster)
self.assertTrue(raft.delete_leader(cluster.leader))
self.assertTrue(raft._sync_obj.set(raft.status_path, '{"optime":1234567,"slots":{"ls":12345}}'))
self.assertTrue(raft._sync_obj.set(raft.status_path,
'{"optime":1234567,"slots":{"ls":12345},"retain_slots":["postgresql0"]}'))
raft.get_cluster()
self.assertTrue(raft.update_leader(cluster, '1', failsafe={'foo': 'bat'}))
self.assertTrue(raft._sync_obj.set(raft.failsafe_path, '{"foo"}'))
@@ -38,7 +38,7 @@ class TestSlotsHandler(BaseTestPostgresql):
self.s = self.p.slots_handler
self.p.start()
config = ClusterConfig(1, {'slots': {'ls': {'database': 'a', 'plugin': 'b'}, 'ls2': None}}, 1)
self.cluster = Cluster(True, config, self.leader, Status(0, {'ls': 12345, 'ls2': 12345}),
self.cluster = Cluster(True, config, self.leader, Status(0, {'ls': 12345, 'ls2': 12345}, []),
[self.me, self.other, self.leadermem], None, SyncState.empty(), None, None)
global_config.update(self.cluster)
self.tags = TestTags()
@@ -47,7 +47,7 @@ class TestSlotsHandler(BaseTestPostgresql):
config = ClusterConfig(1, {'slots': {'test_3': {'database': 'a', 'plugin': 'b'},
'A': 0, 'ls': 0, 'b': {'type': 'logical', 'plugin': '1'}},
'ignore_slots': [{'name': 'blabla'}]}, 1)
cluster = Cluster(True, config, self.leader, Status(0, {'test_3': 10}),
cluster = Cluster(True, config, self.leader, Status(0, {'test_3': 10}, []),
[self.me, self.other, self.leadermem], None, SyncState.empty(), None, None)
global_config.update(cluster)
with mock.patch('patroni.postgresql.Postgresql._query', Mock(side_effect=psycopg.OperationalError)):
@@ -89,7 +89,7 @@ class TestSlotsHandler(BaseTestPostgresql):
'state': 'running', 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres',
'tags': {'replicatefrom': 'postgresql0'}
})
cluster = Cluster(True, config, self.leader, Status(0, {'ls': 10}),
cluster = Cluster(True, config, self.leader, Status(0, {'ls': 10}, []),
[self.me, self.other, self.leadermem, cascading_replica], None, SyncState.empty(), None, None)
self.p.set_role('replica')
with patch.object(Postgresql, '_query') as mock_query, \
@@ -114,30 +114,33 @@ class TestSlotsHandler(BaseTestPostgresql):
"confirmed_flush_lsn": 12345, "catalog_xmin": 105, "restart_lsn": 12344},
{"slot_name": "blabla", "type": "physical", "datoid": None, "plugin": None,
"confirmed_flush_lsn": None, "catalog_xmin": 105, "restart_lsn": 12344}])]
self.assertEqual(self.p.slots(), {'ls': 12345, 'blabla': 12344})
self.assertEqual(self.p.slots(), {'ls': 12345, 'blabla': 12344, 'postgresql0': 0})

self.p.reset_cluster_info_state(None)
mock_query.return_value = [(
1, 0, 0, 0, 0, 0, 0, 0, 0, None, None,
[{"slot_name": "ls", "type": "logical", "datoid": 6, "plugin": "b",
"confirmed_flush_lsn": 12345, "catalog_xmin": 105}])]
self.assertEqual(self.p.slots(), {})
self.assertEqual(self.p.slots(), {'postgresql0': 0})

def test_nostream_slot_processing(self):
config = ClusterConfig(
1, {'slots': {'foo': {'type': 'logical', 'database': 'a', 'plugin': 'b'}, 'bar': {'type': 'physical'}}}, 1)
nostream_node = Member(0, 'test-2', 28, {
'state': 'running', 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres',
'tags': {'nostream': 'True'}
'tags': {'nostream': 'True'},
'xlog_location': 10,
})
cascade_node = Member(0, 'test-3', 28, {
'state': 'running', 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres',
'tags': {'replicatefrom': 'test-2'}
'tags': {'replicatefrom': 'test-2'},
'xlog_location': 98
})
stream_node = Member(0, 'test-4', 28, {
'state': 'running', 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres'})
'state': 'running', 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres',
'xlog_location': 99})
cluster = Cluster(
True, config, self.leader, Status.empty(),
True, config, self.leader, Status(100, {'leader': 99, 'test_2': 98, 'test_3': 97, 'test_4': 98}, []),
[self.leadermem, nostream_node, cascade_node, stream_node], None, SyncState.empty(), None, None)
global_config.update(cluster)
@@ -147,8 +150,8 @@ class TestSlotsHandler(BaseTestPostgresql):
cluster._get_permanent_slots(self.p, self.leadermem, 'primary'),
{'foo': {'type': 'logical', 'database': 'a', 'plugin': 'b'}, 'bar': {'type': 'physical'}})
self.assertEqual(
cluster._get_members_slots(self.p.name, 'primary'),
{'test_4': {'type': 'physical'}})
cluster._get_members_slots(self.p.name, 'primary', False, True),
{'test_3': {'type': 'physical', 'lsn': 98}, 'test_4': {'type': 'physical', 'lsn': 98}})

# nostream node must not have slot on primary
self.p.name = nostream_node.name
@@ -162,8 +165,10 @@ class TestSlotsHandler(BaseTestPostgresql):
# check cascade member-slot existence on nostream node
self.assertEqual(
cluster._get_members_slots(nostream_node.name, 'replica'),
{'test_3': {'type': 'physical'}})
cluster._get_members_slots(nostream_node.name, 'replica', False, True),
{'leader': {'type': 'physical', 'lsn': 99},
'test_3': {'type': 'physical', 'lsn': 98},
'test_4': {'type': 'physical', 'lsn': 98}})

# the cascade node is also not entitled to have a logical slot on itself ...
self.p.name = cascade_node.name
@@ -291,7 +296,7 @@ class TestSlotsHandler(BaseTestPostgresql):
@patch.object(Postgresql, 'is_primary', Mock(return_value=False))
def test_advance_physical_slots(self):
config = ClusterConfig(1, {'slots': {'blabla': {'type': 'physical'}, 'leader': None}}, 1)
cluster = Cluster(True, config, self.leader, Status(0, {'blabla': 12346}),
cluster = Cluster(True, config, self.leader, Status(0, {'blabla': 12346}, []),
[self.me, self.other, self.leadermem], None, SyncState.empty(), None, None)
global_config.update(cluster)
self.s.sync_replication_slots(cluster, self.tags)
@@ -54,7 +54,8 @@ class MockKazooClient(Mock):
elif path.endswith('/initialize'):
return (b'foo', ZnodeStat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0))
elif path.endswith('/status'):
return (b'{"optime":500,"slots":{"ls":1234567}}', ZnodeStat(0, 0, 0, 0, 0, 0, 0, -1, 0, 0, 0))
return (b'{"optime":500,"slots":{"ls":1234567},"retain_slots":["postgresql0"]}',
ZnodeStat(0, 0, 0, 0, 0, 0, 0, -1, 0, 0, 0))
elif path.endswith('/failsafe'):
return (b'{a}', ZnodeStat(0, 0, 0, 0, 0, 0, 0, -1, 0, 0, 0))
return (b'', ZnodeStat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0))