import datetime import etcd import os import sys from mock import Mock, MagicMock, PropertyMock, patch, mock_open from patroni import global_config from patroni.collections import CaseInsensitiveSet from patroni.config import Config from patroni.dcs import Cluster, ClusterConfig, Failover, Leader, Member, get_dcs, Status, SyncState, TimelineHistory from patroni.dcs.etcd import AbstractEtcdClientWithFailover from patroni.exceptions import DCSError, PostgresConnectionException, PatroniFatalException from patroni.ha import Ha, _MemberStatus from patroni.postgresql import Postgresql from patroni.postgresql.bootstrap import Bootstrap from patroni.postgresql.cancellable import CancellableSubprocess from patroni.postgresql.config import ConfigHandler from patroni.postgresql.postmaster import PostmasterProcess from patroni.postgresql.rewind import Rewind from patroni.postgresql.slots import SlotsHandler from patroni.postgresql.sync import _SyncState from patroni.utils import tzutc from patroni.watchdog import Watchdog from . import PostgresInit, MockPostmaster, psycopg_connect, requests_get from .test_etcd import socket_getaddrinfo, etcd_read, etcd_write SYSID = '12345678901' def true(*args, **kwargs): return True def false(*args, **kwargs): return False def get_cluster(initialize, leader, members, failover, sync, cluster_config=None, failsafe=None): t = datetime.datetime.now().isoformat() history = TimelineHistory(1, '[[1,67197376,"no recovery target specified","' + t + '","foo"]]', [(1, 67197376, 'no recovery target specified', t, 'foo')]) cluster_config = cluster_config or ClusterConfig(1, {'check_timeline': True}, 1) return Cluster(initialize, cluster_config, leader, Status(10, None), members, failover, sync, history, failsafe) def get_cluster_not_initialized_without_leader(cluster_config=None): return get_cluster(None, None, [], None, SyncState.empty(), cluster_config) def get_cluster_bootstrapping_without_leader(cluster_config=None): return get_cluster("", None, [], None, SyncState.empty(), cluster_config) def get_cluster_initialized_without_leader(leader=False, failover=None, sync=None, cluster_config=None, failsafe=False): m1 = Member(0, 'leader', 28, {'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5435/postgres', 'api_url': 'http://127.0.0.1:8008/patroni', 'xlog_location': 4, 'role': 'primary', 'state': 'running'}) leader = Leader(0, 0, m1 if leader else Member(0, '', 28, {})) m2 = Member(0, 'other', 28, {'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres', 'api_url': 'http://127.0.0.1:8011/patroni', 'state': 'running', 'pause': True, 'tags': {'clonefrom': True}, 'scheduled_restart': {'schedule': "2100-01-01 10:53:07.560445+00:00", 'postgres_version': '99.0.0'}}) syncstate = SyncState(0 if sync else None, sync and sync[0], sync and sync[1], 0) failsafe = {m.name: m.api_url for m in (m1, m2)} if failsafe else None return get_cluster(SYSID, leader, [m1, m2], failover, syncstate, cluster_config, failsafe) def get_cluster_initialized_with_leader(failover=None, sync=None): return get_cluster_initialized_without_leader(leader=True, failover=failover, sync=sync) def get_cluster_initialized_with_only_leader(failover=None, cluster_config=None): leader = get_cluster_initialized_without_leader(leader=True, failover=failover).leader return get_cluster(True, leader, [leader.member], failover, SyncState.empty(), cluster_config) def get_standby_cluster_initialized_with_only_leader(failover=None, sync=None): return get_cluster_initialized_with_only_leader( cluster_config=ClusterConfig(1, { "standby_cluster": { "host": "localhost", "port": 5432, "primary_slot_name": "", }}, 1) ) def get_cluster_initialized_with_leader_and_failsafe(): return get_cluster_initialized_without_leader(leader=True, failsafe=True, cluster_config=ClusterConfig(1, {'failsafe_mode': True}, 1)) def get_node_status(reachable=True, in_recovery=True, dcs_last_seen=0, timeline=2, wal_position=10, nofailover=False, watchdog_failed=False, failover_priority=1): def fetch_node_status(e): tags = {} if nofailover: tags['nofailover'] = True tags['failover_priority'] = failover_priority return _MemberStatus(e, reachable, in_recovery, wal_position, {'tags': tags, 'watchdog_failed': watchdog_failed, 'dcs_last_seen': dcs_last_seen, 'timeline': timeline}) return fetch_node_status future_restart_time = datetime.datetime.now(tzutc) + datetime.timedelta(days=5) postmaster_start_time = datetime.datetime.now(tzutc) class MockPatroni(object): def __init__(self, p, d): os.environ[Config.PATRONI_CONFIG_VARIABLE] = """ restapi: listen: 0.0.0.0:8008 bootstrap: users: replicator: password: rep-pass options: - replication postgresql: name: foo data_dir: data/postgresql0 pg_rewind: username: postgres password: postgres watchdog: mode: off zookeeper: exhibitor: hosts: [localhost] port: 8181 """ # We rely on sys.argv in Config, so it's necessary to reset # all the extra values that are coming from py.test sys.argv = sys.argv[:1] self.config = Config(None) self.version = '1.5.7' self.postgresql = p self.dcs = d self.api = Mock() self.tags = {'foo': 'bar'} self.nofailover = None self.replicatefrom = None self.api.connection_string = 'http://127.0.0.1:8008' self.clonefrom = None self.nosync = False self.nostream = False self.scheduled_restart = {'schedule': future_restart_time, 'postmaster_start_time': str(postmaster_start_time)} self.watchdog = Watchdog(self.config) self.request = lambda *args, **kwargs: requests_get(args[0].api_url, *args[1:], **kwargs) self.failover_priority = 1 def run_async(self, func, args=()): self.reset_scheduled_action() if args: func(*args) else: func() @patch.object(Postgresql, 'is_running', Mock(return_value=MockPostmaster())) @patch.object(Postgresql, 'is_primary', Mock(return_value=True)) @patch.object(Postgresql, 'timeline_wal_position', Mock(return_value=(1, 10, 1))) @patch.object(Postgresql, '_cluster_info_state_get', Mock(return_value=10)) @patch.object(Postgresql, 'slots', Mock(return_value={'l': 100})) @patch.object(Postgresql, 'data_directory_empty', Mock(return_value=False)) @patch.object(Postgresql, 'controldata', Mock(return_value={ 'Database system identifier': SYSID, 'Database cluster state': 'shut down', 'Latest checkpoint location': '0/12345678', "Latest checkpoint's TimeLineID": '2'})) @patch.object(SlotsHandler, 'load_replication_slots', Mock(side_effect=Exception)) @patch.object(ConfigHandler, 'append_pg_hba', Mock()) @patch.object(ConfigHandler, 'write_pgpass', Mock(return_value={})) @patch.object(ConfigHandler, 'write_recovery_conf', Mock()) @patch.object(ConfigHandler, 'write_postgresql_conf', Mock()) @patch.object(Postgresql, 'query', Mock()) @patch.object(Postgresql, 'checkpoint', Mock()) @patch.object(CancellableSubprocess, 'call', Mock(return_value=0)) @patch.object(Postgresql, 'get_replica_timeline', Mock(return_value=2)) @patch.object(Postgresql, 'get_primary_timeline', Mock(return_value=2)) @patch.object(Postgresql, 'get_major_version', Mock(return_value=140000)) @patch.object(Postgresql, 'resume_wal_replay', Mock()) @patch.object(ConfigHandler, 'restore_configuration_files', Mock()) @patch.object(etcd.Client, 'write', etcd_write) @patch.object(etcd.Client, 'read', etcd_read) @patch.object(etcd.Client, 'delete', Mock(side_effect=etcd.EtcdException)) @patch('patroni.postgresql.polling_loop', Mock(return_value=range(1))) @patch('patroni.async_executor.AsyncExecutor.busy', PropertyMock(return_value=False)) @patch('patroni.async_executor.AsyncExecutor.run_async', run_async) @patch('patroni.postgresql.rewind.Thread', Mock()) @patch('patroni.postgresql.mpp.citus.CitusHandler.start', Mock()) @patch('subprocess.call', Mock(return_value=0)) @patch('time.sleep', Mock()) class TestHa(PostgresInit): @patch('socket.getaddrinfo', socket_getaddrinfo) @patch('patroni.dcs.dcs_modules', Mock(return_value=['patroni.dcs.etcd'])) @patch.object(etcd.Client, 'read', etcd_read) @patch.object(AbstractEtcdClientWithFailover, '_get_machines_list', Mock(return_value=['http://remotehost:2379'])) @patch.object(Config, '_load_cache', Mock()) def setUp(self): super(TestHa, self).setUp() self.p.set_state('running') self.p.set_role('replica') self.p.postmaster_start_time = MagicMock(return_value=str(postmaster_start_time)) self.p.can_create_replica_without_replication_connection = MagicMock(return_value=False) self.e = get_dcs({'etcd': {'ttl': 30, 'host': 'ok:2379', 'scope': 'test', 'name': 'foo', 'retry_timeout': 10}, 'citus': {'database': 'citus', 'group': None}}) self.ha = Ha(MockPatroni(self.p, self.e)) self.ha.old_cluster = self.e.get_cluster() self.ha.cluster = get_cluster_initialized_without_leader() global_config.update(self.ha.cluster) self.ha.load_cluster_from_dcs = Mock() def test_update_lock(self): self.ha.is_failsafe_mode = true self.p.last_operation = Mock(side_effect=PostgresConnectionException('')) self.ha.dcs.update_leader = Mock(side_effect=[DCSError(''), Exception]) self.assertRaises(DCSError, self.ha.update_lock) self.assertFalse(self.ha.update_lock(True)) @patch.object(Postgresql, 'received_timeline', Mock(return_value=None)) def test_touch_member(self): self.p._major_version = 110000 self.p.is_primary = false self.p.timeline_wal_position = Mock(return_value=(0, 1, 0)) self.p.replica_cached_timeline = Mock(side_effect=Exception) with patch.object(Postgresql, '_cluster_info_state_get', Mock(return_value='streaming')): self.ha.touch_member() self.p.timeline_wal_position = Mock(return_value=(0, 1, 1)) self.p.set_role('standby_leader') self.ha.touch_member() self.p.set_role('primary') self.ha.dcs.touch_member = true self.ha.touch_member() def test_is_leader(self): self.assertFalse(self.ha.is_leader()) def test_start_as_replica(self): self.p.is_healthy = false self.assertEqual(self.ha.run_cycle(), 'starting as a secondary') @patch('patroni.dcs.etcd.Etcd.initialize', return_value=True) def test_bootstrap_as_standby_leader(self, initialize): self.p.data_directory_empty = true self.ha.cluster = get_cluster_not_initialized_without_leader( cluster_config=ClusterConfig(1, {"standby_cluster": {"port": 5432}}, 1)) self.assertEqual(self.ha.run_cycle(), 'trying to bootstrap a new standby leader') def test_bootstrap_waiting_for_standby_leader(self): self.p.data_directory_empty = true self.ha.cluster = get_cluster_initialized_without_leader() self.ha.cluster.config.data.update({'standby_cluster': {'port': 5432}}) self.assertEqual(self.ha.run_cycle(), 'waiting for standby_leader to bootstrap') @patch.object(Cluster, 'get_clone_member', Mock(return_value=Member(0, 'test', 1, {'api_url': 'http://127.0.0.1:8011/patroni', 'conn_url': 'postgres://127.0.0.1:5432/postgres'}))) @patch.object(Bootstrap, 'create_replica', Mock(return_value=0)) def test_start_as_cascade_replica_in_standby_cluster(self): self.p.data_directory_empty = true self.ha.cluster = get_standby_cluster_initialized_with_only_leader() self.assertEqual(self.ha.run_cycle(), "trying to bootstrap from replica 'test'") def test_recover_replica_failed(self): self.p.controldata = lambda: {'Database cluster state': 'in recovery', 'Database system identifier': SYSID} self.p.is_running = false self.p.follow = false self.assertEqual(self.ha.run_cycle(), 'starting as a secondary') self.assertEqual(self.ha.run_cycle(), 'failed to start postgres') def test_recover_raft(self): self.p.controldata = lambda: {'Database cluster state': 'in recovery', 'Database system identifier': SYSID} self.p.is_running = false self.p.follow = true self.assertEqual(self.ha.run_cycle(), 'starting as a secondary') self.p.is_running = true ha_dcs_orig_name = self.ha.dcs.__class__.__name__ self.ha.dcs.__class__.__name__ = 'Raft' self.assertEqual(self.ha.run_cycle(), 'started as a secondary') self.ha.dcs.__class__.__name__ = ha_dcs_orig_name def test_recover_former_primary(self): self.p.follow = false self.p.is_running = false self.p.name = 'leader' self.p.set_role('demoted') self.p.controldata = lambda: {'Database cluster state': 'shut down', 'Database system identifier': SYSID} self.ha.cluster = get_cluster_initialized_with_leader() self.assertEqual(self.ha.run_cycle(), 'starting as readonly because i had the session lock') def test_start_primary_after_failure(self): self.p.start = false self.p.is_running = false self.p.name = 'leader' self.p.set_role('primary') self.p.controldata = lambda: {'Database cluster state': 'in production', 'Database system identifier': SYSID} self.ha.cluster = get_cluster_initialized_with_leader() self.assertEqual(self.ha.run_cycle(), 'starting primary after failure') @patch.object(Rewind, 'ensure_clean_shutdown', Mock()) def test_crash_recovery(self): self.ha.has_lock = true self.p.is_running = false self.p.controldata = lambda: {'Database cluster state': 'in production', 'Database system identifier': SYSID} self.assertEqual(self.ha.run_cycle(), 'doing crash recovery in a single user mode') with patch('patroni.async_executor.AsyncExecutor.busy', PropertyMock(return_value=True)), \ patch.object(Ha, 'check_timeline', Mock(return_value=False)): self.ha._async_executor.schedule('doing crash recovery in a single user mode') self.ha.state_handler.cancellable._process = Mock() self.ha._crash_recovery_started -= 600 self.ha.cluster.config.data.update({'maximum_lag_on_failover': 10}) self.assertEqual(self.ha.run_cycle(), 'terminated crash recovery because of startup timeout') @patch.object(Rewind, 'ensure_clean_shutdown', Mock()) @patch.object(Rewind, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=True)) @patch.object(Rewind, 'can_rewind', PropertyMock(return_value=True)) def test_crash_recovery_before_rewind(self): self.p.is_primary = false self.p.is_running = false self.p.controldata = lambda: {'Database cluster state': 'in archive recovery', 'Database system identifier': SYSID} self.ha._rewind.trigger_check_diverged_lsn() self.ha.cluster = get_cluster_initialized_with_leader() self.assertEqual(self.ha.run_cycle(), 'doing crash recovery in a single user mode') @patch.object(Rewind, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=True)) @patch.object(Rewind, 'can_rewind', PropertyMock(return_value=True)) @patch('os.listdir', Mock(return_value=[])) @patch('patroni.postgresql.rewind.fsync_dir', Mock()) def test_recover_with_rewind(self): self.p.is_running = false self.ha.cluster = get_cluster_initialized_with_leader() self.ha.cluster.leader.member.data.update(version='2.0.2', role='primary') self.ha._rewind.pg_rewind = true self.ha._rewind.check_leader_is_not_in_recovery = true with patch.object(Rewind, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=True)): self.assertEqual(self.ha.run_cycle(), 'running pg_rewind from leader') with patch.object(Rewind, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=False)), \ patch.object(Ha, 'is_synchronous_mode', Mock(return_value=True)): self.p.follow = true self.assertEqual(self.ha.run_cycle(), 'starting as a secondary') self.p.is_running = true self.ha.follow = Mock(return_value='fake') self.assertEqual(self.ha.run_cycle(), 'fake') @patch.object(Rewind, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=True)) @patch.object(Rewind, 'should_remove_data_directory_on_diverged_timelines', PropertyMock(return_value=True)) @patch.object(Bootstrap, 'create_replica', Mock(return_value=1)) def test_recover_with_reinitialize(self): self.p.is_running = false self.ha.cluster = get_cluster_initialized_with_leader() self.assertEqual(self.ha.run_cycle(), 'reinitializing due to diverged timelines') @patch('sys.exit', return_value=1) @patch('patroni.ha.Ha.sysid_valid', MagicMock(return_value=True)) def test_sysid_no_match(self, exit_mock): self.p.controldata = lambda: {'Database cluster state': 'in recovery', 'Database system identifier': '123'} self.ha.run_cycle() exit_mock.assert_called_once_with(1) @patch.object(Cluster, 'is_unlocked', Mock(return_value=False)) def test_start_as_readonly(self): self.p.is_primary = false self.p.is_healthy = true self.ha.has_lock = true self.p.controldata = lambda: {'Database cluster state': 'in production', 'Database system identifier': SYSID} self.assertEqual(self.ha.run_cycle(), 'promoted self to leader because I had the session lock') @patch('patroni.psycopg.connect', psycopg_connect) def test_acquire_lock_as_primary(self): self.assertEqual(self.ha.run_cycle(), 'acquired session lock as a leader') def test_leader_race_stale_primary(self): with patch.object(Postgresql, 'get_primary_timeline', Mock(return_value=1)), \ patch('patroni.ha.logger.warning') as mock_logger: self.assertEqual(self.ha.run_cycle(), 'demoting self because i am not the healthiest node') self.assertEqual(mock_logger.call_args[0][0], 'My timeline %s is behind last known cluster timeline %s') def test_promoted_by_acquiring_lock(self): self.ha.is_healthiest_node = true self.p.is_primary = false self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock') def test_promotion_cancelled_after_pre_promote_failed(self): self.p.is_primary = false self.p._pre_promote = false self.ha._is_healthiest_node = true self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock') self.assertEqual(self.ha.run_cycle(), 'Promotion cancelled because the pre-promote script failed') self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node') def test_lost_leader_lock_during_promote(self): with patch('patroni.async_executor.AsyncExecutor.busy', PropertyMock(return_value=True)): self.ha._async_executor.schedule('promote') self.assertEqual(self.ha.run_cycle(), 'lost leader before promote') @patch.object(Cluster, 'is_unlocked', Mock(return_value=False)) def test_long_promote(self): self.ha.has_lock = true self.p.is_primary = false self.p.set_role('primary') self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock') def test_demote_after_failing_to_obtain_lock(self): self.ha.acquire_lock = false self.assertEqual(self.ha.run_cycle(), 'demoted self after trying and failing to obtain lock') def test_follow_new_leader_after_failing_to_obtain_lock(self): self.ha.is_healthiest_node = true self.ha.acquire_lock = false self.p.is_primary = false self.assertEqual(self.ha.run_cycle(), 'following new leader after trying and failing to obtain lock') def test_demote_because_not_healthiest(self): self.ha.is_healthiest_node = false self.assertEqual(self.ha.run_cycle(), 'demoting self because i am not the healthiest node') def test_follow_new_leader_because_not_healthiest(self): self.ha.is_healthiest_node = false self.p.is_primary = false self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node') @patch.object(Cluster, 'is_unlocked', Mock(return_value=False)) def test_promote_because_have_lock(self): self.ha.has_lock = true self.p.is_primary = false self.assertEqual(self.ha.run_cycle(), 'promoted self to leader because I had the session lock') def test_promote_without_watchdog(self): self.ha.has_lock = true self.p.is_primary = true with patch.object(Watchdog, 'activate', Mock(return_value=False)): self.assertEqual(self.ha.run_cycle(), 'Demoting self because watchdog could not be activated') self.p.is_primary = false self.assertEqual(self.ha.run_cycle(), 'Not promoting self because watchdog could not be activated') def test_leader_with_lock(self): self.ha.cluster = get_cluster_initialized_with_leader() self.ha.has_lock = true self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock') def test_coordinator_leader_with_lock(self): self.ha.cluster = get_cluster_initialized_with_leader() self.ha.has_lock = true self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock') @patch.object(Postgresql, '_wait_for_connection_close', Mock()) @patch.object(Cluster, 'is_unlocked', Mock(return_value=False)) def test_demote_because_not_having_lock(self): with patch.object(Watchdog, 'is_running', PropertyMock(return_value=True)): self.assertEqual(self.ha.run_cycle(), 'demoting self because I do not have the lock and I was a leader') @patch.object(Cluster, 'is_unlocked', Mock(return_value=False)) def test_demote_because_update_lock_failed(self): self.ha.has_lock = true self.ha.update_lock = false self.assertEqual(self.ha.run_cycle(), 'demoted self because failed to update leader lock in DCS') with patch.object(Ha, '_get_node_to_follow', Mock(side_effect=DCSError('foo'))): self.assertEqual(self.ha.run_cycle(), 'demoted self because failed to update leader lock in DCS') self.p.is_primary = false self.assertEqual(self.ha.run_cycle(), 'not promoting because failed to update leader lock in DCS') def test_get_node_to_follow_nostream(self): self.ha.patroni.nostream = True self.ha.cluster = get_cluster_initialized_with_leader() self.assertEqual(self.ha._get_node_to_follow(self.ha.cluster), None) @patch.object(Cluster, 'is_unlocked', Mock(return_value=False)) def test_follow(self): self.p.is_primary = false self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), a secondary, and following a leader ()') self.ha.patroni.replicatefrom = "foo" self.p.config.check_recovery_conf = Mock(return_value=(True, False)) self.ha.cluster.config.data.update({'slots': {'l': {'database': 'a', 'plugin': 'b'}}}) self.ha.cluster.members[1].data['tags']['replicatefrom'] = 'postgresql0' self.ha.patroni.nofailover = True self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), a secondary, and following a leader ()') del self.ha.cluster.config.data['slots'] self.ha.cluster.config.data.update({'postgresql': {'use_slots': False}}) self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), a secondary, and following a leader ()') del self.ha.cluster.config.data['postgresql']['use_slots'] @patch.object(Cluster, 'is_unlocked', Mock(return_value=False)) def test_follow_in_pause(self): self.ha.is_paused = true self.assertEqual(self.ha.run_cycle(), 'PAUSE: continue to run as primary without lock') self.p.is_primary = false self.assertEqual(self.ha.run_cycle(), 'PAUSE: no action. I am (postgresql0)') @patch.object(Rewind, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=True)) @patch.object(Rewind, 'can_rewind', PropertyMock(return_value=True)) def test_follow_triggers_rewind(self): self.p.is_primary = false self.ha._rewind.trigger_check_diverged_lsn() self.ha.cluster = get_cluster_initialized_with_leader() self.assertEqual(self.ha.run_cycle(), 'running pg_rewind from leader') def test_no_dcs_connection_primary_demote(self): self.ha.load_cluster_from_dcs = Mock(side_effect=DCSError('Etcd is not responding properly')) self.assertEqual(self.ha.run_cycle(), 'demoting self because DCS is not accessible and I was a leader') self.ha._async_executor.schedule('dummy') self.assertEqual(self.ha.run_cycle(), 'demoted self because DCS is not accessible and I was a leader') def test_check_failsafe_topology(self): self.ha.load_cluster_from_dcs = Mock(side_effect=DCSError('Etcd is not responding properly')) self.ha.cluster = get_cluster_initialized_with_leader_and_failsafe() global_config.update(self.ha.cluster) self.ha.dcs._last_failsafe = self.ha.cluster.failsafe self.assertEqual(self.ha.run_cycle(), 'demoting self because DCS is not accessible and I was a leader') self.ha.state_handler.name = self.ha.cluster.leader.name self.assertFalse(self.ha.failsafe_is_active()) self.assertEqual(self.ha.run_cycle(), 'continue to run as a leader because failsafe mode is enabled and all members are accessible') self.assertTrue(self.ha.failsafe_is_active()) with patch.object(Postgresql, 'slots', Mock(side_effect=Exception)): self.ha.patroni.request = Mock(side_effect=Exception) self.assertEqual(self.ha.run_cycle(), 'demoting self because DCS is not accessible and I was a leader') self.assertFalse(self.ha.failsafe_is_active()) self.ha.dcs._last_failsafe.clear() self.ha.dcs._last_failsafe[self.ha.cluster.leader.name] = self.ha.cluster.leader.member.api_url self.assertEqual(self.ha.run_cycle(), 'continue to run as a leader because failsafe mode is enabled and all members are accessible') def test_no_dcs_connection_primary_failsafe(self): self.ha.load_cluster_from_dcs = Mock(side_effect=DCSError('Etcd is not responding properly')) self.ha.cluster = get_cluster_initialized_with_leader_and_failsafe() global_config.update(self.ha.cluster) self.ha.dcs._last_failsafe = self.ha.cluster.failsafe self.ha.state_handler.name = self.ha.cluster.leader.name self.assertEqual(self.ha.run_cycle(), 'continue to run as a leader because failsafe mode is enabled and all members are accessible') def test_readonly_dcs_primary_failsafe(self): self.ha.cluster = get_cluster_initialized_with_leader_and_failsafe() self.ha.dcs.update_leader = Mock(side_effect=DCSError('Etcd is not responding properly')) self.ha.dcs._last_failsafe = self.ha.cluster.failsafe self.ha.state_handler.name = self.ha.cluster.leader.name self.assertEqual(self.ha.run_cycle(), 'continue to run as a leader because failsafe mode is enabled and all members are accessible') def test_no_dcs_connection_replica_failsafe(self): self.ha.load_cluster_from_dcs = Mock(side_effect=DCSError('Etcd is not responding properly')) self.ha.cluster = get_cluster_initialized_with_leader_and_failsafe() global_config.update(self.ha.cluster) self.ha.update_failsafe({'name': 'leader', 'api_url': 'http://127.0.0.1:8008/patroni', 'conn_url': 'postgres://127.0.0.1:5432/postgres', 'slots': {'foo': 1000}}) self.p.is_primary = false self.assertEqual(self.ha.run_cycle(), 'DCS is not accessible') def test_no_dcs_connection_replica_failsafe_not_enabled_but_active(self): self.ha.load_cluster_from_dcs = Mock(side_effect=DCSError('Etcd is not responding properly')) self.ha.cluster = get_cluster_initialized_with_leader() self.ha.update_failsafe({'name': 'leader', 'api_url': 'http://127.0.0.1:8008/patroni', 'conn_url': 'postgres://127.0.0.1:5432/postgres', 'slots': {'foo': 1000}}) self.p.is_primary = false self.assertEqual(self.ha.run_cycle(), 'DCS is not accessible') def test_update_failsafe(self): self.assertRaises(Exception, self.ha.update_failsafe, {}) self.p.set_role('primary') self.assertEqual(self.ha.update_failsafe({}), 'Running as a leader') @patch('time.sleep', Mock()) def test_bootstrap_from_another_member(self): self.ha.cluster = get_cluster_initialized_with_leader() self.assertEqual(self.ha.bootstrap(), 'trying to bootstrap from replica \'other\'') def test_bootstrap_waiting_for_leader(self): self.ha.cluster = get_cluster_initialized_without_leader() self.assertEqual(self.ha.bootstrap(), 'waiting for leader to bootstrap') def test_bootstrap_without_leader(self): self.ha.cluster = get_cluster_initialized_without_leader() self.p.can_create_replica_without_replication_connection = MagicMock(return_value=True) self.assertEqual(self.ha.bootstrap(), 'trying to bootstrap (without leader)') def test_bootstrap_not_running_concurrently(self): self.ha.cluster = get_cluster_bootstrapping_without_leader() self.p.can_create_replica_without_replication_connection = MagicMock(return_value=True) self.assertEqual(self.ha.bootstrap(), 'waiting for leader to bootstrap') def test_bootstrap_initialize_lock_failed(self): self.ha.cluster = get_cluster_not_initialized_without_leader() self.assertEqual(self.ha.bootstrap(), 'failed to acquire initialize lock') @patch('patroni.psycopg.connect', psycopg_connect) @patch('patroni.postgresql.mpp.citus.connect', psycopg_connect) @patch('patroni.postgresql.mpp.citus.quote_ident', Mock()) @patch.object(Postgresql, 'connection', Mock(return_value=None)) def test_bootstrap_initialized_new_cluster(self): self.ha.cluster = get_cluster_not_initialized_without_leader() self.e.initialize = true self.assertEqual(self.ha.bootstrap(), 'trying to bootstrap a new cluster') self.p.is_primary = false self.assertEqual(self.ha.run_cycle(), 'waiting for end of recovery after bootstrap') self.p.is_primary = true self.ha.is_synchronous_mode = true self.assertEqual(self.ha.run_cycle(), 'running post_bootstrap') self.assertEqual(self.ha.run_cycle(), 'initialized a new cluster') def test_bootstrap_release_initialize_key_on_failure(self): self.ha.cluster = get_cluster_not_initialized_without_leader() self.e.initialize = true self.ha.bootstrap() self.p.is_running = false self.assertRaises(PatroniFatalException, self.ha.post_bootstrap) @patch('patroni.psycopg.connect', psycopg_connect) @patch('patroni.postgresql.mpp.citus.connect', psycopg_connect) @patch('patroni.postgresql.mpp.citus.quote_ident', Mock()) @patch.object(Postgresql, 'connection', Mock(return_value=None)) def test_bootstrap_release_initialize_key_on_watchdog_failure(self): self.ha.cluster = get_cluster_not_initialized_without_leader() self.e.initialize = true self.ha.bootstrap() self.p.is_primary = true with patch.object(Watchdog, 'activate', Mock(return_value=False)), \ patch('patroni.ha.logger.error') as mock_logger: self.assertEqual(self.ha.post_bootstrap(), 'running post_bootstrap') self.assertRaises(PatroniFatalException, self.ha.post_bootstrap) self.assertTrue(mock_logger.call_args[0][0].startswith('Cancelling bootstrap because' ' watchdog activation failed')) @patch('patroni.psycopg.connect', psycopg_connect) def test_reinitialize(self): self.assertIsNotNone(self.ha.reinitialize()) self.ha.cluster = get_cluster_initialized_with_leader() self.assertIsNone(self.ha.reinitialize(True)) self.ha._async_executor.schedule('reinitialize') self.assertIsNotNone(self.ha.reinitialize()) self.ha.state_handler.name = self.ha.cluster.leader.name self.assertIsNotNone(self.ha.reinitialize()) @patch('time.sleep', Mock()) def test_restart(self): self.assertEqual(self.ha.restart({}), (True, 'restarted successfully')) self.p.restart = Mock(return_value=None) self.assertEqual(self.ha.restart({}), (False, 'postgres is still starting')) self.p.restart = false self.assertEqual(self.ha.restart({}), (False, 'restart failed')) self.ha.cluster = get_cluster_initialized_with_leader() self.ha._async_executor.schedule('reinitialize') self.assertEqual(self.ha.restart({}), (False, 'reinitialize already in progress')) with patch.object(self.ha, "restart_matches", return_value=False): self.assertEqual(self.ha.restart({'foo': 'bar'}), (False, "restart conditions are not satisfied")) @patch('time.sleep', Mock()) @patch.object(ConfigHandler, 'replace_pg_hba', Mock()) @patch.object(ConfigHandler, 'replace_pg_ident', Mock()) @patch.object(PostmasterProcess, 'start', Mock(return_value=MockPostmaster())) @patch('patroni.postgresql.mpp.AbstractMPPHandler.is_coordinator', Mock(return_value=False)) def test_worker_restart(self): self.ha.has_lock = true self.ha.patroni.request = Mock() self.p.is_running = Mock(side_effect=[Mock(), False]) self.assertEqual(self.ha.restart({}), (True, 'restarted successfully')) self.ha.patroni.request.assert_called() self.assertEqual(self.ha.patroni.request.call_args_list[0][0][3]['type'], 'before_demote') self.assertEqual(self.ha.patroni.request.call_args_list[1][0][3]['type'], 'after_promote') @patch('os.kill', Mock()) def test_restart_in_progress(self): with patch('patroni.async_executor.AsyncExecutor.busy', PropertyMock(return_value=True)): self.ha._async_executor.schedule('restart') self.assertTrue(self.ha.restart_scheduled()) self.assertEqual(self.ha.run_cycle(), 'restart in progress') self.ha.cluster = get_cluster_initialized_with_leader() self.assertEqual(self.ha.run_cycle(), 'restart in progress') self.ha.has_lock = true self.assertEqual(self.ha.run_cycle(), 'updated leader lock during restart') self.ha.update_lock = false self.p.set_role('primary') with patch('patroni.async_executor.CriticalTask.cancel', Mock(return_value=False)), \ patch('patroni.async_executor.CriticalTask.result', PropertyMock(return_value=PostmasterProcess(os.getpid())), create=True), \ patch('patroni.postgresql.Postgresql.terminate_starting_postmaster') as mock_terminate: self.assertEqual(self.ha.run_cycle(), 'lost leader lock during restart') mock_terminate.assert_called() self.ha.is_paused = true self.assertEqual(self.ha.run_cycle(), 'PAUSE: restart in progress') @patch('patroni.postgresql.mpp.AbstractMPPHandler.is_coordinator', Mock(return_value=False)) def test_manual_failover_from_leader(self): self.ha.has_lock = true # I am the leader # to me with patch('patroni.ha.logger.warning') as mock_warning: self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, '', self.p.name, None)) self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock') mock_warning.assert_called_with('%s: I am already the leader, no need to %s', 'manual failover', 'failover') # to a non-existent candidate with patch('patroni.ha.logger.warning') as mock_warning: self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, '', 'blabla', None)) self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock') mock_warning.assert_called_with( '%s: no healthy members found, %s is not possible', 'manual failover', 'failover') # to an existent candidate self.ha.fetch_node_status = get_node_status() self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, '', 'b', None)) self.ha.cluster.members.append(Member(0, 'b', 28, {'api_url': 'http://127.0.0.1:8011/patroni'})) self.assertEqual(self.ha.run_cycle(), 'manual failover: demoting myself') # to a candidate on an older timeline with patch('patroni.ha.logger.info') as mock_info: self.ha.fetch_node_status = get_node_status(timeline=1) self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock') self.assertEqual(mock_info.call_args_list[0][0], ('Timeline %s of member %s is behind the cluster timeline %s', 1, 'b', 2)) # to a lagging candidate with patch('patroni.ha.logger.info') as mock_info: self.ha.fetch_node_status = get_node_status(wal_position=1) self.ha.cluster.config.data.update({'maximum_lag_on_failover': 5}) self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock') self.assertEqual(mock_info.call_args_list[0][0], ('Member %s exceeds maximum replication lag', 'b')) self.ha.cluster.members.pop() @patch('patroni.postgresql.mpp.AbstractMPPHandler.is_coordinator', Mock(return_value=False)) def test_manual_switchover_from_leader(self): self.ha.has_lock = true # I am the leader self.ha.fetch_node_status = get_node_status() # different leader specified in failover key, no candidate with patch('patroni.ha.logger.warning') as mock_warning: self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, 'blabla', '', None)) self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock') mock_warning.assert_called_with( '%s: leader name does not match: %s != %s', 'switchover', 'blabla', 'postgresql0') # no candidate self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, self.p.name, '', None)) self.assertEqual(self.ha.run_cycle(), 'switchover: demoting myself') self.ha._rewind.rewind_or_reinitialize_needed_and_possible = true self.assertEqual(self.ha.run_cycle(), 'switchover: demoting myself') # other members with failover_limitation_s with patch('patroni.ha.logger.info') as mock_info: self.ha.fetch_node_status = get_node_status(nofailover=True) self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock') self.assertEqual(mock_info.call_args_list[0][0], ('Member %s is %s', 'leader', 'not allowed to promote')) with patch('patroni.ha.logger.info') as mock_info: self.ha.fetch_node_status = get_node_status(watchdog_failed=True) self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock') self.assertEqual(mock_info.call_args_list[0][0], ('Member %s is %s', 'leader', 'not watchdog capable')) with patch('patroni.ha.logger.info') as mock_info: self.ha.fetch_node_status = get_node_status(timeline=1) self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock') self.assertEqual(mock_info.call_args_list[0][0], ('Timeline %s of member %s is behind the cluster timeline %s', 1, 'leader', 2)) with patch('patroni.ha.logger.info') as mock_info: self.ha.fetch_node_status = get_node_status(wal_position=1) self.ha.cluster.config.data.update({'maximum_lag_on_failover': 5}) self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock') self.assertEqual(mock_info.call_args_list[0][0], ('Member %s exceeds maximum replication lag', 'leader')) @patch('patroni.postgresql.mpp.AbstractMPPHandler.is_coordinator', Mock(return_value=False)) def test_scheduled_switchover_from_leader(self): self.ha.has_lock = true # I am the leader self.ha.fetch_node_status = get_node_status() # switchover scheduled time must include timezone with patch('patroni.ha.logger.warning') as mock_warning: scheduled = datetime.datetime.now() self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, self.p.name, 'blabla', scheduled)) self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock') self.assertIn('Incorrect value of scheduled_at: %s', mock_warning.call_args_list[0][0]) # scheduled now scheduled = datetime.datetime.utcnow().replace(tzinfo=tzutc) self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, self.p.name, 'b', scheduled)) self.ha.cluster.members.append(Member(0, 'b', 28, {'api_url': 'http://127.0.0.1:8011/patroni'})) self.assertEqual('switchover: demoting myself', self.ha.run_cycle()) # scheduled in the future with patch('patroni.ha.logger.info') as mock_info: scheduled = scheduled + datetime.timedelta(seconds=30) self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, self.p.name, 'blabla', scheduled)) self.assertEqual('no action. I am (postgresql0), the leader with the lock', self.ha.run_cycle()) self.assertIn('Awaiting %s at %s (in %.0f seconds)', mock_info.call_args_list[0][0]) # stale value with patch('patroni.ha.logger.warning') as mock_warning: scheduled = scheduled + datetime.timedelta(seconds=-600) self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, self.p.name, 'b', scheduled)) self.ha.cluster.members.append(Member(0, 'b', 28, {'api_url': 'http://127.0.0.1:8011/patroni'})) self.assertEqual('no action. I am (postgresql0), the leader with the lock', self.ha.run_cycle()) self.assertIn('Found a stale %s value, cleaning up: %s', mock_warning.call_args_list[0][0]) def test_manual_switchover_from_leader_in_pause(self): self.ha.has_lock = true # I am the leader self.ha.is_paused = true # no candidate self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, self.p.name, '', None)) with patch('patroni.ha.logger.warning') as mock_warning: self.assertEqual('PAUSE: no action. I am (postgresql0), the leader with the lock', self.ha.run_cycle()) mock_warning.assert_called_with( '%s is possible only to a specific candidate in a paused state', 'Switchover') def test_manual_failover_from_leader_in_pause(self): self.ha.has_lock = true self.ha.fetch_node_status = get_node_status() self.ha.is_paused = true # failover from me, candidate is healthy self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, None, 'b', None)) self.ha.cluster.members.append(Member(0, 'b', 28, {'api_url': 'http://127.0.0.1:8011/patroni'})) self.assertEqual('PAUSE: manual failover: demoting myself', self.ha.run_cycle()) self.ha.cluster.members.pop() def test_manual_failover_from_leader_in_synchronous_mode(self): self.ha.is_synchronous_mode = true self.ha.process_sync_replication = Mock() self.ha.fetch_node_status = get_node_status() # I am the leader self.p.is_primary = true self.ha.has_lock = true # the candidate is not in sync members but we allow failover to an async candidate self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, None, 'b', None), sync=(self.p.name, 'a')) self.ha.cluster.members.append(Member(0, 'b', 28, {'api_url': 'http://127.0.0.1:8011/patroni'})) self.assertEqual('manual failover: demoting myself', self.ha.run_cycle()) self.ha.cluster.members.pop() def test_manual_switchover_from_leader_in_synchronous_mode(self): self.ha.is_synchronous_mode = true self.ha.process_sync_replication = Mock() # I am the leader self.p.is_primary = true self.ha.has_lock = true # candidate specified is not in sync members with patch('patroni.ha.logger.warning') as mock_warning: self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, self.p.name, 'a', None), sync=(self.p.name, 'blabla')) self.assertEqual('no action. I am (postgresql0), the leader with the lock', self.ha.run_cycle()) self.assertEqual(mock_warning.call_args_list[0][0], ('%s candidate=%s does not match with sync_standbys=%s', 'Switchover', 'a', 'blabla')) # the candidate is in sync members and is healthy self.ha.fetch_node_status = get_node_status(wal_position=305419896) self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, self.p.name, 'a', None), sync=(self.p.name, 'a')) self.ha.cluster.members.append(Member(0, 'a', 28, {'api_url': 'http://127.0.0.1:8011/patroni'})) self.assertEqual('switchover: demoting myself', self.ha.run_cycle()) # the candidate is in sync members but is not healthy with patch('patroni.ha.logger.info') as mock_info: self.ha.fetch_node_status = get_node_status(nofailover=true) self.assertEqual('no action. I am (postgresql0), the leader with the lock', self.ha.run_cycle()) self.assertEqual(mock_info.call_args_list[0][0], ('Member %s is %s', 'a', 'not allowed to promote')) def test_manual_failover_process_no_leader(self): self.p.is_primary = false self.p.set_role('replica') # failover to another member, fetch_node_status for candidate fails with patch('patroni.ha.logger.warning') as mock_warning: self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'leader', None)) self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock') self.assertEqual(mock_warning.call_args_list[1][0], ('%s: member %s is %s', 'manual failover', 'leader', 'not reachable')) # failover to another member, candidate is accessible, in_recovery self.p.set_role('replica') self.ha.fetch_node_status = get_node_status() self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node') # set nofailover flag to True for all members of the cluster # this should elect the current member, as we are not going to call the API for it. self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'other', None)) self.ha.fetch_node_status = get_node_status(nofailover=True) self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock') # failover to me but I am set to nofailover. In no case I should be elected as a leader self.p.set_role('replica') self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'postgresql0', None)) self.ha.patroni.nofailover = True self.assertEqual(self.ha.run_cycle(), 'following a different leader because I am not allowed to promote') self.ha.patroni.nofailover = False # failover to another member that is on an older timeline (only failover_limitation() is checked) with patch('patroni.ha.logger.info') as mock_info: self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'b', None)) self.ha.cluster.members.append(Member(0, 'b', 28, {'api_url': 'http://127.0.0.1:8011/patroni'})) self.ha.fetch_node_status = get_node_status(timeline=1) self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node') mock_info.assert_called_with('%s: to %s, i am %s', 'manual failover', 'b', 'postgresql0') # failover to another member lagging behind the cluster_lsn (only failover_limitation() is checked) with patch('patroni.ha.logger.info') as mock_info: self.ha.cluster.config.data.update({'maximum_lag_on_failover': 5}) self.ha.fetch_node_status = get_node_status(wal_position=1) self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node') mock_info.assert_called_with('%s: to %s, i am %s', 'manual failover', 'b', 'postgresql0') def test_manual_switchover_process_no_leader(self): self.p.is_primary = false self.p.set_role('replica') # I was the leader, other members are healthy self.ha.fetch_node_status = get_node_status() self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, self.p.name, '', None)) self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node') # I was the leader, I am the only healthy member with patch('patroni.ha.logger.info') as mock_info: self.ha.fetch_node_status = get_node_status(reachable=False) # inaccessible, in_recovery self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock') self.assertEqual(mock_info.call_args_list[0][0], ('Member %s is %s', 'leader', 'not reachable')) self.assertEqual(mock_info.call_args_list[1][0], ('Member %s is %s', 'other', 'not reachable')) def test_manual_failover_process_no_leader_in_synchronous_mode(self): self.ha.is_synchronous_mode = true self.p.is_primary = false self.ha.fetch_node_status = get_node_status(nofailover=True) # other nodes are not healthy # manual failover when our name (postgresql0) isn't in the /sync key and the candidate node is not available self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'other', None), sync=('leader1', 'blabla')) self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node') # manual failover when the candidate node isn't available but our name is in the /sync key # while other sync node is nofailover with patch('patroni.ha.logger.warning') as mock_warning: self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'other', None), sync=('leader1', 'postgresql0')) self.p.sync_handler.current_state = Mock(return_value=(CaseInsensitiveSet(), CaseInsensitiveSet())) self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty()) self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock') self.assertEqual(mock_warning.call_args_list[0][0], ('%s: member %s is %s', 'manual failover', 'other', 'not allowed to promote')) # manual failover to our node (postgresql0), # which name is not in sync nodes list (some sync nodes are available) self.p.set_role('replica') self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'postgresql0', None), sync=('leader1', 'other')) self.p.sync_handler.current_state = Mock(return_value=(CaseInsensitiveSet(['leader1']), CaseInsensitiveSet(['leader1']))) self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock') def test_manual_switchover_process_no_leader_in_synchronous_mode(self): self.ha.is_synchronous_mode = true self.p.is_primary = false # to a specific node, which name doesn't match our name (postgresql0) self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, 'leader', 'other', None)) self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node') # to our node (postgresql0), which name is not in sync nodes list self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, 'leader', 'postgresql0', None), sync=('leader1', 'blabla')) self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node') # without candidate, our name (postgresql0) is not in the sync nodes list self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, 'leader', '', None), sync=('leader', 'blabla')) self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node') # switchover from a specific leader, but the only sync node (us, postgresql0) has nofailover tag self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, 'leader', '', None), sync=('postgresql0')) self.ha.patroni.nofailover = True self.assertEqual(self.ha.run_cycle(), 'following a different leader because I am not allowed to promote') def test_manual_failover_process_no_leader_in_pause(self): self.ha.is_paused = true # I am running as primary, cluster is unlocked, the candidate is allowed to promote # but we are in pause self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'other', None)) self.assertEqual(self.ha.run_cycle(), 'PAUSE: continue to run as primary without lock') def test_manual_switchover_process_no_leader_in_pause(self): self.ha.is_paused = true # I am running as primary, cluster is unlocked, no candidate specified self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, 'leader', '', None)) self.assertEqual(self.ha.run_cycle(), 'PAUSE: continue to run as primary without lock') # the candidate is not running with patch('patroni.ha.logger.warning') as mock_warning: self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, 'leader', 'blabla', None)) self.assertEqual('PAUSE: acquired session lock as a leader', self.ha.run_cycle()) self.assertEqual( mock_warning.call_args_list[0][0], ('%s: removing failover key because failover candidate is not running', 'switchover')) # switchover to me, I am not leader self.p.is_primary = false self.p.set_role('replica') self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, 'leader', self.p.name, None)) self.assertEqual(self.ha.run_cycle(), 'PAUSE: promoted self to leader by acquiring session lock') def test_is_healthiest_node(self): self.ha.is_failsafe_mode = true self.ha.state_handler.is_primary = false self.ha.patroni.nofailover = False self.ha.fetch_node_status = get_node_status() self.ha.dcs._last_failsafe = {'foo': ''} self.assertFalse(self.ha.is_healthiest_node()) self.ha.dcs._last_failsafe = {'postgresql0': ''} self.assertTrue(self.ha.is_healthiest_node()) self.ha.dcs._last_failsafe = None with patch.object(Watchdog, 'is_healthy', PropertyMock(return_value=False)): self.assertFalse(self.ha.is_healthiest_node()) self.ha.is_paused = true self.assertFalse(self.ha.is_healthiest_node()) def test__is_healthiest_node(self): self.p.is_primary = false self.ha.cluster = get_cluster_initialized_without_leader(sync=('postgresql1', self.p.name)) global_config.update(self.ha.cluster) self.assertTrue(self.ha._is_healthiest_node(self.ha.old_cluster.members)) self.ha.fetch_node_status = get_node_status() # accessible, in_recovery self.assertTrue(self.ha._is_healthiest_node(self.ha.old_cluster.members)) self.ha.fetch_node_status = get_node_status(in_recovery=False) # accessible, not in_recovery self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members)) self.ha.fetch_node_status = get_node_status(failover_priority=2) # accessible, in_recovery, higher priority self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members)) # if there is a higher-priority node but it has a lower WAL position then this node should race self.ha.fetch_node_status = get_node_status(failover_priority=6, wal_position=9) self.assertTrue(self.ha._is_healthiest_node(self.ha.old_cluster.members)) self.ha.fetch_node_status = get_node_status(wal_position=11) # accessible, in_recovery, wal position ahead self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members)) # in synchronous_mode consider itself healthy if the former leader is accessible in read-only and ahead of us with patch.object(Ha, 'is_synchronous_mode', Mock(return_value=True)): self.assertTrue(self.ha._is_healthiest_node(self.ha.old_cluster.members)) self.ha.cluster.config.data.update({'maximum_lag_on_failover': 5}) global_config.update(self.ha.cluster) with patch('patroni.postgresql.Postgresql.last_operation', return_value=1): self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members)) with patch('patroni.postgresql.Postgresql.replica_cached_timeline', return_value=None): self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members)) with patch('patroni.postgresql.Postgresql.replica_cached_timeline', return_value=1): self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members)) self.ha.patroni.nofailover = True self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members)) self.ha.patroni.nofailover = None self.ha.patroni.failover_priority = 0 self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members)) def test_fetch_node_status(self): member = Member(0, 'test', 1, {'api_url': 'http://127.0.0.1:8011/patroni'}) self.ha.fetch_node_status(member) member = Member(0, 'test', 1, {'api_url': 'http://localhost:8011/patroni'}) self.ha.patroni.request = Mock() self.ha.patroni.request.return_value.data = b'{"wal":{"location":1},"role":"primary"}' ret = self.ha.fetch_node_status(member) self.assertFalse(ret.in_recovery) @patch.object(Rewind, 'pg_rewind', true) @patch.object(Rewind, 'check_leader_is_not_in_recovery', true) @patch('os.listdir', Mock(return_value=[])) @patch('patroni.postgresql.rewind.fsync_dir', Mock()) def test_post_recover(self): self.p.is_running = false self.ha.has_lock = true self.p.set_role('primary') self.assertEqual(self.ha.post_recover(), 'removed leader key after trying and failing to start postgres') self.ha.has_lock = false self.assertEqual(self.ha.post_recover(), 'failed to start postgres') leader = Leader(0, 0, Member(0, 'l', 2, {"version": "1.6", "conn_url": "postgres://a", "role": "primary"})) self.ha._rewind.execute(leader) self.p.is_running = true self.assertIsNone(self.ha.post_recover()) def test_schedule_future_restart(self): self.ha.patroni.scheduled_restart = {} # do the restart 2 times. The first one should succeed, the second one should fail self.assertTrue(self.ha.schedule_future_restart({'schedule': future_restart_time})) self.assertFalse(self.ha.schedule_future_restart({'schedule': future_restart_time})) def test_delete_future_restarts(self): self.ha.delete_future_restart() def test_evaluate_scheduled_restart(self): self.p.postmaster_start_time = Mock(return_value=str(postmaster_start_time)) # restart already in progress with patch('patroni.async_executor.AsyncExecutor.busy', PropertyMock(return_value=True)): self.assertIsNone(self.ha.evaluate_scheduled_restart()) # restart while the postmaster has been already restarted, fails with patch.object(self.ha, 'future_restart_scheduled', Mock(return_value={'postmaster_start_time': str(postmaster_start_time - datetime.timedelta(days=1)), 'schedule': str(future_restart_time)})): self.assertIsNone(self.ha.evaluate_scheduled_restart()) with patch.object(self.ha, 'future_restart_scheduled', Mock(return_value={'postmaster_start_time': str(postmaster_start_time), 'schedule': str(future_restart_time)})): with patch.object(self.ha, 'should_run_scheduled_action', Mock(return_value=True)): # restart in the future, ok self.assertIsNotNone(self.ha.evaluate_scheduled_restart()) with patch.object(self.ha, 'restart', Mock(return_value=(False, "Test"))): # restart in the future, bit the actual restart failed self.assertIsNone(self.ha.evaluate_scheduled_restart()) def test_scheduled_restart(self): self.ha.cluster = get_cluster_initialized_with_leader() with patch.object(self.ha, "evaluate_scheduled_restart", Mock(return_value="restart scheduled")): self.assertEqual(self.ha.run_cycle(), "restart scheduled") def test_restart_matches(self): self.p._role = 'replica' self.p._connection.server_version = 90500 self.p._pending_restart = True self.assertFalse(self.ha.restart_matches("primary", "9.5.0", True)) self.assertFalse(self.ha.restart_matches("replica", "9.4.3", True)) self.p._pending_restart = False self.assertFalse(self.ha.restart_matches("replica", "9.5.2", True)) self.assertTrue(self.ha.restart_matches("replica", "9.5.2", False)) def test_process_healthy_cluster_in_pause(self): self.p.is_primary = false self.ha.is_paused = true self.p.name = 'leader' self.ha.cluster = get_cluster_initialized_with_leader() self.assertEqual(self.ha.run_cycle(), 'PAUSE: removed leader lock because postgres is not running as primary') self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, '', self.p.name, None)) self.assertEqual(self.ha.run_cycle(), 'PAUSE: waiting to become primary after promote...') @patch('patroni.postgresql.mtime', Mock(return_value=1588316884)) @patch('builtins.open', mock_open(read_data='1\t0/40159C0\tno recovery target specified\n')) def test_process_healthy_standby_cluster_as_standby_leader(self): self.p.is_primary = false self.p.name = 'leader' self.ha.cluster = get_standby_cluster_initialized_with_only_leader() self.p.config.check_recovery_conf = Mock(return_value=(False, False)) self.ha._leader_timeline = 1 self.assertEqual(self.ha.run_cycle(), 'promoted self to a standby leader because i had the session lock') self.assertEqual(self.ha.run_cycle(), 'no action. I am (leader), the standby leader with the lock') self.p.set_role('replica') self.p.config.check_recovery_conf = Mock(return_value=(True, False)) self.assertEqual(self.ha.run_cycle(), 'promoted self to a standby leader because i had the session lock') def test_process_healthy_standby_cluster_as_cascade_replica(self): self.p.is_primary = false self.p.name = 'replica' self.ha.cluster = get_standby_cluster_initialized_with_only_leader() self.assertEqual(self.ha.run_cycle(), 'no action. I am (replica), a secondary, and following a standby leader (leader)') with patch.object(Leader, 'conn_url', PropertyMock(return_value='')): self.assertEqual(self.ha.run_cycle(), 'continue following the old known standby leader') @patch.object(Cluster, 'is_unlocked', Mock(return_value=True)) def test_process_unhealthy_standby_cluster_as_standby_leader(self): self.p.is_primary = false self.p.name = 'leader' self.ha.cluster = get_standby_cluster_initialized_with_only_leader() self.ha.sysid_valid = true self.p._sysid = True self.assertEqual(self.ha.run_cycle(), 'promoted self to a standby leader by acquiring session lock') @patch.object(Rewind, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=True)) @patch.object(Rewind, 'can_rewind', PropertyMock(return_value=True)) def test_process_unhealthy_standby_cluster_as_cascade_replica(self): self.p.is_primary = false self.p.name = 'replica' self.ha.cluster = get_standby_cluster_initialized_with_only_leader() self.assertTrue(self.ha.run_cycle().startswith('running pg_rewind from remote_member:')) def test_recover_unhealthy_leader_in_standby_cluster(self): self.p.is_primary = false self.p.name = 'leader' self.p.is_running = false self.p.follow = false self.ha.cluster = get_standby_cluster_initialized_with_only_leader() self.assertEqual(self.ha.run_cycle(), 'starting as a standby leader because i had the session lock') @patch.object(Cluster, 'is_unlocked', Mock(return_value=True)) def test_recover_unhealthy_unlocked_standby_cluster(self): self.p.is_primary = false self.p.name = 'leader' self.p.is_running = false self.p.follow = false self.ha.cluster = get_standby_cluster_initialized_with_only_leader() self.ha.has_lock = false self.assertEqual(self.ha.run_cycle(), 'trying to follow a remote member because standby cluster is unhealthy') def test_failed_to_update_lock_in_pause(self): self.ha.update_lock = false self.ha.is_paused = true self.p.name = 'leader' self.ha.cluster = get_cluster_initialized_with_leader() self.assertEqual(self.ha.run_cycle(), 'PAUSE: continue to run as primary after failing to update leader lock in DCS') def test_postgres_unhealthy_in_pause(self): self.ha.is_paused = true self.p.is_healthy = false self.assertEqual(self.ha.run_cycle(), 'PAUSE: postgres is not running') self.ha.has_lock = true self.assertEqual(self.ha.run_cycle(), 'PAUSE: removed leader lock because postgres is not running') def test_no_etcd_connection_in_pause(self): self.ha.is_paused = true self.ha.load_cluster_from_dcs = Mock(side_effect=DCSError('Etcd is not responding properly')) self.assertEqual(self.ha.run_cycle(), 'PAUSE: DCS is not accessible') @patch('patroni.ha.Ha.update_lock', return_value=True) @patch('patroni.ha.Ha.demote') def test_starting_timeout(self, demote, update_lock): def check_calls(seq): for mock, called in seq: if called: mock.assert_called_once() else: mock.assert_not_called() mock.reset_mock() self.ha.has_lock = true self.ha.cluster = get_cluster_initialized_with_leader() self.p.check_for_startup = true self.p.time_in_state = lambda: 30 self.assertEqual(self.ha.run_cycle(), 'PostgreSQL is still starting up, 270 seconds until timeout') check_calls([(update_lock, True), (demote, False)]) self.p.time_in_state = lambda: 350 self.ha.fetch_node_status = get_node_status(reachable=False) # inaccessible, in_recovery self.assertEqual(self.ha.run_cycle(), 'primary start has timed out, but continuing to wait because failover is not possible') check_calls([(update_lock, True), (demote, False)]) self.ha.fetch_node_status = get_node_status() # accessible, in_recovery self.assertEqual(self.ha.run_cycle(), 'stopped PostgreSQL because of startup timeout') check_calls([(update_lock, True), (demote, True)]) update_lock.return_value = False self.assertEqual(self.ha.run_cycle(), 'stopped PostgreSQL while starting up because leader key was lost') check_calls([(update_lock, True), (demote, True)]) self.ha.has_lock = false self.p.is_primary = false self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), a secondary, and following a leader (leader)') check_calls([(update_lock, False), (demote, False)]) def test_manual_failover_while_starting(self): self.ha.has_lock = true self.p.check_for_startup = true f = Failover(0, self.p.name, '', None) self.ha.cluster = get_cluster_initialized_with_leader(f) self.ha.fetch_node_status = get_node_status() # accessible, in_recovery self.assertEqual(self.ha.run_cycle(), 'switchover: demoting myself') @patch('patroni.ha.Ha.demote') def test_failover_immediately_on_zero_primary_start_timeout(self, demote): self.p.is_running = false self.ha.cluster = get_cluster_initialized_with_leader(sync=(self.p.name, 'other')) self.ha.cluster.config.data.update({'synchronous_mode': True, 'primary_start_timeout': 0}) self.ha.has_lock = true self.ha.update_lock = true self.ha.fetch_node_status = get_node_status() # accessible, in_recovery self.assertEqual(self.ha.run_cycle(), 'stopped PostgreSQL to fail over after a crash') demote.assert_called_once() def test_primary_stop_timeout(self): self.assertEqual(self.ha.primary_stop_timeout(), None) self.ha.cluster.config.data.update({'primary_stop_timeout': 30}) global_config.update(self.ha.cluster) with patch.object(Ha, 'is_synchronous_mode', Mock(return_value=True)): self.assertEqual(self.ha.primary_stop_timeout(), 30) with patch.object(Ha, 'is_synchronous_mode', Mock(return_value=False)): self.assertEqual(self.ha.primary_stop_timeout(), None) self.ha.cluster.config.data['primary_stop_timeout'] = None global_config.update(self.ha.cluster) self.assertEqual(self.ha.primary_stop_timeout(), None) @patch('patroni.postgresql.Postgresql.follow') def test_demote_immediate(self, follow): self.ha.has_lock = true self.e.get_cluster = Mock(return_value=get_cluster_initialized_without_leader()) self.ha.demote('immediate') follow.assert_called_once_with(None) def test__process_multisync_replication(self): self.ha.has_lock = true mock_set_sync = self.p.sync_handler.set_synchronous_standby_names = Mock() self.p.name = 'leader' # Test sync key removed when sync mode disabled self.ha.cluster = get_cluster_initialized_with_leader(sync=('leader', 'other')) with patch.object(self.ha.dcs, 'delete_sync_state') as mock_delete_sync: self.ha.run_cycle() mock_delete_sync.assert_called_once() mock_set_sync.assert_called_once_with(CaseInsensitiveSet()) mock_set_sync.reset_mock() # Test sync key not touched when not there self.ha.cluster = get_cluster_initialized_with_leader() with patch.object(self.ha.dcs, 'delete_sync_state') as mock_delete_sync: self.ha.run_cycle() mock_delete_sync.assert_not_called() mock_set_sync.assert_called_once_with(CaseInsensitiveSet()) mock_set_sync.reset_mock() self.ha.is_synchronous_mode = true # Test sync standby not touched when picking the same node self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 1, 1, CaseInsensitiveSet(['other']), CaseInsensitiveSet(['other']))) self.ha.cluster = get_cluster_initialized_with_leader(sync=('leader', 'other')) self.ha.run_cycle() mock_set_sync.assert_not_called() mock_set_sync.reset_mock() # Test sync standby is replaced when switching standbys self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 0, 0, CaseInsensitiveSet(), CaseInsensitiveSet(['other2']))) self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty()) self.ha.run_cycle() mock_set_sync.assert_called_once_with(CaseInsensitiveSet(['other2'])) # Test sync standby is replaced when new standby is joined self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 1, 1, CaseInsensitiveSet(['other2']), CaseInsensitiveSet(['other2', 'other3']))) self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty()) self.ha.run_cycle() self.assertEqual(mock_set_sync.call_args_list[0][0], (CaseInsensitiveSet(['other2']),)) self.assertEqual(mock_set_sync.call_args_list[1][0], (CaseInsensitiveSet(['other2', 'other3']),)) mock_set_sync.reset_mock() # Test sync standby is not disabled when updating dcs fails self.ha.dcs.write_sync_state = Mock(return_value=None) self.ha.run_cycle() mock_set_sync.assert_not_called() mock_set_sync.reset_mock() # Test changing sync standby self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty()) self.ha.dcs.get_cluster = Mock(return_value=get_cluster_initialized_with_leader(sync=('leader', 'other'))) # self.ha.cluster = get_cluster_initialized_with_leader(sync=('leader', 'other')) self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 1, 1, CaseInsensitiveSet(['other2']), CaseInsensitiveSet(['other2']))) self.ha.run_cycle() self.assertEqual(self.ha.dcs.write_sync_state.call_count, 2) # Test updating sync standby key failed due to race self.ha.dcs.write_sync_state = Mock(side_effect=[SyncState.empty(), None]) self.ha.run_cycle() self.assertEqual(self.ha.dcs.write_sync_state.call_count, 2) # Test updating sync standby key failed due to DCS being not accessible self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty()) self.ha.dcs.get_cluster = Mock(side_effect=DCSError('foo')) self.ha.run_cycle() # Test changing sync standby failed due to race self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty()) self.ha.dcs.get_cluster = Mock(return_value=get_cluster_initialized_with_leader(sync=('somebodyelse', None))) self.ha.run_cycle() self.assertEqual(self.ha.dcs.write_sync_state.call_count, 2) # Test sync set to '*' when synchronous_mode_strict is enabled mock_set_sync.reset_mock() self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 0, 0, CaseInsensitiveSet(), CaseInsensitiveSet())) with patch.object(global_config.__class__, 'is_synchronous_mode_strict', PropertyMock(return_value=True)): self.ha.run_cycle() mock_set_sync.assert_called_once_with(CaseInsensitiveSet('*')) def test_sync_replication_become_primary(self): self.ha.is_synchronous_mode = true mock_set_sync = self.p.sync_handler.set_synchronous_standby_names = Mock() self.p.is_primary = false self.p.set_role('replica') self.ha.has_lock = true mock_write_sync = self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty()) self.p.name = 'leader' self.ha.cluster = get_cluster_initialized_with_leader(sync=('other', None)) # When we just became primary nobody is sync self.assertEqual(self.ha.enforce_primary_role('msg', 'promote msg'), 'promote msg') mock_set_sync.assert_called_once_with(CaseInsensitiveSet(), 0) mock_write_sync.assert_called_once_with('leader', None, 0, version=0) mock_set_sync.reset_mock() # When we just became primary nobody is sync self.p.set_role('replica') mock_write_sync.return_value = False self.assertTrue(self.ha.enforce_primary_role('msg', 'promote msg') != 'promote msg') mock_set_sync.assert_not_called() def test_unhealthy_sync_mode(self): self.ha.is_synchronous_mode = true self.p.is_primary = false self.p.set_role('replica') self.p.name = 'other' self.ha.cluster = get_cluster_initialized_without_leader(sync=('leader', 'other2')) mock_write_sync = self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty()) mock_acquire = self.ha.acquire_lock = Mock(return_value=True) mock_follow = self.p.follow = Mock() mock_promote = self.p.promote = Mock() # If we don't match the sync replica we are not allowed to acquire lock self.ha.run_cycle() mock_acquire.assert_not_called() mock_follow.assert_called_once() self.assertEqual(mock_follow.call_args[0][0], None) mock_write_sync.assert_not_called() mock_follow.reset_mock() # If we do match we will try to promote self.ha._is_healthiest_node = true self.ha.cluster = get_cluster_initialized_without_leader(sync=('leader', 'other')) self.ha.run_cycle() mock_acquire.assert_called_once() mock_follow.assert_not_called() mock_promote.assert_called_once() mock_write_sync.assert_called_once_with('other', None, 0, version=0) def test_disable_sync_when_restarting(self): self.ha.is_synchronous_mode = true self.p.name = 'other' self.p.is_primary = false self.p.set_role('replica') mock_restart = self.p.restart = Mock(return_value=True) self.ha.cluster = get_cluster_initialized_with_leader(sync=('leader', 'other')) self.ha.touch_member = Mock(return_value=True) self.ha.dcs.get_cluster = Mock(side_effect=[ get_cluster_initialized_with_leader(sync=('leader', syncstandby)) for syncstandby in ['other', None]]) with patch('time.sleep') as mock_sleep: self.ha.restart({}) mock_restart.assert_called_once() mock_sleep.assert_called() # Restart is still called when DCS connection fails mock_restart.reset_mock() self.ha.dcs.get_cluster = Mock(side_effect=DCSError("foo")) self.ha.restart({}) mock_restart.assert_called_once() # We don't try to fetch the cluster state when touch_member fails mock_restart.reset_mock() self.ha.dcs.get_cluster.reset_mock() self.ha.touch_member = Mock(return_value=False) self.ha.restart({}) mock_restart.assert_called_once() self.ha.dcs.get_cluster.assert_not_called() @patch.object(Cluster, 'is_unlocked', Mock(return_value=False)) def test_enable_synchronous_mode(self): self.ha.is_synchronous_mode = true self.ha.has_lock = true self.p.name = 'leader' self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 0, 0, CaseInsensitiveSet(), CaseInsensitiveSet())) self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty()) with patch('patroni.ha.logger.info') as mock_logger: self.ha.run_cycle() self.assertEqual(mock_logger.call_args_list[0][0][0], 'Enabled synchronous replication') self.ha.dcs.write_sync_state = Mock(return_value=None) with patch('patroni.ha.logger.warning') as mock_logger: self.ha.run_cycle() self.assertEqual(mock_logger.call_args[0][0], 'Updating sync state failed') @patch.object(Cluster, 'is_unlocked', Mock(return_value=False)) def test_inconsistent_synchronous_state(self): self.ha.is_synchronous_mode = true self.ha.has_lock = true self.p.name = 'leader' self.ha.cluster = get_cluster_initialized_without_leader(sync=('leader', 'a')) self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 0, 0, CaseInsensitiveSet(), CaseInsensitiveSet('a'))) self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty()) mock_set_sync = self.p.sync_handler.set_synchronous_standby_names = Mock() with patch('patroni.ha.logger.warning') as mock_logger: self.ha.run_cycle() mock_set_sync.assert_called_once() self.assertTrue(mock_logger.call_args_list[0][0][0].startswith('Inconsistent state between ')) self.ha.dcs.write_sync_state = Mock(return_value=None) with patch('patroni.ha.logger.warning') as mock_logger: self.ha.run_cycle() self.assertEqual(mock_logger.call_args[0][0], 'Updating sync state failed') def test_effective_tags(self): self.ha._disable_sync = True self.assertEqual(self.ha.get_effective_tags(), {'foo': 'bar', 'nosync': True}) self.ha._disable_sync = False self.assertEqual(self.ha.get_effective_tags(), {'foo': 'bar'}) @patch('patroni.postgresql.mtime', Mock(return_value=1588316884)) @patch('builtins.open', Mock(side_effect=Exception)) def test_restore_cluster_config(self): self.ha.cluster.config.data.clear() self.ha.has_lock = true self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock') def test_watch(self): self.ha.cluster = get_cluster_initialized_with_leader() self.ha.watch(0) def test_wakup(self): self.ha.wakeup() def test_shutdown(self): self.p.is_running = false self.ha.is_leader = true def stop(*args, **kwargs): kwargs['on_shutdown'](123, 120) self.p.stop = stop self.ha.shutdown() self.ha.is_failover_possible = true self.ha.shutdown() @patch('patroni.postgresql.mpp.AbstractMPPHandler.is_coordinator', Mock(return_value=False)) def test_shutdown_citus_worker(self): self.ha.is_leader = true self.p.is_running = Mock(side_effect=[Mock(), False]) self.ha.patroni.request = Mock() self.ha.shutdown() self.ha.patroni.request.assert_called() self.assertEqual(self.ha.patroni.request.call_args[0][2], 'citus') self.assertEqual(self.ha.patroni.request.call_args[0][3]['type'], 'before_demote') @patch('time.sleep', Mock()) def test_leader_with_not_accessible_data_directory(self): self.ha.cluster = get_cluster_initialized_with_leader() self.ha.has_lock = true self.p.data_directory_empty = Mock(side_effect=OSError(5, "Input/output error: '{}'".format(self.p.data_dir))) self.assertEqual(self.ha.run_cycle(), 'released leader key voluntarily as data dir not accessible and currently leader') self.assertEqual(self.p.role, 'uninitialized') # as has_lock is mocked out, we need to fake the leader key release self.ha.has_lock = false # will not say bootstrap because data directory is not accessible self.assertEqual(self.ha.run_cycle(), "data directory is not accessible: [Errno 5] Input/output error: '{}'".format(self.p.data_dir)) @patch('patroni.postgresql.mtime', Mock(return_value=1588316884)) @patch('builtins.open', mock_open(read_data=('1\t0/40159C0\tno recovery target specified\n\n' '2\t1/40159C0\tno recovery target specified\n'))) @patch.object(Cluster, 'is_unlocked', Mock(return_value=False)) def test_update_cluster_history(self): self.ha.has_lock = true for tl in (1, 3): self.p.get_primary_timeline = Mock(return_value=tl) self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock') @patch('sys.exit', return_value=1) def test_abort_join(self, exit_mock): self.ha.cluster = get_cluster_not_initialized_without_leader() self.p.is_primary = false self.ha.run_cycle() exit_mock.assert_called_once_with(1) self.p.set_role('replica') self.ha.dcs.initialize = Mock() with patch.object(Postgresql, 'cb_called', PropertyMock(return_value=True)): self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock') self.ha.dcs.initialize.assert_not_called() @patch.object(Cluster, 'is_unlocked', Mock(return_value=False)) def test_after_pause(self): self.ha.has_lock = true self.ha.is_paused = true self.assertEqual(self.ha.run_cycle(), 'PAUSE: no action. I am (postgresql0), the leader with the lock') self.ha.is_paused = false self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock') @patch('patroni.psycopg.connect', psycopg_connect) def test_permanent_logical_slots_after_promote(self): self.p._major_version = 110000 config = ClusterConfig(1, {'slots': {'l': {'database': 'postgres', 'plugin': 'test_decoding'}}}, 1) self.p.name = 'other' self.ha.cluster = get_cluster_initialized_without_leader(cluster_config=config) self.assertEqual(self.ha.run_cycle(), 'acquired session lock as a leader') self.ha.cluster = get_cluster_initialized_without_leader(leader=True, cluster_config=config) self.ha.has_lock = true self.assertEqual(self.ha.run_cycle(), 'no action. I am (other), the leader with the lock') @patch.object(Cluster, 'has_member', true) def test_run_cycle(self): self.ha.dcs.touch_member = Mock(side_effect=DCSError('foo')) self.assertEqual(self.ha.run_cycle(), 'Unexpected exception raised, please report it as a BUG') self.ha.dcs.touch_member = Mock(side_effect=PatroniFatalException('foo')) self.assertRaises(PatroniFatalException, self.ha.run_cycle) def test_empty_directory_in_pause(self): self.ha.is_paused = true self.p.data_directory_empty = true self.assertEqual(self.ha.run_cycle(), 'PAUSE: running with empty data directory') self.assertEqual(self.p.role, 'uninitialized') @patch('patroni.ha.Ha.sysid_valid', MagicMock(return_value=True)) def test_sysid_no_match_in_pause(self): self.ha.is_paused = true self.p.controldata = lambda: {'Database cluster state': 'in recovery', 'Database system identifier': '123'} self.assertEqual(self.ha.run_cycle(), 'PAUSE: continue to run as primary without lock') self.ha.has_lock = true self.assertEqual(self.ha.run_cycle(), 'PAUSE: released leader key voluntarily due to the system ID mismatch') @patch('patroni.psycopg.connect', psycopg_connect) @patch('os.path.exists', Mock(return_value=True)) @patch('shutil.rmtree', Mock()) @patch('os.makedirs', Mock()) @patch('os.open', Mock()) @patch('os.fsync', Mock()) @patch('os.close', Mock()) @patch('os.chmod', Mock()) @patch('os.rename', Mock()) @patch('patroni.postgresql.Postgresql.is_starting', Mock(return_value=False)) @patch('builtins.open', mock_open()) @patch.object(ConfigHandler, 'check_recovery_conf', Mock(return_value=(False, False))) @patch.object(Postgresql, 'major_version', PropertyMock(return_value=130000)) @patch.object(SlotsHandler, 'sync_replication_slots', Mock(return_value=['ls'])) def test_follow_copy(self): self.ha.cluster.config.data['slots'] = {'ls': {'database': 'a', 'plugin': 'b'}} self.p.is_primary = false self.assertTrue(self.ha.run_cycle().startswith('Copying logical slots')) def test_acquire_lock(self): self.ha.dcs.attempt_to_acquire_leader = Mock(side_effect=[DCSError('foo'), Exception]) self.assertRaises(DCSError, self.ha.acquire_lock) self.assertFalse(self.ha.acquire_lock()) @patch('patroni.postgresql.mpp.AbstractMPPHandler.is_coordinator', Mock(return_value=False)) def test_notify_citus_coordinator(self): self.ha.patroni.request = Mock() self.ha.notify_mpp_coordinator('before_demote') self.ha.patroni.request.assert_called_once() self.assertEqual(self.ha.patroni.request.call_args[1]['timeout'], 30) self.ha.patroni.request = Mock(side_effect=Exception) with patch('patroni.ha.logger.warning') as mock_logger: self.ha.notify_mpp_coordinator('before_promote') self.assertEqual(self.ha.patroni.request.call_args[1]['timeout'], 2) mock_logger.assert_called() self.assertTrue(mock_logger.call_args[0][0].startswith('Request to %s coordinator leader')) self.assertEqual(mock_logger.call_args[0][1], 'Citus') @patch.object(global_config.__class__, 'is_synchronous_mode', PropertyMock(return_value=True)) @patch.object(global_config.__class__, 'is_quorum_commit_mode', PropertyMock(return_value=True)) def test_process_sync_replication_prepromote(self): self.p._major_version = 90500 self.ha.cluster = get_cluster_initialized_without_leader(sync=('other', self.p.name + ',foo')) self.p.is_primary = false self.p.set_role('replica') mock_write_sync = self.ha.dcs.write_sync_state = Mock(return_value=None) # Postgres 9.5, write_sync_state to DCS failed self.assertEqual(self.ha.run_cycle(), 'Postponing promotion because synchronous replication state was updated by somebody else') self.assertEqual(self.ha.dcs.write_sync_state.call_count, 1) self.assertEqual(mock_write_sync.call_args_list[0][0], (self.p.name, None, 0)) self.assertEqual(mock_write_sync.call_args_list[0][1], {'version': 0}) mock_set_sync = self.p.config.set_synchronous_standby_names = Mock() mock_write_sync = self.ha.dcs.write_sync_state = Mock(return_value=True) # Postgres 9.5, our name is written to leader of the /sync key, while voters list and ssn is empty self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock') self.assertEqual(self.ha.dcs.write_sync_state.call_count, 1) self.assertEqual(mock_write_sync.call_args_list[0][0], (self.p.name, None, 0)) self.assertEqual(mock_write_sync.call_args_list[0][1], {'version': 0}) self.assertEqual(mock_set_sync.call_count, 1) self.assertEqual(mock_set_sync.call_args_list[0][0], (None,)) self.p._major_version = 90600 mock_set_sync.reset_mock() mock_write_sync.reset_mock() self.p.set_role('replica') # Postgres 9.6, with quorum commit we avoid updating /sync key and put some nodes to ssn self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock') self.assertEqual(mock_write_sync.call_count, 0) self.assertEqual(mock_set_sync.call_count, 1) self.assertEqual(mock_set_sync.call_args_list[0][0], ('2 (foo,other)',)) self.p._major_version = 150000 mock_set_sync.reset_mock() self.p.set_role('replica') self.p.name = 'nonsync' self.ha.fetch_node_status = get_node_status() # Postgres 15, with quorum commit. Non-sync node promoted we avoid updating /sync key and put some nodes to ssn self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock') self.assertEqual(mock_write_sync.call_count, 0) self.assertEqual(mock_set_sync.call_count, 1) self.assertEqual(mock_set_sync.call_args_list[0][0], ('ANY 3 (foo,other,postgresql0)',)) @patch.object(global_config.__class__, 'is_synchronous_mode', PropertyMock(return_value=True)) @patch.object(global_config.__class__, 'is_quorum_commit_mode', PropertyMock(return_value=True)) def test__process_quorum_replication(self): self.p._major_version = 150000 self.ha.has_lock = true mock_set_sync = self.p.config.set_synchronous_standby_names = Mock() self.p.name = 'leader' mock_write_sync = self.ha.dcs.write_sync_state = Mock(return_value=None) # Test /sync key is attempted to set and failed when missing or invalid self.p.sync_handler.current_state = Mock(return_value=_SyncState('quorum', 1, 1, CaseInsensitiveSet(['other']), CaseInsensitiveSet(['other']))) self.ha.run_cycle() self.assertEqual(mock_write_sync.call_count, 1) self.assertEqual(mock_write_sync.call_args_list[0][0], (self.p.name, None, 0)) self.assertEqual(mock_write_sync.call_args_list[0][1], {'version': None}) self.assertEqual(mock_set_sync.call_count, 0) self.ha._promote_timestamp = 1 mock_write_sync = self.ha.dcs.write_sync_state = Mock(side_effect=[SyncState(None, self.p.name, None, 0), None]) # Test /sync key is attempted to set and succeed when missing or invalid with patch.object(SyncState, 'is_empty', Mock(side_effect=[True, False])): self.ha.run_cycle() self.assertEqual(mock_write_sync.call_count, 2) self.assertEqual(mock_write_sync.call_args_list[0][0], (self.p.name, None, 0)) self.assertEqual(mock_write_sync.call_args_list[0][1], {'version': None}) self.assertEqual(mock_write_sync.call_args_list[1][0], (self.p.name, CaseInsensitiveSet(['other']), 0)) self.assertEqual(mock_write_sync.call_args_list[1][1], {'version': None}) self.assertEqual(mock_set_sync.call_count, 0) self.p.sync_handler.current_state = Mock(side_effect=[_SyncState('quorum', 1, 0, CaseInsensitiveSet(['foo']), CaseInsensitiveSet(['other'])), _SyncState('quorum', 1, 1, CaseInsensitiveSet(['foo']), CaseInsensitiveSet(['foo']))]) mock_write_sync = self.ha.dcs.write_sync_state = Mock(return_value=SyncState(1, 'leader', 'foo', 0)) self.ha.cluster = get_cluster_initialized_with_leader(sync=('leader', 'foo')) # Test the sync node is removed from voters, added to ssn with patch.object(Postgresql, 'synchronous_standby_names', Mock(return_value='other')), \ patch('time.sleep', Mock()): self.ha.run_cycle() self.assertEqual(mock_write_sync.call_count, 1) self.assertEqual(mock_write_sync.call_args_list[0][0], (self.p.name, CaseInsensitiveSet(), 0)) self.assertEqual(mock_write_sync.call_args_list[0][1], {'version': 0}) self.assertEqual(mock_set_sync.call_count, 1) self.assertEqual(mock_set_sync.call_args_list[0][0], ('ANY 1 (other)',)) # Test ANY 1 (*) when synchronous_mode_strict and no nodes available self.p.sync_handler.current_state = Mock(return_value=_SyncState('quorum', 1, 0, CaseInsensitiveSet(['other', 'foo']), CaseInsensitiveSet())) mock_write_sync.reset_mock() mock_set_sync.reset_mock() with patch.object(global_config.__class__, 'is_synchronous_mode_strict', PropertyMock(return_value=True)): self.ha.run_cycle() self.assertEqual(mock_write_sync.call_count, 1) self.assertEqual(mock_write_sync.call_args_list[0][0], (self.p.name, CaseInsensitiveSet(), 0)) self.assertEqual(mock_write_sync.call_args_list[0][1], {'version': 0}) self.assertEqual(mock_set_sync.call_count, 1) self.assertEqual(mock_set_sync.call_args_list[0][0], ('ANY 1 (*)',)) # Test that _process_quorum_replication doesn't take longer than loop_wait with patch('time.time', Mock(side_effect=[30, 60, 90, 120])): self.ha.process_sync_replication()