diff --git a/patroni/ha.py b/patroni/ha.py index fae3535e..886dcc8e 100644 --- a/patroni/ha.py +++ b/patroni/ha.py @@ -572,7 +572,7 @@ class Ha(object): if refresh: self.load_cluster_from_dcs() - is_leader = self.state_handler.is_leader() + is_leader = self.state_handler.is_primary() node_to_follow = self._get_node_to_follow(self.cluster) @@ -751,7 +751,7 @@ class Ha(object): """ if not self.is_paused(): if not self.watchdog.is_running and not self.watchdog.activate(): - if self.state_handler.is_leader(): + if self.state_handler.is_primary(): self.demote('immediate') return 'Demoting self because watchdog could not be activated' else: @@ -767,7 +767,7 @@ class Ha(object): self._async_response.reset() return 'Promotion cancelled because the pre-promote script failed' - if self.state_handler.is_leader(): + if self.state_handler.is_primary(): # Inform the state handler about its primary role. # It may be unaware of it if postgres is promoted manually. self.state_handler.set_role('master') @@ -959,7 +959,7 @@ class Ha(object): # Remove failover key if the node to failover has terminated to avoid waiting for it indefinitely # In order to avoid attempts to delete this key from all nodes only the primary is allowed to do it. if not self.cluster.get_member(failover.candidate, fallback_to_leader=False)\ - and self.state_handler.is_leader(): + and self.state_handler.is_primary(): logger.warning("manual failover: removing failover key because failover candidate is not running") self.dcs.manual_failover('', '', version=failover.version) return None @@ -1017,7 +1017,7 @@ class Ha(object): if ret is not None: # continue if we just deleted the stale failover key as a leader return ret - if self.state_handler.is_leader(): + if self.state_handler.is_primary(): if self.is_paused(): # in pause leader is the healthiest only when no initialize or sysid matches with initialize! return not self.cluster.initialize or self.state_handler.sysid == self.cluster.initialize @@ -1206,7 +1206,7 @@ class Ha(object): :returns: action message if demote was initiated, None if no action was taken""" failover = self.cluster.failover - if not failover or (self.is_paused() and not self.state_handler.is_leader()): + if not failover or (self.is_paused() and not self.state_handler.is_primary()): return if (failover.scheduled_at and not @@ -1276,7 +1276,7 @@ class Ha(object): def process_healthy_cluster(self) -> str: if self.has_lock(): - if self.is_paused() and not self.state_handler.is_leader(): + if self.is_paused() and not self.state_handler.is_primary(): if self.cluster.failover and self.cluster.failover.candidate == self.state_handler.name: return 'waiting to become primary after promote...' @@ -1308,7 +1308,7 @@ class Ha(object): else: # Either there is no connection to DCS or someone else acquired the lock logger.error('failed to update leader lock') - if self.state_handler.is_leader(): + if self.state_handler.is_primary(): if self.is_paused(): return 'continue to run as primary after failing to update leader lock in DCS' self.demote('immediate-nolock') @@ -1547,7 +1547,7 @@ class Ha(object): self.cancel_initialization() if result is None: - if not self.state_handler.is_leader(): + if not self.state_handler.is_primary(): return 'waiting for end of recovery after bootstrap' self.state_handler.set_role('master') @@ -1731,7 +1731,7 @@ class Ha(object): elif self.cluster.is_unlocked() and not self.is_paused(): # "bootstrap", but data directory is not empty if not self.state_handler.cb_called and self.state_handler.is_running() \ - and not self.state_handler.is_leader(): + and not self.state_handler.is_primary(): self._join_aborted = True logger.error('No initialize key in DCS and PostgreSQL is running as replica, aborting start') logger.error('Please first start Patroni on the node running as primary') @@ -1774,7 +1774,7 @@ class Ha(object): create_slots = self._sync_replication_slots(False) if not self.state_handler.cb_called: - if not is_promoting and not self.state_handler.is_leader(): + if not is_promoting and not self.state_handler.is_primary(): self._rewind.trigger_check_diverged_lsn() self.state_handler.call_nowait(CallbackAction.ON_START) @@ -1799,7 +1799,7 @@ class Ha(object): def _handle_dcs_error(self) -> str: if not self.is_paused() and self.state_handler.is_running(): - if self.state_handler.is_leader(): + if self.state_handler.is_primary(): if self.is_failsafe_mode() and self.check_failsafe_topology(): self.set_is_leader(True) self._failsafe.set_is_active(time.time()) diff --git a/patroni/postgresql/__init__.py b/patroni/postgresql/__init__.py index e9844710..172e575e 100644 --- a/patroni/postgresql/__init__.py +++ b/patroni/postgresql/__init__.py @@ -120,9 +120,9 @@ class Postgresql(object): if self.is_running(): # we are "joining" already running postgres self.set_state('running') - self.set_role('master' if self.is_leader() else 'replica') + self.set_role('master' if self.is_primary() else 'replica') # postpone writing postgresql.conf for 12+ because recovery parameters are not yet known - if self.major_version < 120000 or self.is_leader(): + if self.major_version < 120000 or self.is_primary(): self.config.write_postgresql_conf() hba_saved = self.config.replace_pg_hba() ident_saved = self.config.replace_pg_ident() @@ -474,14 +474,14 @@ class Postgresql(object): """:returns: a result set of 'SELECT * FROM pg_stat_replication'.""" return self._cluster_info_state_get('pg_stat_replication') or [] - def replication_state_from_parameters(self, is_leader: bool, receiver_state: Optional[str], + def replication_state_from_parameters(self, is_primary: bool, receiver_state: Optional[str], restore_command: Optional[str]) -> Optional[str]: """Figure out the replication state from input parameters. .. note:: This method could be only called when Postgres is up, running and queries are successfuly executed. - :is_leader: `True` is postgres is not running in recovery + :is_primary: `True` is postgres is not running in recovery :receiver_state: value from `pg_stat_get_wal_receiver.state` or None if Postgres is older than 9.6 :restore_command: value of ``restore_command`` GUC for PostgreSQL 12+ or `postgresql.recovery_conf.restore_command` if it is set in Patroni configuration @@ -490,7 +490,7 @@ class Postgresql(object): - 'streaming' if replica is streaming according to the `pg_stat_wal_receiver` view; - 'in archive recovery' if replica isn't streaming and there is a `restore_command` """ - if self._major_version >= 90600 and not is_leader: + if self._major_version >= 90600 and not is_primary: if receiver_state == 'streaming': return 'streaming' # For Postgres older than 12 we get `restore_command` from Patroni config, otherwise we check GUC @@ -505,11 +505,11 @@ class Postgresql(object): :returns: ``streaming``, ``in archive recovery``, or ``None`` """ - return self.replication_state_from_parameters(self.is_leader(), + return self.replication_state_from_parameters(self.is_primary(), self._cluster_info_state_get('receiver_state'), self._cluster_info_state_get('restore_command')) - def is_leader(self) -> bool: + def is_primary(self) -> bool: try: return bool(self._cluster_info_state_get('timeline')) except PostgresConnectionException: @@ -1157,9 +1157,9 @@ class Postgresql(object): return ret @staticmethod - def _wal_position(is_leader: bool, wal_position: int, + def _wal_position(is_primary: bool, wal_position: int, received_location: Optional[int], replayed_location: Optional[int]) -> int: - return wal_position if is_leader else max(received_location or 0, replayed_location or 0) + return wal_position if is_primary else max(received_location or 0, replayed_location or 0) def timeline_wal_position(self) -> Tuple[int, int, Optional[int]]: # This method could be called from different threads (simultaneously with some other `_query` calls). @@ -1195,7 +1195,7 @@ class Postgresql(object): return None def last_operation(self) -> int: - return self._wal_position(self.is_leader(), self._cluster_info_state_get('wal_position') or 0, + return self._wal_position(self.is_primary(), self._cluster_info_state_get('wal_position') or 0, self.received_location(), self.replayed_location()) def configure_server_parameters(self) -> None: diff --git a/patroni/postgresql/citus.py b/patroni/postgresql/citus.py index e11c206c..b5c90cf5 100644 --- a/patroni/postgresql/citus.py +++ b/patroni/postgresql/citus.py @@ -412,7 +412,7 @@ class CitusHandler(Thread): parameters['wal_level'] = 'logical' def ignore_replication_slot(self, slot: Dict[str, str]) -> bool: - if isinstance(self._config, dict) and self._postgresql.is_leader() and\ + if isinstance(self._config, dict) and self._postgresql.is_primary() and\ slot['type'] == 'logical' and slot['database'] == self._config['database']: m = CITUS_SLOT_NAME_RE.match(slot['name']) return bool(m and {'move': 'pgoutput', 'split': 'citus'}.get(m.group(1)) == slot['plugin']) diff --git a/patroni/postgresql/rewind.py b/patroni/postgresql/rewind.py index 270d629c..73bccb44 100644 --- a/patroni/postgresql/rewind.py +++ b/patroni/postgresql/rewind.py @@ -280,7 +280,7 @@ class Rewind(object): """After promote issue a CHECKPOINT from a new thread and asynchronously check the result. In case if CHECKPOINT failed, just check that timeline in pg_control was updated.""" - if self._state != REWIND_STATUS.CHECKPOINT and self._postgresql.is_leader(): + if self._state != REWIND_STATUS.CHECKPOINT and self._postgresql.is_primary(): with self._checkpoint_task_lock: if self._checkpoint_task: with self._checkpoint_task: diff --git a/patroni/postgresql/slots.py b/patroni/postgresql/slots.py index 4abda4b1..4df27eaf 100644 --- a/patroni/postgresql/slots.py +++ b/patroni/postgresql/slots.py @@ -508,7 +508,7 @@ class SlotsHandler: self._ensure_physical_slots(slots) - if self._postgresql.is_leader(): + if self._postgresql.is_primary(): self._logical_slots_processing_queue.clear() self._ensure_logical_slots_primary(slots) elif cluster.slots and slots: diff --git a/patroni/postgresql/sync.py b/patroni/postgresql/sync.py index 2280a68c..3193a7e7 100644 --- a/patroni/postgresql/sync.py +++ b/patroni/postgresql/sync.py @@ -338,7 +338,7 @@ END;$$""") sync_param = next(iter(sync), None) if not (self._postgresql.config.set_synchronous_standby_names(sync_param) - and self._postgresql.state == 'running' and self._postgresql.is_leader()) or has_asterisk: + and self._postgresql.state == 'running' and self._postgresql.is_primary()) or has_asterisk: return time.sleep(0.1) # Usualy it takes 1ms to reload postgresql.conf, but we will give it 100ms diff --git a/tests/test_ha.py b/tests/test_ha.py index 6eb38204..f7378b43 100644 --- a/tests/test_ha.py +++ b/tests/test_ha.py @@ -162,7 +162,7 @@ def run_async(self, func, args=()): @patch.object(Postgresql, 'is_running', Mock(return_value=MockPostmaster())) -@patch.object(Postgresql, 'is_leader', Mock(return_value=True)) +@patch.object(Postgresql, 'is_primary', Mock(return_value=True)) @patch.object(Postgresql, 'timeline_wal_position', Mock(return_value=(1, 10, 1))) @patch.object(Postgresql, '_cluster_info_state_get', Mock(return_value=10)) @patch.object(Postgresql, 'data_directory_empty', Mock(return_value=False)) @@ -224,7 +224,7 @@ class TestHa(PostgresInit): @patch.object(Postgresql, 'received_timeline', Mock(return_value=None)) def test_touch_member(self): self.p._major_version = 110000 - self.p.is_leader = false + self.p.is_primary = false self.p.timeline_wal_position = Mock(return_value=(0, 1, 0)) self.p.replica_cached_timeline = Mock(side_effect=Exception) with patch.object(Postgresql, '_cluster_info_state_get', Mock(return_value='streaming')): @@ -320,7 +320,7 @@ class TestHa(PostgresInit): @patch.object(Rewind, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=True)) @patch.object(Rewind, 'can_rewind', PropertyMock(return_value=True)) def test_crash_recovery_before_rewind(self): - self.p.is_leader = false + self.p.is_primary = false self.p.is_running = false self.p.controldata = lambda: {'Database cluster state': 'in archive recovery', 'Database system identifier': SYSID} @@ -365,7 +365,7 @@ class TestHa(PostgresInit): @patch.object(Cluster, 'is_unlocked', Mock(return_value=False)) def test_start_as_readonly(self): - self.p.is_leader = false + self.p.is_primary = false self.p.is_healthy = true self.ha.has_lock = true self.p.controldata = lambda: {'Database cluster state': 'in production', 'Database system identifier': SYSID} @@ -383,11 +383,11 @@ class TestHa(PostgresInit): def test_promoted_by_acquiring_lock(self): self.ha.is_healthiest_node = true - self.p.is_leader = false + self.p.is_primary = false self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock') def test_promotion_cancelled_after_pre_promote_failed(self): - self.p.is_leader = false + self.p.is_primary = false self.p._pre_promote = false self.ha._is_healthiest_node = true self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock') @@ -402,7 +402,7 @@ class TestHa(PostgresInit): @patch.object(Cluster, 'is_unlocked', Mock(return_value=False)) def test_long_promote(self): self.ha.has_lock = true - self.p.is_leader = false + self.p.is_primary = false self.p.set_role('primary') self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock') @@ -413,7 +413,7 @@ class TestHa(PostgresInit): def test_follow_new_leader_after_failing_to_obtain_lock(self): self.ha.is_healthiest_node = true self.ha.acquire_lock = false - self.p.is_leader = false + self.p.is_primary = false self.assertEqual(self.ha.run_cycle(), 'following new leader after trying and failing to obtain lock') def test_demote_because_not_healthiest(self): @@ -422,21 +422,20 @@ class TestHa(PostgresInit): def test_follow_new_leader_because_not_healthiest(self): self.ha.is_healthiest_node = false - self.p.is_leader = false + self.p.is_primary = false self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node') @patch.object(Cluster, 'is_unlocked', Mock(return_value=False)) def test_promote_because_have_lock(self): self.ha.has_lock = true - self.p.is_leader = false + self.p.is_primary = false self.assertEqual(self.ha.run_cycle(), 'promoted self to leader because I had the session lock') def test_promote_without_watchdog(self): self.ha.has_lock = true - self.p.is_leader = true with patch.object(Watchdog, 'activate', Mock(return_value=False)): self.assertEqual(self.ha.run_cycle(), 'Demoting self because watchdog could not be activated') - self.p.is_leader = false + self.p.is_primary = false self.assertEqual(self.ha.run_cycle(), 'Not promoting self because watchdog could not be activated') def test_leader_with_lock(self): @@ -462,12 +461,12 @@ class TestHa(PostgresInit): self.assertEqual(self.ha.run_cycle(), 'demoted self because failed to update leader lock in DCS') with patch.object(Ha, '_get_node_to_follow', Mock(side_effect=DCSError('foo'))): self.assertEqual(self.ha.run_cycle(), 'demoted self because failed to update leader lock in DCS') - self.p.is_leader = false + self.p.is_primary = false self.assertEqual(self.ha.run_cycle(), 'not promoting because failed to update leader lock in DCS') @patch.object(Cluster, 'is_unlocked', Mock(return_value=False)) def test_follow(self): - self.p.is_leader = false + self.p.is_primary = false self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), a secondary, and following a leader ()') self.ha.patroni.replicatefrom = "foo" self.p.config.check_recovery_conf = Mock(return_value=(True, False)) @@ -484,13 +483,13 @@ class TestHa(PostgresInit): def test_follow_in_pause(self): self.ha.is_paused = true self.assertEqual(self.ha.run_cycle(), 'PAUSE: continue to run as primary without lock') - self.p.is_leader = false + self.p.is_primary = false self.assertEqual(self.ha.run_cycle(), 'PAUSE: no action. I am (postgresql0)') @patch.object(Rewind, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=True)) @patch.object(Rewind, 'can_rewind', PropertyMock(return_value=True)) def test_follow_triggers_rewind(self): - self.p.is_leader = false + self.p.is_primary = false self.ha._rewind.trigger_check_diverged_lsn() self.ha.cluster = get_cluster_initialized_with_leader() self.assertEqual(self.ha.run_cycle(), 'running pg_rewind from leader') @@ -544,7 +543,7 @@ class TestHa(PostgresInit): self.ha.global_config = self.ha.patroni.config.get_global_config(self.ha.cluster) self.ha.update_failsafe({'name': 'leader', 'api_url': 'http://127.0.0.1:8008/patroni', 'conn_url': 'postgres://127.0.0.1:5432/postgres', 'slots': {'foo': 1000}}) - self.p.is_leader = false + self.p.is_primary = false self.assertEqual(self.ha.run_cycle(), 'DCS is not accessible') def test_no_dcs_connection_replica_failsafe_not_enabled_but_active(self): @@ -552,7 +551,7 @@ class TestHa(PostgresInit): self.ha.cluster = get_cluster_initialized_with_leader() self.ha.update_failsafe({'name': 'leader', 'api_url': 'http://127.0.0.1:8008/patroni', 'conn_url': 'postgres://127.0.0.1:5432/postgres', 'slots': {'foo': 1000}}) - self.p.is_leader = false + self.p.is_primary = false self.assertEqual(self.ha.run_cycle(), 'DCS is not accessible') def test_update_failsafe(self): @@ -591,9 +590,9 @@ class TestHa(PostgresInit): self.ha.cluster = get_cluster_not_initialized_without_leader() self.e.initialize = true self.assertEqual(self.ha.bootstrap(), 'trying to bootstrap a new cluster') - self.p.is_leader = false + self.p.is_primary = false self.assertEqual(self.ha.run_cycle(), 'waiting for end of recovery after bootstrap') - self.p.is_leader = true + self.p.is_primary = true self.ha.is_synchronous_mode = true self.assertEqual(self.ha.run_cycle(), 'running post_bootstrap') self.assertEqual(self.ha.run_cycle(), 'initialized a new cluster') @@ -613,7 +612,6 @@ class TestHa(PostgresInit): self.ha.cluster = get_cluster_not_initialized_without_leader() self.e.initialize = true self.ha.bootstrap() - self.p.is_leader = true with patch.object(Watchdog, 'activate', Mock(return_value=False)), \ patch('patroni.ha.logger.error') as mock_logger: self.assertEqual(self.ha.post_bootstrap(), 'running post_bootstrap') @@ -745,7 +743,6 @@ class TestHa(PostgresInit): self.assertEqual('PAUSE: no action. I am (postgresql0), the leader with the lock', self.ha.run_cycle()) def test_manual_failover_from_leader_in_synchronous_mode(self): - self.p.is_leader = true self.ha.has_lock = true self.ha.is_synchronous_mode = true self.ha.process_sync_replication = Mock() @@ -756,7 +753,7 @@ class TestHa(PostgresInit): self.assertEqual('manual failover: demoting myself', self.ha.run_cycle()) def test_manual_failover_process_no_leader(self): - self.p.is_leader = false + self.p.is_primary = false self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', self.p.name, None)) self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'leader', None)) self.p.set_role('replica') @@ -780,7 +777,7 @@ class TestHa(PostgresInit): def test_manual_failover_process_no_leader_in_synchronous_mode(self): self.ha.is_synchronous_mode = true - self.p.is_leader = false + self.p.is_primary = false # switchover to a specific node, which name doesn't match our name (postgresql0) self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, 'leader', 'other', None)) @@ -840,14 +837,14 @@ class TestHa(PostgresInit): self.assertEqual(self.ha.run_cycle(), 'PAUSE: continue to run as primary without lock') self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, 'leader', 'blabla', None)) self.assertEqual('PAUSE: acquired session lock as a leader', self.ha.run_cycle()) - self.p.is_leader = false + self.p.is_primary = false self.p.set_role('replica') self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, 'leader', self.p.name, None)) self.assertEqual(self.ha.run_cycle(), 'PAUSE: promoted self to leader by acquiring session lock') def test_is_healthiest_node(self): self.ha.is_failsafe_mode = true - self.ha.state_handler.is_leader = false + self.p.is_primary = false self.ha.patroni.nofailover = False self.ha.fetch_node_status = get_node_status() self.ha.dcs._last_failsafe = {'foo': ''} @@ -861,7 +858,7 @@ class TestHa(PostgresInit): self.assertFalse(self.ha.is_healthiest_node()) def test__is_healthiest_node(self): - self.p.is_leader = false + self.p.is_primary = false self.ha.cluster = get_cluster_initialized_without_leader(sync=('postgresql1', self.p.name)) self.ha.global_config = self.ha.patroni.config.get_global_config(self.ha.cluster) self.assertTrue(self.ha._is_healthiest_node(self.ha.old_cluster.members)) @@ -960,7 +957,7 @@ class TestHa(PostgresInit): self.assertTrue(self.ha.restart_matches("replica", "9.5.2", False)) def test_process_healthy_cluster_in_pause(self): - self.p.is_leader = false + self.p.is_primary = false self.ha.is_paused = true self.p.name = 'leader' self.ha.cluster = get_cluster_initialized_with_leader() @@ -971,7 +968,7 @@ class TestHa(PostgresInit): @patch('patroni.postgresql.mtime', Mock(return_value=1588316884)) @patch('builtins.open', mock_open(read_data='1\t0/40159C0\tno recovery target specified\n')) def test_process_healthy_standby_cluster_as_standby_leader(self): - self.p.is_leader = false + self.p.is_primary = false self.p.name = 'leader' self.ha.cluster = get_standby_cluster_initialized_with_only_leader() self.p.config.check_recovery_conf = Mock(return_value=(False, False)) @@ -983,7 +980,7 @@ class TestHa(PostgresInit): self.assertEqual(self.ha.run_cycle(), 'promoted self to a standby leader because i had the session lock') def test_process_healthy_standby_cluster_as_cascade_replica(self): - self.p.is_leader = false + self.p.is_primary = false self.p.name = 'replica' self.ha.cluster = get_standby_cluster_initialized_with_only_leader() self.assertEqual(self.ha.run_cycle(), @@ -993,7 +990,7 @@ class TestHa(PostgresInit): @patch.object(Cluster, 'is_unlocked', Mock(return_value=True)) def test_process_unhealthy_standby_cluster_as_standby_leader(self): - self.p.is_leader = false + self.p.is_primary = false self.p.name = 'leader' self.ha.cluster = get_standby_cluster_initialized_with_only_leader() self.ha.sysid_valid = true @@ -1003,13 +1000,13 @@ class TestHa(PostgresInit): @patch.object(Rewind, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=True)) @patch.object(Rewind, 'can_rewind', PropertyMock(return_value=True)) def test_process_unhealthy_standby_cluster_as_cascade_replica(self): - self.p.is_leader = false + self.p.is_primary = false self.p.name = 'replica' self.ha.cluster = get_standby_cluster_initialized_with_only_leader() self.assertTrue(self.ha.run_cycle().startswith('running pg_rewind from remote_member:')) def test_recover_unhealthy_leader_in_standby_cluster(self): - self.p.is_leader = false + self.p.is_primary = false self.p.name = 'leader' self.p.is_running = false self.p.follow = false @@ -1018,7 +1015,7 @@ class TestHa(PostgresInit): @patch.object(Cluster, 'is_unlocked', Mock(return_value=True)) def test_recover_unhealthy_unlocked_standby_cluster(self): - self.p.is_leader = false + self.p.is_primary = false self.p.name = 'leader' self.p.is_running = false self.p.follow = false @@ -1078,7 +1075,7 @@ class TestHa(PostgresInit): check_calls([(update_lock, True), (demote, True)]) self.ha.has_lock = false - self.p.is_leader = false + self.p.is_primary = false self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), a secondary, and following a leader (leader)') check_calls([(update_lock, False), (demote, False)]) @@ -1212,7 +1209,7 @@ class TestHa(PostgresInit): self.ha.is_synchronous_mode = true mock_set_sync = self.p.sync_handler.set_synchronous_standby_names = Mock() - self.p.is_leader = false + self.p.is_primary = false self.p.set_role('replica') self.ha.has_lock = true mock_write_sync = self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty()) @@ -1235,7 +1232,7 @@ class TestHa(PostgresInit): def test_unhealthy_sync_mode(self): self.ha.is_synchronous_mode = true - self.p.is_leader = false + self.p.is_primary = false self.p.set_role('replica') self.p.name = 'other' self.ha.cluster = get_cluster_initialized_without_leader(sync=('leader', 'other2')) @@ -1266,7 +1263,7 @@ class TestHa(PostgresInit): self.ha.is_synchronous_mode = true self.p.name = 'other' - self.p.is_leader = false + self.p.is_primary = false self.p.set_role('replica') mock_restart = self.p.restart = Mock(return_value=True) self.ha.cluster = get_cluster_initialized_with_leader(sync=('leader', 'other')) @@ -1369,7 +1366,7 @@ class TestHa(PostgresInit): @patch('sys.exit', return_value=1) def test_abort_join(self, exit_mock): self.ha.cluster = get_cluster_not_initialized_without_leader() - self.p.is_leader = false + self.p.is_primary = false self.ha.run_cycle() exit_mock.assert_called_once_with(1) @@ -1429,7 +1426,7 @@ class TestHa(PostgresInit): @patch.object(SlotsHandler, 'sync_replication_slots', Mock(return_value=['ls'])) def test_follow_copy(self): self.ha.cluster.config.data['slots'] = {'ls': {'database': 'a', 'plugin': 'b'}} - self.p.is_leader = false + self.p.is_primary = false self.assertTrue(self.ha.run_cycle().startswith('Copying logical slots')) def test_acquire_lock(self): diff --git a/tests/test_postgresql.py b/tests/test_postgresql.py index a3475ef5..a703f235 100644 --- a/tests/test_postgresql.py +++ b/tests/test_postgresql.py @@ -363,11 +363,11 @@ class TestPostgresql(BaseTestPostgresql): self.assertRaises(psycopg.ProgrammingError, self.p.query, 'blabla') @patch.object(Postgresql, 'pg_isready', Mock(return_value=STATE_REJECT)) - def test_is_leader(self): - self.assertTrue(self.p.is_leader()) + def test_is_primary(self): + self.assertTrue(self.p.is_primary()) self.p.reset_cluster_info_state(None) with patch.object(Postgresql, '_query', Mock(side_effect=RetryFailedError(''))): - self.assertFalse(self.p.is_leader()) + self.assertFalse(self.p.is_primary()) @patch.object(Postgresql, 'controldata', Mock(return_value={'Database cluster state': 'shut down', 'Latest checkpoint location': '0/1ADBC18', @@ -461,7 +461,7 @@ class TestPostgresql(BaseTestPostgresql): self.assertIsNone(self.p.call_nowait(CallbackAction.ON_START)) @patch.object(Postgresql, 'is_running', Mock(return_value=MockPostmaster())) - def test_is_leader_exception(self): + def test_is_primary_exception(self): self.p.start() self.p.query = Mock(side_effect=psycopg.OperationalError("not supported")) self.assertTrue(self.p.stop()) diff --git a/tests/test_slots.py b/tests/test_slots.py index a5be0d1d..0210d701 100644 --- a/tests/test_slots.py +++ b/tests/test_slots.py @@ -48,7 +48,7 @@ class TestSlotsHandler(BaseTestPostgresql): self.s.sync_replication_slots(cluster, False) mock_debug.assert_called_once() self.p.set_role('replica') - with patch.object(Postgresql, 'is_leader', Mock(return_value=False)), \ + with patch.object(Postgresql, 'is_primary', Mock(return_value=False)), \ patch.object(SlotsHandler, 'drop_replication_slot') as mock_drop: self.s.sync_replication_slots(cluster, False, paused=True) mock_drop.assert_not_called() @@ -79,7 +79,7 @@ class TestSlotsHandler(BaseTestPostgresql): None, SyncState.empty(), None, {'ls': 10}, None) self.p.set_role('replica') with patch.object(Postgresql, '_query') as mock_query, \ - patch.object(Postgresql, 'is_leader', Mock(return_value=False)): + patch.object(Postgresql, 'is_primary', Mock(return_value=False)): mock_query.return_value = [('ls', 'logical', 'b', 'a', 5, 12345, 105)] ret = self.s.sync_replication_slots(cluster, False) self.assertEqual(ret, []) @@ -106,7 +106,7 @@ class TestSlotsHandler(BaseTestPostgresql): "confirmed_flush_lsn": 12345, "catalog_xmin": 105}]) self.assertEqual(self.p.slots(), {}) - @patch.object(Postgresql, 'is_leader', Mock(return_value=False)) + @patch.object(Postgresql, 'is_primary', Mock(return_value=False)) def test__ensure_logical_slots_replica(self): self.p.set_role('replica') self.cluster.slots['ls'] = 12346 @@ -133,7 +133,7 @@ class TestSlotsHandler(BaseTestPostgresql): @patch.object(Postgresql, 'stop', Mock(return_value=True)) @patch.object(Postgresql, 'start', Mock(return_value=True)) - @patch.object(Postgresql, 'is_leader', Mock(return_value=False)) + @patch.object(Postgresql, 'is_primary', Mock(return_value=False)) def test_check_logical_slots_readiness(self): self.s.copy_logical_slots(self.cluster, ['ls']) with patch.object(MockCursor, '__iter__', Mock(return_value=iter([('postgresql0', None)]))), \ @@ -147,7 +147,7 @@ class TestSlotsHandler(BaseTestPostgresql): @patch.object(Postgresql, 'stop', Mock(return_value=True)) @patch.object(Postgresql, 'start', Mock(return_value=True)) - @patch.object(Postgresql, 'is_leader', Mock(return_value=False)) + @patch.object(Postgresql, 'is_primary', Mock(return_value=False)) def test_on_promote(self): self.s.schedule_advance_slots({'foo': {'bar': 100}}) self.s.copy_logical_slots(self.cluster, ['ls'])