Refactor replication slots handling (#2851)

1. make _get_members_slots() method return data in the same format as _get_permanent_slots() method
2. move conflicting name handling from get_replication_slots() to _get_members_slots() method
3. enrich structure returned by get_replication_slots() with the LSN of permanent logical slots reported by primary
4. use the added information in the SlotsHandler instead of fetching it from the Cluster.slots
5. bugfix: don't try to advance a logical slot that doesn't match the required configuration
This commit is contained in:
Alexander Kukushkin
2023-09-07 12:56:07 +02:00
committed by GitHub
parent 30f0f132e8
commit 19f20ec2eb
4 changed files with 52 additions and 44 deletions

View File

@@ -903,8 +903,16 @@ class Cluster(NamedTuple('Cluster',
@property
def __permanent_slots(self) -> Dict[str, Union[Dict[str, Any], Any]]:
"""Dictionary of permanent replication slots."""
return self.config and self.config.permanent_slots or {}
"""Dictionary of permanent replication slots with their known LSN."""
ret = deepcopy(self.config.permanent_slots if self.config else {})
# If primary reported flush LSN for permanent slots we want to enrich our structure with it
for name, lsn in (self.slots or {}).items():
if name in ret:
if not ret[name]:
ret[name] = {}
if isinstance(ret[name], dict):
ret[name]['lsn'] = lsn
return ret
@property
def __permanent_physical_slots(self) -> Dict[str, Any]:
@@ -929,7 +937,6 @@ class Cluster(NamedTuple('Cluster',
Will log an error if:
* Conflicting slot names between members are found
* Any logical slots are disabled, due to version compatibility, and *show_error* is ``True``.
:param my_name: name of this node.
@@ -942,21 +949,9 @@ class Cluster(NamedTuple('Cluster',
:returns: final dictionary of slot names, after merging with permanent slots and performing sanity checks.
"""
slot_members: List[str] = self._get_slot_members(my_name, role)
slots: Dict[str, Dict[str, str]] = {slot_name_from_member_name(name): {'type': 'physical'}
for name in slot_members}
if len(slots) < len(slot_members):
# Find which names are conflicting for a nicer error message
slot_conflicts: Dict[str, List[str]] = defaultdict(list)
for name in slot_members:
slot_conflicts[slot_name_from_member_name(name)].append(name)
logger.error("Following cluster members share a replication slot name: %s",
"; ".join(f"{', '.join(v)} map to {k}"
for k, v in slot_conflicts.items() if len(v) > 1))
slots: Dict[str, Dict[str, str]] = self._get_members_slots(my_name, role)
permanent_slots: Dict[str, Any] = self._get_permanent_slots(is_standby_cluster, role, nofailover)
disabled_permanent_logical_slots: List[str] = self._merge_permanent_slots(
slots, permanent_slots, my_name, major_version)
@@ -1016,7 +1011,7 @@ class Cluster(NamedTuple('Cluster',
return disabled_permanent_logical_slots
def _get_permanent_slots(self, is_standby_cluster: bool, role: str, nofailover: bool) -> Dict[str, Any]:
"""Get configured permanent slot names.
"""Get configured permanent replication slots.
.. note::
Permanent replication slots are only considered if ``use_slots`` configuration is enabled.
@@ -1042,35 +1037,48 @@ class Cluster(NamedTuple('Cluster',
return self.__permanent_slots if role in ('master', 'primary') else self.__permanent_logical_slots
def _get_slot_members(self, my_name: str, role: str) -> List[str]:
"""Get a list of member names that have replication slots sourcing from this node.
def _get_members_slots(self, my_name: str, role: str) -> Dict[str, Dict[str, str]]:
"""Get physical replication slots configuration for members that are replicating from this node.
If the ``replicatefrom`` tag is set on the member - we should not create the replication slot for it on
the current primary, because that member would replicate from elsewhere. We still create the slot if
the ``replicatefrom`` destination member is currently not a member of the cluster (fallback to the
primary), or if ``replicatefrom`` destination member happens to be the current primary.
Will log an error if:
* Conflicting slot names between members are found
:param my_name: name of this node.
:param role: role of this node, if this is a ``primary`` or ``standby_leader`` return list of members
replicating from this node. If not then return a list of members replicating as cascaded
replicas from this node.
:returns: list of member names.
:returns: dictionary of physical replication slots that should exist on a given node.
"""
if not self.use_slots:
return []
return {}
# we always want to exclude the member with our name from the list
members = filter(lambda m: m.name != my_name, self.members)
if role in ('master', 'primary', 'standby_leader'):
slot_members = [m.name for m in self.members
if m.name != my_name
and (m.replicatefrom is None
or m.replicatefrom == my_name
or not self.has_member(m.replicatefrom))]
members = [m for m in members if m.replicatefrom is None
or m.replicatefrom == my_name or not self.has_member(m.replicatefrom)]
else:
# only manage slots for replicas that replicate from this one, except for the leader among them
slot_members = [m.name for m in self.members
if m.replicatefrom == my_name and m.name != self.leader_name]
return slot_members
members = [m for m in members if m.replicatefrom == my_name and m.name != self.leader_name]
slots = {slot_name_from_member_name(m.name): {'type': 'physical'} for m in members}
if len(slots) < len(members):
# Find which names are conflicting for a nicer error message
slot_conflicts: Dict[str, List[str]] = defaultdict(list)
for member in members:
slot_conflicts[slot_name_from_member_name(member.name)].append(member.name)
logger.error("Following cluster members share a replication slot name: %s",
"; ".join(f"{', '.join(v)} map to {k}"
for k, v in slot_conflicts.items() if len(v) > 1))
return slots
def has_permanent_logical_slots(self, my_name: str, nofailover: bool, major_version: int = 110000) -> bool:
"""Check if the given member node has permanent ``logical`` replication slots configured.

View File

@@ -434,7 +434,7 @@ class SlotsHandler:
self._advance = SlotsAdvanceThread(self)
return self._advance.schedule(slots)
def _ensure_logical_slots_replica(self, cluster: Cluster, slots: Dict[str, Any]) -> List[str]:
def _ensure_logical_slots_replica(self, slots: Dict[str, Any]) -> List[str]:
"""Update logical *slots* on replicas.
If the logical slot already exists, copy state information into the replication slots structure stored in the
@@ -444,7 +444,6 @@ class SlotsHandler:
As logical slots can only be created when the primary is available, pass the list of slots that need to be
copied back to the caller. They will be created on replicas with :meth:`SlotsHandler.copy_logical_slots`.
:param cluster: object containing stateful information for the cluster.
:param slots: A dictionary mapping slot name to slot attributes. This method only considers a slot
if the value is a dictionary with the key ``type`` and a value of ``logical``.
@@ -459,15 +458,16 @@ class SlotsHandler:
continue
# If the logical already exists, copy some information about it into the original structure
if self._replication_slots.get(name, {}).get('datoid'):
if name in self._replication_slots and compare_slots(value, self._replication_slots[name]):
self._copy_items(self._replication_slots[name], value)
if cluster.slots and name in cluster.slots:
if 'lsn' in value: # The slot has feedback in DCS
try: # Skip slots that don't need to be advanced
if value['confirmed_flush_lsn'] < int(cluster.slots[name]):
advance_slots[value['database']][name] = int(cluster.slots[name])
if value['confirmed_flush_lsn'] < int(value['lsn']):
advance_slots[value['database']][name] = int(value['lsn'])
except Exception as e:
logger.error('Failed to parse "%s": %r', cluster.slots[name], e)
elif cluster.slots and name in cluster.slots: # We want to copy only slots with feedback in a DCS
logger.error('Failed to parse "%s": %r', value['lsn'], e)
elif name not in self._replication_slots and 'lsn' in value:
# We want to copy only slots with feedback in a DCS
create_slots.append(name)
# Slots to be copied from the primary should be removed from the *slots* structure,
@@ -512,10 +512,9 @@ class SlotsHandler:
if self._postgresql.is_primary():
self._logical_slots_processing_queue.clear()
self._ensure_logical_slots_primary(slots)
elif cluster.slots and slots:
else:
self.check_logical_slots_readiness(cluster, replicatefrom)
ret = self._ensure_logical_slots_replica(cluster, slots)
ret = self._ensure_logical_slots_replica(slots)
self._replication_slots = slots
except Exception:

View File

@@ -104,7 +104,7 @@ class MockCursor(object):
elif sql.startswith('SELECT slot_name, slot_type, datname, plugin, catalog_xmin'):
self.results = [('ls', 'logical', 'a', 'b', 100, 500, b'123456')]
elif sql.startswith('SELECT slot_name'):
self.results = [('blabla', 'physical'), ('foobar', 'physical'), ('ls', 'logical', 'a', 'b', 5, 100, 500)]
self.results = [('blabla', 'physical'), ('foobar', 'physical'), ('ls', 'logical', 'b', 'a', 5, 100, 500)]
elif sql.startswith('WITH slots AS (SELECT slot_name, active'):
self.results = [(False, True)] if self.rowcount == 1 else []
elif sql.startswith('SELECT CASE WHEN pg_catalog.pg_is_in_recovery()'):

View File

@@ -32,9 +32,9 @@ class TestSlotsHandler(BaseTestPostgresql):
self.p._global_config = GlobalConfig({})
self.s = self.p.slots_handler
self.p.start()
config = ClusterConfig(1, {'slots': {'ls': {'database': 'a', 'plugin': 'b'}}}, 1)
config = ClusterConfig(1, {'slots': {'ls': {'database': 'a', 'plugin': 'b'}, 'ls2': None}}, 1)
self.cluster = Cluster(True, config, self.leader, 0, [self.me, self.other, self.leadermem],
None, SyncState.empty(), None, {'ls': 12345}, None)
None, SyncState.empty(), None, {'ls': 12345, 'ls2': 12345}, None)
def test_sync_replication_slots(self):
config = ClusterConfig(1, {'slots': {'test_3': {'database': 'a', 'plugin': 'b'},
@@ -123,6 +123,7 @@ class TestSlotsHandler(BaseTestPostgresql):
self.assertEqual(self.s.sync_replication_slots(self.cluster, False), ['ls'])
self.cluster.slots['ls'] = 'a'
self.assertEqual(self.s.sync_replication_slots(self.cluster, False), [])
self.cluster.config.data['slots']['ls']['database'] = 'b'
with patch.object(MockCursor, 'rowcount', PropertyMock(return_value=1), create=True):
self.assertEqual(self.s.sync_replication_slots(self.cluster, False), ['ls'])