Rename Postgresql.is_leader() to is_primary() (#2809)
It'll help to avoid confusion with the Ha.is_leader() method.
commit efaba9f183 (parent f24db395c6), committed by GitHub
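Why the rename matters: Ha.is_leader() asks whether this node holds the leader lock in the DCS, while Postgresql.is_leader() asked whether the local postgres runs outside recovery — two different questions behind one name. A minimal toy sketch of the distinction (not Patroni's real classes; only the timeline-based body mirrors the is_primary() definition in this diff, the rest is illustrative):

    class ToyPostgresql:
        """Stands in for Patroni's Postgresql state handler."""

        def __init__(self, timeline: int) -> None:
            self.timeline = timeline

        def is_primary(self) -> bool:
            # Postgresql.is_primary(): is the local postgres outside recovery?
            # A replica reports no timeline through the cluster-info query.
            return bool(self.timeline)

    class ToyHa:
        """Stands in for Patroni's Ha loop; the lock flag is illustrative."""

        def __init__(self, state_handler: ToyPostgresql, holds_dcs_lock: bool) -> None:
            self.state_handler = state_handler
            self.holds_dcs_lock = holds_dcs_lock

        def is_leader(self) -> bool:
            # Ha.is_leader(): do I hold the leader lock in the DCS?
            return self.holds_dcs_lock

    # The two answers can legitimately disagree, e.g. right after a manual promote:
    node = ToyHa(ToyPostgresql(timeline=2), holds_dcs_lock=False)
    assert node.state_handler.is_primary() and not node.is_leader()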
@@ -572,7 +572,7 @@ class Ha(object):
         if refresh:
             self.load_cluster_from_dcs()

-        is_leader = self.state_handler.is_leader()
+        is_leader = self.state_handler.is_primary()

         node_to_follow = self._get_node_to_follow(self.cluster)

@@ -751,7 +751,7 @@ class Ha(object):
         """
         if not self.is_paused():
             if not self.watchdog.is_running and not self.watchdog.activate():
-                if self.state_handler.is_leader():
+                if self.state_handler.is_primary():
                     self.demote('immediate')
                     return 'Demoting self because watchdog could not be activated'
                 else:

@@ -767,7 +767,7 @@ class Ha(object):
             self._async_response.reset()
             return 'Promotion cancelled because the pre-promote script failed'

-        if self.state_handler.is_leader():
+        if self.state_handler.is_primary():
             # Inform the state handler about its primary role.
             # It may be unaware of it if postgres is promoted manually.
             self.state_handler.set_role('master')

@@ -959,7 +959,7 @@ class Ha(object):
         # Remove failover key if the node to failover has terminated to avoid waiting for it indefinitely
         # In order to avoid attempts to delete this key from all nodes only the primary is allowed to do it.
         if not self.cluster.get_member(failover.candidate, fallback_to_leader=False)\
-                and self.state_handler.is_leader():
+                and self.state_handler.is_primary():
             logger.warning("manual failover: removing failover key because failover candidate is not running")
             self.dcs.manual_failover('', '', version=failover.version)
             return None

@@ -1017,7 +1017,7 @@ class Ha(object):
             if ret is not None:  # continue if we just deleted the stale failover key as a leader
                 return ret

-        if self.state_handler.is_leader():
+        if self.state_handler.is_primary():
             if self.is_paused():
                 # in pause leader is the healthiest only when no initialize or sysid matches with initialize!
                 return not self.cluster.initialize or self.state_handler.sysid == self.cluster.initialize

@@ -1206,7 +1206,7 @@ class Ha(object):

         :returns: action message if demote was initiated, None if no action was taken"""
         failover = self.cluster.failover
-        if not failover or (self.is_paused() and not self.state_handler.is_leader()):
+        if not failover or (self.is_paused() and not self.state_handler.is_primary()):
             return

         if (failover.scheduled_at and not

@@ -1276,7 +1276,7 @@ class Ha(object):

     def process_healthy_cluster(self) -> str:
         if self.has_lock():
-            if self.is_paused() and not self.state_handler.is_leader():
+            if self.is_paused() and not self.state_handler.is_primary():
                 if self.cluster.failover and self.cluster.failover.candidate == self.state_handler.name:
                     return 'waiting to become primary after promote...'

@@ -1308,7 +1308,7 @@ class Ha(object):
         else:
             # Either there is no connection to DCS or someone else acquired the lock
             logger.error('failed to update leader lock')
-            if self.state_handler.is_leader():
+            if self.state_handler.is_primary():
                 if self.is_paused():
                     return 'continue to run as primary after failing to update leader lock in DCS'
                 self.demote('immediate-nolock')

@@ -1547,7 +1547,7 @@ class Ha(object):
             self.cancel_initialization()

         if result is None:
-            if not self.state_handler.is_leader():
+            if not self.state_handler.is_primary():
                 return 'waiting for end of recovery after bootstrap'

             self.state_handler.set_role('master')

@@ -1731,7 +1731,7 @@ class Ha(object):
         elif self.cluster.is_unlocked() and not self.is_paused():
             # "bootstrap", but data directory is not empty
             if not self.state_handler.cb_called and self.state_handler.is_running() \
-                    and not self.state_handler.is_leader():
+                    and not self.state_handler.is_primary():
                 self._join_aborted = True
                 logger.error('No initialize key in DCS and PostgreSQL is running as replica, aborting start')
                 logger.error('Please first start Patroni on the node running as primary')

@@ -1774,7 +1774,7 @@ class Ha(object):
             create_slots = self._sync_replication_slots(False)

         if not self.state_handler.cb_called:
-            if not is_promoting and not self.state_handler.is_leader():
+            if not is_promoting and not self.state_handler.is_primary():
                 self._rewind.trigger_check_diverged_lsn()
             self.state_handler.call_nowait(CallbackAction.ON_START)

@@ -1799,7 +1799,7 @@ class Ha(object):

     def _handle_dcs_error(self) -> str:
         if not self.is_paused() and self.state_handler.is_running():
-            if self.state_handler.is_leader():
+            if self.state_handler.is_primary():
                 if self.is_failsafe_mode() and self.check_failsafe_topology():
                     self.set_is_leader(True)
                     self._failsafe.set_is_active(time.time())

@@ -120,9 +120,9 @@ class Postgresql(object):

         if self.is_running():  # we are "joining" already running postgres
             self.set_state('running')
-            self.set_role('master' if self.is_leader() else 'replica')
+            self.set_role('master' if self.is_primary() else 'replica')
             # postpone writing postgresql.conf for 12+ because recovery parameters are not yet known
-            if self.major_version < 120000 or self.is_leader():
+            if self.major_version < 120000 or self.is_primary():
                 self.config.write_postgresql_conf()
             hba_saved = self.config.replace_pg_hba()
             ident_saved = self.config.replace_pg_ident()

@@ -474,14 +474,14 @@ class Postgresql(object):
         """:returns: a result set of 'SELECT * FROM pg_stat_replication'."""
         return self._cluster_info_state_get('pg_stat_replication') or []

-    def replication_state_from_parameters(self, is_leader: bool, receiver_state: Optional[str],
+    def replication_state_from_parameters(self, is_primary: bool, receiver_state: Optional[str],
                                           restore_command: Optional[str]) -> Optional[str]:
         """Figure out the replication state from input parameters.

         .. note::
             This method could be only called when Postgres is up, running and queries are successfuly executed.

-        :is_leader: `True` is postgres is not running in recovery
+        :is_primary: `True` is postgres is not running in recovery
         :receiver_state: value from `pg_stat_get_wal_receiver.state` or None if Postgres is older than 9.6
         :restore_command: value of ``restore_command`` GUC for PostgreSQL 12+ or
             `postgresql.recovery_conf.restore_command` if it is set in Patroni configuration

@@ -490,7 +490,7 @@ class Postgresql(object):
             - 'streaming' if replica is streaming according to the `pg_stat_wal_receiver` view;
             - 'in archive recovery' if replica isn't streaming and there is a `restore_command`
         """
-        if self._major_version >= 90600 and not is_leader:
+        if self._major_version >= 90600 and not is_primary:
             if receiver_state == 'streaming':
                 return 'streaming'
         # For Postgres older than 12 we get `restore_command` from Patroni config, otherwise we check GUC

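Taken together, the docstring above amounts to a small decision rule. A self-contained restatement (simplified: plain arguments instead of Patroni's `_cluster_info_state_get` machinery, and the 9.6 version gate omitted):

    from typing import Optional

    def replication_state(is_primary: bool, receiver_state: Optional[str],
                          restore_command: Optional[str]) -> Optional[str]:
        """A primary has no replication state; a replica is 'streaming' when the
        WAL receiver says so, else 'in archive recovery' if restore_command is set."""
        if is_primary:
            return None
        if receiver_state == 'streaming':
            return 'streaming'
        if restore_command:
            return 'in archive recovery'
        return None

    assert replication_state(False, 'streaming', None) == 'streaming'
    assert replication_state(False, None, 'cp /archive/%f %p') == 'in archive recovery'
    assert replication_state(True, None, None) is None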
@@ -505,11 +505,11 @@ class Postgresql(object):

         :returns: ``streaming``, ``in archive recovery``, or ``None``
         """
-        return self.replication_state_from_parameters(self.is_leader(),
+        return self.replication_state_from_parameters(self.is_primary(),
                                                       self._cluster_info_state_get('receiver_state'),
                                                       self._cluster_info_state_get('restore_command'))

-    def is_leader(self) -> bool:
+    def is_primary(self) -> bool:
         try:
             return bool(self._cluster_info_state_get('timeline'))
         except PostgresConnectionException:

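For context, the renamed method reports whether the local postgres runs outside recovery, derived from Patroni's cached cluster-info query (a replica reports no timeline). An illustrative stand-alone equivalent, assuming psycopg 3 and asking the server directly:

    import psycopg  # assumption: psycopg 3 is available

    def is_primary(conninfo: str) -> bool:
        # pg_is_in_recovery() is true on replicas; a primary is simply
        # "not in recovery", which is what Postgresql.is_primary() models.
        with psycopg.connect(conninfo) as conn:
            row = conn.execute('SELECT NOT pg_is_in_recovery()').fetchone()
            return bool(row and row[0])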
@@ -1157,9 +1157,9 @@ class Postgresql(object):
         return ret

     @staticmethod
-    def _wal_position(is_leader: bool, wal_position: int,
+    def _wal_position(is_primary: bool, wal_position: int,
                       received_location: Optional[int], replayed_location: Optional[int]) -> int:
-        return wal_position if is_leader else max(received_location or 0, replayed_location or 0)
+        return wal_position if is_primary else max(received_location or 0, replayed_location or 0)

     def timeline_wal_position(self) -> Tuple[int, int, Optional[int]]:
         # This method could be called from different threads (simultaneously with some other `_query` calls).

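The one-liner above encodes the WAL-position convention: a primary reports its current write position, a replica the furthest point it has received or replayed, with missing values treated as 0. Restated and exercised with made-up numbers:

    from typing import Optional

    def wal_position(is_primary: bool, write_position: int,
                     received: Optional[int], replayed: Optional[int]) -> int:
        # Same rule as Postgresql._wal_position() in the diff above.
        return write_position if is_primary else max(received or 0, replayed or 0)

    assert wal_position(True, 100, 90, 80) == 100   # primary: write position wins
    assert wal_position(False, 100, 90, 80) == 90   # replica: max(received, replayed)
    assert wal_position(False, 0, None, 70) == 70   # missing locations count as 0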
@@ -1195,7 +1195,7 @@ class Postgresql(object):
         return None

     def last_operation(self) -> int:
-        return self._wal_position(self.is_leader(), self._cluster_info_state_get('wal_position') or 0,
+        return self._wal_position(self.is_primary(), self._cluster_info_state_get('wal_position') or 0,
                                   self.received_location(), self.replayed_location())

     def configure_server_parameters(self) -> None:

@@ -412,7 +412,7 @@ class CitusHandler(Thread):
         parameters['wal_level'] = 'logical'

     def ignore_replication_slot(self, slot: Dict[str, str]) -> bool:
-        if isinstance(self._config, dict) and self._postgresql.is_leader() and\
+        if isinstance(self._config, dict) and self._postgresql.is_primary() and\
                 slot['type'] == 'logical' and slot['database'] == self._config['database']:
             m = CITUS_SLOT_NAME_RE.match(slot['name'])
             return bool(m and {'move': 'pgoutput', 'split': 'citus'}.get(m.group(1)) == slot['plugin'])

@@ -280,7 +280,7 @@ class Rewind(object):
         """After promote issue a CHECKPOINT from a new thread and asynchronously check the result.
         In case if CHECKPOINT failed, just check that timeline in pg_control was updated."""

-        if self._state != REWIND_STATUS.CHECKPOINT and self._postgresql.is_leader():
+        if self._state != REWIND_STATUS.CHECKPOINT and self._postgresql.is_primary():
             with self._checkpoint_task_lock:
                 if self._checkpoint_task:
                     with self._checkpoint_task:

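The Rewind docstring describes a CHECKPOINT issued from a separate thread with the result checked asynchronously on later cycles. A generic sketch of that fire-and-poll pattern (illustrative only; run_checkpoint is a hypothetical stand-in, not Patroni's implementation):

    import threading

    outcome: dict = {}

    def run_checkpoint() -> None:
        # Stand-in for issuing CHECKPOINT against the server; record the result
        # so the main loop can inspect it later without blocking on the command.
        outcome['ok'] = True

    task = threading.Thread(target=run_checkpoint)
    task.start()
    # ... the HA loop keeps running; on a later cycle it polls the outcome:
    task.join(timeout=0)
    if outcome.get('ok') is False:
        # Fallback described in the docstring: verify instead that the timeline
        # recorded in pg_control was bumped by the promote.
        pass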
@@ -508,7 +508,7 @@ class SlotsHandler:

         self._ensure_physical_slots(slots)

-        if self._postgresql.is_leader():
+        if self._postgresql.is_primary():
             self._logical_slots_processing_queue.clear()
             self._ensure_logical_slots_primary(slots)
         elif cluster.slots and slots:

@@ -338,7 +338,7 @@ END;$$""")
         sync_param = next(iter(sync), None)

         if not (self._postgresql.config.set_synchronous_standby_names(sync_param)
-                and self._postgresql.state == 'running' and self._postgresql.is_leader()) or has_asterisk:
+                and self._postgresql.state == 'running' and self._postgresql.is_primary()) or has_asterisk:
             return

         time.sleep(0.1)  # Usualy it takes 1ms to reload postgresql.conf, but we will give it 100ms

@@ -162,7 +162,7 @@ def run_async(self, func, args=()):


 @patch.object(Postgresql, 'is_running', Mock(return_value=MockPostmaster()))
-@patch.object(Postgresql, 'is_leader', Mock(return_value=True))
+@patch.object(Postgresql, 'is_primary', Mock(return_value=True))
 @patch.object(Postgresql, 'timeline_wal_position', Mock(return_value=(1, 10, 1)))
 @patch.object(Postgresql, '_cluster_info_state_get', Mock(return_value=10))
 @patch.object(Postgresql, 'data_directory_empty', Mock(return_value=False))

@@ -224,7 +224,7 @@ class TestHa(PostgresInit):
     @patch.object(Postgresql, 'received_timeline', Mock(return_value=None))
     def test_touch_member(self):
         self.p._major_version = 110000
-        self.p.is_leader = false
+        self.p.is_primary = false
         self.p.timeline_wal_position = Mock(return_value=(0, 1, 0))
         self.p.replica_cached_timeline = Mock(side_effect=Exception)
         with patch.object(Postgresql, '_cluster_info_state_get', Mock(return_value='streaming')):

@@ -320,7 +320,7 @@ class TestHa(PostgresInit):
     @patch.object(Rewind, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=True))
     @patch.object(Rewind, 'can_rewind', PropertyMock(return_value=True))
     def test_crash_recovery_before_rewind(self):
-        self.p.is_leader = false
+        self.p.is_primary = false
         self.p.is_running = false
         self.p.controldata = lambda: {'Database cluster state': 'in archive recovery',
                                       'Database system identifier': SYSID}

@@ -365,7 +365,7 @@ class TestHa(PostgresInit):

     @patch.object(Cluster, 'is_unlocked', Mock(return_value=False))
     def test_start_as_readonly(self):
-        self.p.is_leader = false
+        self.p.is_primary = false
         self.p.is_healthy = true
         self.ha.has_lock = true
         self.p.controldata = lambda: {'Database cluster state': 'in production', 'Database system identifier': SYSID}

@@ -383,11 +383,11 @@ class TestHa(PostgresInit):

     def test_promoted_by_acquiring_lock(self):
         self.ha.is_healthiest_node = true
-        self.p.is_leader = false
+        self.p.is_primary = false
         self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')

     def test_promotion_cancelled_after_pre_promote_failed(self):
-        self.p.is_leader = false
+        self.p.is_primary = false
         self.p._pre_promote = false
         self.ha._is_healthiest_node = true
         self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')

@@ -402,7 +402,7 @@ class TestHa(PostgresInit):
     @patch.object(Cluster, 'is_unlocked', Mock(return_value=False))
     def test_long_promote(self):
         self.ha.has_lock = true
-        self.p.is_leader = false
+        self.p.is_primary = false
         self.p.set_role('primary')
         self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')

@@ -413,7 +413,7 @@ class TestHa(PostgresInit):
     def test_follow_new_leader_after_failing_to_obtain_lock(self):
         self.ha.is_healthiest_node = true
         self.ha.acquire_lock = false
-        self.p.is_leader = false
+        self.p.is_primary = false
         self.assertEqual(self.ha.run_cycle(), 'following new leader after trying and failing to obtain lock')

     def test_demote_because_not_healthiest(self):

@@ -422,21 +422,20 @@ class TestHa(PostgresInit):

     def test_follow_new_leader_because_not_healthiest(self):
         self.ha.is_healthiest_node = false
-        self.p.is_leader = false
+        self.p.is_primary = false
         self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node')

     @patch.object(Cluster, 'is_unlocked', Mock(return_value=False))
     def test_promote_because_have_lock(self):
         self.ha.has_lock = true
-        self.p.is_leader = false
+        self.p.is_primary = false
         self.assertEqual(self.ha.run_cycle(), 'promoted self to leader because I had the session lock')

     def test_promote_without_watchdog(self):
         self.ha.has_lock = true
-        self.p.is_leader = true
         with patch.object(Watchdog, 'activate', Mock(return_value=False)):
             self.assertEqual(self.ha.run_cycle(), 'Demoting self because watchdog could not be activated')
-            self.p.is_leader = false
+            self.p.is_primary = false
             self.assertEqual(self.ha.run_cycle(), 'Not promoting self because watchdog could not be activated')

     def test_leader_with_lock(self):

@@ -462,12 +461,12 @@ class TestHa(PostgresInit):
         self.assertEqual(self.ha.run_cycle(), 'demoted self because failed to update leader lock in DCS')
         with patch.object(Ha, '_get_node_to_follow', Mock(side_effect=DCSError('foo'))):
             self.assertEqual(self.ha.run_cycle(), 'demoted self because failed to update leader lock in DCS')
-        self.p.is_leader = false
+        self.p.is_primary = false
         self.assertEqual(self.ha.run_cycle(), 'not promoting because failed to update leader lock in DCS')

     @patch.object(Cluster, 'is_unlocked', Mock(return_value=False))
     def test_follow(self):
-        self.p.is_leader = false
+        self.p.is_primary = false
         self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), a secondary, and following a leader ()')
         self.ha.patroni.replicatefrom = "foo"
         self.p.config.check_recovery_conf = Mock(return_value=(True, False))

@@ -484,13 +483,13 @@ class TestHa(PostgresInit):
     def test_follow_in_pause(self):
         self.ha.is_paused = true
         self.assertEqual(self.ha.run_cycle(), 'PAUSE: continue to run as primary without lock')
-        self.p.is_leader = false
+        self.p.is_primary = false
         self.assertEqual(self.ha.run_cycle(), 'PAUSE: no action. I am (postgresql0)')

     @patch.object(Rewind, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=True))
     @patch.object(Rewind, 'can_rewind', PropertyMock(return_value=True))
     def test_follow_triggers_rewind(self):
-        self.p.is_leader = false
+        self.p.is_primary = false
         self.ha._rewind.trigger_check_diverged_lsn()
         self.ha.cluster = get_cluster_initialized_with_leader()
         self.assertEqual(self.ha.run_cycle(), 'running pg_rewind from leader')

@@ -544,7 +543,7 @@ class TestHa(PostgresInit):
         self.ha.global_config = self.ha.patroni.config.get_global_config(self.ha.cluster)
         self.ha.update_failsafe({'name': 'leader', 'api_url': 'http://127.0.0.1:8008/patroni',
                                 'conn_url': 'postgres://127.0.0.1:5432/postgres', 'slots': {'foo': 1000}})
-        self.p.is_leader = false
+        self.p.is_primary = false
         self.assertEqual(self.ha.run_cycle(), 'DCS is not accessible')

     def test_no_dcs_connection_replica_failsafe_not_enabled_but_active(self):

@@ -552,7 +551,7 @@ class TestHa(PostgresInit):
         self.ha.cluster = get_cluster_initialized_with_leader()
         self.ha.update_failsafe({'name': 'leader', 'api_url': 'http://127.0.0.1:8008/patroni',
                                 'conn_url': 'postgres://127.0.0.1:5432/postgres', 'slots': {'foo': 1000}})
-        self.p.is_leader = false
+        self.p.is_primary = false
         self.assertEqual(self.ha.run_cycle(), 'DCS is not accessible')

     def test_update_failsafe(self):

@@ -591,9 +590,9 @@ class TestHa(PostgresInit):
         self.ha.cluster = get_cluster_not_initialized_without_leader()
         self.e.initialize = true
         self.assertEqual(self.ha.bootstrap(), 'trying to bootstrap a new cluster')
-        self.p.is_leader = false
+        self.p.is_primary = false
         self.assertEqual(self.ha.run_cycle(), 'waiting for end of recovery after bootstrap')
-        self.p.is_leader = true
+        self.p.is_primary = true
         self.ha.is_synchronous_mode = true
         self.assertEqual(self.ha.run_cycle(), 'running post_bootstrap')
         self.assertEqual(self.ha.run_cycle(), 'initialized a new cluster')

@@ -613,7 +612,6 @@ class TestHa(PostgresInit):
         self.ha.cluster = get_cluster_not_initialized_without_leader()
         self.e.initialize = true
         self.ha.bootstrap()
-        self.p.is_leader = true
         with patch.object(Watchdog, 'activate', Mock(return_value=False)), \
                 patch('patroni.ha.logger.error') as mock_logger:
             self.assertEqual(self.ha.post_bootstrap(), 'running post_bootstrap')

@@ -745,7 +743,6 @@ class TestHa(PostgresInit):
         self.assertEqual('PAUSE: no action. I am (postgresql0), the leader with the lock', self.ha.run_cycle())

     def test_manual_failover_from_leader_in_synchronous_mode(self):
-        self.p.is_leader = true
         self.ha.has_lock = true
         self.ha.is_synchronous_mode = true
         self.ha.process_sync_replication = Mock()

@@ -756,7 +753,7 @@ class TestHa(PostgresInit):
         self.assertEqual('manual failover: demoting myself', self.ha.run_cycle())

     def test_manual_failover_process_no_leader(self):
-        self.p.is_leader = false
+        self.p.is_primary = false
         self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', self.p.name, None))
         self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'leader', None))
         self.p.set_role('replica')

@@ -780,7 +777,7 @@ class TestHa(PostgresInit):

     def test_manual_failover_process_no_leader_in_synchronous_mode(self):
         self.ha.is_synchronous_mode = true
-        self.p.is_leader = false
+        self.p.is_primary = false

         # switchover to a specific node, which name doesn't match our name (postgresql0)
         self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, 'leader', 'other', None))

@@ -840,14 +837,14 @@ class TestHa(PostgresInit):
         self.assertEqual(self.ha.run_cycle(), 'PAUSE: continue to run as primary without lock')
         self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, 'leader', 'blabla', None))
         self.assertEqual('PAUSE: acquired session lock as a leader', self.ha.run_cycle())
-        self.p.is_leader = false
+        self.p.is_primary = false
         self.p.set_role('replica')
         self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, 'leader', self.p.name, None))
         self.assertEqual(self.ha.run_cycle(), 'PAUSE: promoted self to leader by acquiring session lock')

     def test_is_healthiest_node(self):
         self.ha.is_failsafe_mode = true
-        self.ha.state_handler.is_leader = false
+        self.p.is_primary = false
         self.ha.patroni.nofailover = False
         self.ha.fetch_node_status = get_node_status()
         self.ha.dcs._last_failsafe = {'foo': ''}

@@ -861,7 +858,7 @@ class TestHa(PostgresInit):
         self.assertFalse(self.ha.is_healthiest_node())

     def test__is_healthiest_node(self):
-        self.p.is_leader = false
+        self.p.is_primary = false
         self.ha.cluster = get_cluster_initialized_without_leader(sync=('postgresql1', self.p.name))
         self.ha.global_config = self.ha.patroni.config.get_global_config(self.ha.cluster)
         self.assertTrue(self.ha._is_healthiest_node(self.ha.old_cluster.members))

@@ -960,7 +957,7 @@ class TestHa(PostgresInit):
         self.assertTrue(self.ha.restart_matches("replica", "9.5.2", False))

     def test_process_healthy_cluster_in_pause(self):
-        self.p.is_leader = false
+        self.p.is_primary = false
         self.ha.is_paused = true
         self.p.name = 'leader'
         self.ha.cluster = get_cluster_initialized_with_leader()

@@ -971,7 +968,7 @@ class TestHa(PostgresInit):
     @patch('patroni.postgresql.mtime', Mock(return_value=1588316884))
     @patch('builtins.open', mock_open(read_data='1\t0/40159C0\tno recovery target specified\n'))
     def test_process_healthy_standby_cluster_as_standby_leader(self):
-        self.p.is_leader = false
+        self.p.is_primary = false
         self.p.name = 'leader'
         self.ha.cluster = get_standby_cluster_initialized_with_only_leader()
         self.p.config.check_recovery_conf = Mock(return_value=(False, False))

@@ -983,7 +980,7 @@ class TestHa(PostgresInit):
         self.assertEqual(self.ha.run_cycle(), 'promoted self to a standby leader because i had the session lock')

     def test_process_healthy_standby_cluster_as_cascade_replica(self):
-        self.p.is_leader = false
+        self.p.is_primary = false
         self.p.name = 'replica'
         self.ha.cluster = get_standby_cluster_initialized_with_only_leader()
         self.assertEqual(self.ha.run_cycle(),

@@ -993,7 +990,7 @@ class TestHa(PostgresInit):

     @patch.object(Cluster, 'is_unlocked', Mock(return_value=True))
     def test_process_unhealthy_standby_cluster_as_standby_leader(self):
-        self.p.is_leader = false
+        self.p.is_primary = false
         self.p.name = 'leader'
         self.ha.cluster = get_standby_cluster_initialized_with_only_leader()
         self.ha.sysid_valid = true

@@ -1003,13 +1000,13 @@ class TestHa(PostgresInit):
     @patch.object(Rewind, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=True))
     @patch.object(Rewind, 'can_rewind', PropertyMock(return_value=True))
     def test_process_unhealthy_standby_cluster_as_cascade_replica(self):
-        self.p.is_leader = false
+        self.p.is_primary = false
         self.p.name = 'replica'
         self.ha.cluster = get_standby_cluster_initialized_with_only_leader()
         self.assertTrue(self.ha.run_cycle().startswith('running pg_rewind from remote_member:'))

     def test_recover_unhealthy_leader_in_standby_cluster(self):
-        self.p.is_leader = false
+        self.p.is_primary = false
         self.p.name = 'leader'
         self.p.is_running = false
         self.p.follow = false

@@ -1018,7 +1015,7 @@ class TestHa(PostgresInit):

     @patch.object(Cluster, 'is_unlocked', Mock(return_value=True))
     def test_recover_unhealthy_unlocked_standby_cluster(self):
-        self.p.is_leader = false
+        self.p.is_primary = false
         self.p.name = 'leader'
         self.p.is_running = false
         self.p.follow = false

@@ -1078,7 +1075,7 @@ class TestHa(PostgresInit):
         check_calls([(update_lock, True), (demote, True)])

         self.ha.has_lock = false
-        self.p.is_leader = false
+        self.p.is_primary = false
         self.assertEqual(self.ha.run_cycle(),
                          'no action. I am (postgresql0), a secondary, and following a leader (leader)')
         check_calls([(update_lock, False), (demote, False)])

@@ -1212,7 +1209,7 @@ class TestHa(PostgresInit):
         self.ha.is_synchronous_mode = true

         mock_set_sync = self.p.sync_handler.set_synchronous_standby_names = Mock()
-        self.p.is_leader = false
+        self.p.is_primary = false
         self.p.set_role('replica')
         self.ha.has_lock = true
         mock_write_sync = self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())

@@ -1235,7 +1232,7 @@ class TestHa(PostgresInit):
     def test_unhealthy_sync_mode(self):
         self.ha.is_synchronous_mode = true

-        self.p.is_leader = false
+        self.p.is_primary = false
         self.p.set_role('replica')
         self.p.name = 'other'
         self.ha.cluster = get_cluster_initialized_without_leader(sync=('leader', 'other2'))

@@ -1266,7 +1263,7 @@ class TestHa(PostgresInit):
         self.ha.is_synchronous_mode = true

         self.p.name = 'other'
-        self.p.is_leader = false
+        self.p.is_primary = false
         self.p.set_role('replica')
         mock_restart = self.p.restart = Mock(return_value=True)
         self.ha.cluster = get_cluster_initialized_with_leader(sync=('leader', 'other'))

@@ -1369,7 +1366,7 @@ class TestHa(PostgresInit):
     @patch('sys.exit', return_value=1)
     def test_abort_join(self, exit_mock):
         self.ha.cluster = get_cluster_not_initialized_without_leader()
-        self.p.is_leader = false
+        self.p.is_primary = false
         self.ha.run_cycle()
         exit_mock.assert_called_once_with(1)

@@ -1429,7 +1426,7 @@ class TestHa(PostgresInit):
     @patch.object(SlotsHandler, 'sync_replication_slots', Mock(return_value=['ls']))
     def test_follow_copy(self):
         self.ha.cluster.config.data['slots'] = {'ls': {'database': 'a', 'plugin': 'b'}}
-        self.p.is_leader = false
+        self.p.is_primary = false
         self.assertTrue(self.ha.run_cycle().startswith('Copying logical slots'))

     def test_acquire_lock(self):

@@ -363,11 +363,11 @@ class TestPostgresql(BaseTestPostgresql):
         self.assertRaises(psycopg.ProgrammingError, self.p.query, 'blabla')

     @patch.object(Postgresql, 'pg_isready', Mock(return_value=STATE_REJECT))
-    def test_is_leader(self):
-        self.assertTrue(self.p.is_leader())
+    def test_is_primary(self):
+        self.assertTrue(self.p.is_primary())
         self.p.reset_cluster_info_state(None)
         with patch.object(Postgresql, '_query', Mock(side_effect=RetryFailedError(''))):
-            self.assertFalse(self.p.is_leader())
+            self.assertFalse(self.p.is_primary())

     @patch.object(Postgresql, 'controldata', Mock(return_value={'Database cluster state': 'shut down',
                                                                 'Latest checkpoint location': '0/1ADBC18',

@@ -461,7 +461,7 @@ class TestPostgresql(BaseTestPostgresql):
         self.assertIsNone(self.p.call_nowait(CallbackAction.ON_START))

     @patch.object(Postgresql, 'is_running', Mock(return_value=MockPostmaster()))
-    def test_is_leader_exception(self):
+    def test_is_primary_exception(self):
         self.p.start()
         self.p.query = Mock(side_effect=psycopg.OperationalError("not supported"))
         self.assertTrue(self.p.stop())

@@ -48,7 +48,7 @@ class TestSlotsHandler(BaseTestPostgresql):
         self.s.sync_replication_slots(cluster, False)
         mock_debug.assert_called_once()
         self.p.set_role('replica')
-        with patch.object(Postgresql, 'is_leader', Mock(return_value=False)), \
+        with patch.object(Postgresql, 'is_primary', Mock(return_value=False)), \
                 patch.object(SlotsHandler, 'drop_replication_slot') as mock_drop:
             self.s.sync_replication_slots(cluster, False, paused=True)
             mock_drop.assert_not_called()

@@ -79,7 +79,7 @@ class TestSlotsHandler(BaseTestPostgresql):
                                None, SyncState.empty(), None, {'ls': 10}, None)
         self.p.set_role('replica')
         with patch.object(Postgresql, '_query') as mock_query, \
-                patch.object(Postgresql, 'is_leader', Mock(return_value=False)):
+                patch.object(Postgresql, 'is_primary', Mock(return_value=False)):
             mock_query.return_value = [('ls', 'logical', 'b', 'a', 5, 12345, 105)]
             ret = self.s.sync_replication_slots(cluster, False)
             self.assertEqual(ret, [])

@@ -106,7 +106,7 @@ class TestSlotsHandler(BaseTestPostgresql):
                                                       "confirmed_flush_lsn": 12345, "catalog_xmin": 105}])
         self.assertEqual(self.p.slots(), {})

-    @patch.object(Postgresql, 'is_leader', Mock(return_value=False))
+    @patch.object(Postgresql, 'is_primary', Mock(return_value=False))
     def test__ensure_logical_slots_replica(self):
         self.p.set_role('replica')
         self.cluster.slots['ls'] = 12346

@@ -133,7 +133,7 @@ class TestSlotsHandler(BaseTestPostgresql):

     @patch.object(Postgresql, 'stop', Mock(return_value=True))
     @patch.object(Postgresql, 'start', Mock(return_value=True))
-    @patch.object(Postgresql, 'is_leader', Mock(return_value=False))
+    @patch.object(Postgresql, 'is_primary', Mock(return_value=False))
     def test_check_logical_slots_readiness(self):
         self.s.copy_logical_slots(self.cluster, ['ls'])
         with patch.object(MockCursor, '__iter__', Mock(return_value=iter([('postgresql0', None)]))), \

@@ -147,7 +147,7 @@ class TestSlotsHandler(BaseTestPostgresql):

     @patch.object(Postgresql, 'stop', Mock(return_value=True))
     @patch.object(Postgresql, 'start', Mock(return_value=True))
-    @patch.object(Postgresql, 'is_leader', Mock(return_value=False))
+    @patch.object(Postgresql, 'is_primary', Mock(return_value=False))
     def test_on_promote(self):
         self.s.schedule_advance_slots({'foo': {'bar': 100}})
         self.s.copy_logical_slots(self.cluster, ['ls'])