Files
patroni/tests/test_ha.py
2025-04-18 17:12:43 +02:00

1887 lines
104 KiB
Python

import datetime
import os
import sys
from unittest.mock import MagicMock, Mock, mock_open, patch, PropertyMock
import etcd
from patroni import global_config
from patroni.collections import CaseInsensitiveSet
from patroni.config import Config
from patroni.dcs import Cluster, ClusterConfig, Failover, get_dcs, Leader, Member, Status, SyncState, TimelineHistory
from patroni.dcs.etcd import AbstractEtcdClientWithFailover
from patroni.exceptions import DCSError, PatroniFatalException, PostgresConnectionException
from patroni.ha import _MemberStatus, Ha
from patroni.postgresql import Postgresql
from patroni.postgresql.bootstrap import Bootstrap
from patroni.postgresql.callback_executor import CallbackAction
from patroni.postgresql.cancellable import CancellableSubprocess
from patroni.postgresql.config import ConfigHandler
from patroni.postgresql.misc import PostgresqlRole, PostgresqlState
from patroni.postgresql.postmaster import PostmasterProcess
from patroni.postgresql.rewind import Rewind
from patroni.postgresql.slots import SlotsHandler
from patroni.postgresql.sync import _SyncState
from patroni.utils import tzutc
from patroni.watchdog import Watchdog
from . import MockPostmaster, PostgresInit, psycopg_connect, requests_get
from .test_etcd import etcd_read, etcd_write, socket_getaddrinfo
SYSID = '12345678901'
def true(*args, **kwargs):
return True
def false(*args, **kwargs):
return False
def get_cluster(initialize, leader, members, failover, sync, cluster_config=None, failsafe=None):
t = datetime.datetime.now().isoformat()
history = TimelineHistory(1, '[[1,67197376,"no recovery target specified","' + t + '","foo"]]',
[(1, 67197376, 'no recovery target specified', t, 'foo')])
cluster_config = cluster_config or ClusterConfig(1, {'check_timeline': True, 'member_slots_ttl': 0}, 1)
return Cluster(initialize, cluster_config, leader, Status(10, None, []), members, failover, sync, history, failsafe)
def get_cluster_not_initialized_without_leader(cluster_config=None):
return get_cluster(None, None, [], None, SyncState.empty(), cluster_config)
def get_cluster_bootstrapping_without_leader(cluster_config=None):
return get_cluster("", None, [], None, SyncState.empty(), cluster_config)
def get_cluster_initialized_without_leader(leader=False, failover=None, sync=None, cluster_config=None, failsafe=False):
m1 = Member(0, 'leader', 28, {'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5435/postgres',
'api_url': 'http://127.0.0.1:8008/patroni', 'xlog_location': 4,
'role': PostgresqlRole.PRIMARY, 'state': 'running'})
leader = Leader(0, 0, m1 if leader else Member(0, '', 28, {}))
m2 = Member(0, 'other', 28, {'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres',
'api_url': 'http://127.0.0.1:8011/patroni',
'state': 'running',
'pause': True,
'tags': {'clonefrom': True},
'scheduled_restart': {'schedule': "2100-01-01 10:53:07.560445+00:00",
'postgres_version': '99.0.0'}})
syncstate = SyncState(0 if sync else None, sync and sync[0],
sync and sync[1], sync[2] if sync and len(sync) > 2 else 0)
failsafe = {m.name: m.api_url for m in (m1, m2)} if failsafe else None
return get_cluster(SYSID, leader, [m1, m2], failover, syncstate, cluster_config, failsafe)
def get_cluster_initialized_with_leader(failover=None, sync=None):
return get_cluster_initialized_without_leader(leader=True, failover=failover, sync=sync)
def get_cluster_initialized_with_only_leader(failover=None, cluster_config=None):
leader = get_cluster_initialized_without_leader(leader=True, failover=failover).leader
return get_cluster(True, leader, [leader.member], failover, SyncState.empty(), cluster_config)
def get_standby_cluster_initialized_with_only_leader(failover=None, sync=None):
return get_cluster_initialized_with_only_leader(
cluster_config=ClusterConfig(1, {
"standby_cluster": {
"host": "localhost",
"port": 5432,
"primary_slot_name": "",
}}, 1)
)
def get_cluster_initialized_with_leader_and_failsafe():
return get_cluster_initialized_without_leader(leader=True, failsafe=True,
cluster_config=ClusterConfig(1, {'failsafe_mode': True}, 1))
def get_node_status(reachable=True, in_recovery=True, dcs_last_seen=0,
timeline=2, wal_position=10, nofailover=False,
watchdog_failed=False, failover_priority=1, sync_priority=1):
def fetch_node_status(e):
tags = {}
if nofailover:
tags['nofailover'] = True
tags['failover_priority'] = failover_priority
tags['sync_priority'] = sync_priority
return _MemberStatus(e, reachable, in_recovery, wal_position,
{'tags': tags, 'watchdog_failed': watchdog_failed,
'dcs_last_seen': dcs_last_seen, 'timeline': timeline})
return fetch_node_status
future_restart_time = datetime.datetime.now(tzutc) + datetime.timedelta(days=5)
postmaster_start_time = datetime.datetime.now(tzutc)
class MockPatroni(object):
def __init__(self, p, d):
os.environ[Config.PATRONI_CONFIG_VARIABLE] = """
restapi:
listen: 0.0.0.0:8008
bootstrap:
postgresql:
name: foo
data_dir: data/postgresql0
pg_rewind:
username: postgres
password: postgres
watchdog:
mode: off
zookeeper:
exhibitor:
hosts: [localhost]
port: 8181
"""
# We rely on sys.argv in Config, so it's necessary to reset
# all the extra values that are coming from py.test
sys.argv = sys.argv[:1]
self.config = Config(None)
self.version = '1.5.7'
self.postgresql = p
self.dcs = d
self.api = Mock()
self.tags = {'foo': 'bar'}
self.nofailover = None
self.replicatefrom = None
self.api.connection_string = 'http://127.0.0.1:8008'
self.clonefrom = None
self.nosync = False
self.nostream = False
self.scheduled_restart = {'schedule': future_restart_time,
'postmaster_start_time': str(postmaster_start_time)}
self.watchdog = Watchdog(self.config)
self.request = lambda *args, **kwargs: requests_get(args[0].api_url, *args[1:], **kwargs)
self.failover_priority = 1
self.sync_priority = 1
def run_async(self, func, args=()):
self.reset_scheduled_action()
if args:
func(*args)
else:
func()
@patch.object(Postgresql, 'is_running', Mock(return_value=MockPostmaster()))
@patch.object(Postgresql, 'is_primary', Mock(return_value=True))
@patch.object(Postgresql, 'timeline_wal_position', Mock(return_value=(1, 10, 1)))
@patch.object(Postgresql, '_cluster_info_state_get', Mock(return_value=10))
@patch.object(Postgresql, 'slots', Mock(return_value={'l': 100}))
@patch.object(Postgresql, 'data_directory_empty', Mock(return_value=False))
@patch.object(Postgresql, 'controldata', Mock(return_value={
'Database system identifier': SYSID,
'Database cluster state': 'shut down',
'Latest checkpoint location': '0/12345678',
"Latest checkpoint's TimeLineID": '2'}))
@patch.object(SlotsHandler, 'load_replication_slots', Mock(side_effect=Exception))
@patch.object(ConfigHandler, 'append_pg_hba', Mock())
@patch.object(ConfigHandler, 'write_pgpass', Mock(return_value={}))
@patch.object(ConfigHandler, 'write_recovery_conf', Mock())
@patch.object(ConfigHandler, 'write_postgresql_conf', Mock())
@patch.object(Postgresql, 'query', Mock())
@patch.object(Postgresql, 'checkpoint', Mock())
@patch.object(CancellableSubprocess, 'call', Mock(return_value=0))
@patch.object(Postgresql, 'get_replica_timeline', Mock(return_value=2))
@patch.object(Postgresql, 'get_primary_timeline', Mock(return_value=2))
@patch.object(Postgresql, 'get_major_version', Mock(return_value=140000))
@patch.object(Postgresql, 'resume_wal_replay', Mock())
@patch.object(ConfigHandler, 'restore_configuration_files', Mock())
@patch.object(etcd.Client, 'write', etcd_write)
@patch.object(etcd.Client, 'read', etcd_read)
@patch.object(etcd.Client, 'delete', Mock(side_effect=etcd.EtcdException))
@patch('patroni.postgresql.polling_loop', Mock(return_value=range(1)))
@patch('patroni.async_executor.AsyncExecutor.busy', PropertyMock(return_value=False))
@patch('patroni.async_executor.AsyncExecutor.run_async', run_async)
@patch('patroni.postgresql.rewind.Thread', Mock())
@patch('patroni.postgresql.mpp.citus.CitusHandler.start', Mock())
@patch('subprocess.call', Mock(return_value=0))
@patch('time.sleep', Mock())
class TestHa(PostgresInit):
@patch('socket.getaddrinfo', socket_getaddrinfo)
@patch('patroni.dcs.dcs_modules', Mock(return_value=['patroni.dcs.etcd']))
@patch.object(etcd.Client, 'read', etcd_read)
@patch.object(AbstractEtcdClientWithFailover, '_get_machines_list', Mock(return_value=['http://remotehost:2379']))
@patch.object(Config, '_load_cache', Mock())
def setUp(self):
super(TestHa, self).setUp()
self.p.set_state(PostgresqlState.RUNNING)
self.p.set_role(PostgresqlRole.REPLICA)
self.p.postmaster_start_time = MagicMock(return_value=str(postmaster_start_time))
self.p.can_create_replica_without_replication_connection = MagicMock(return_value=False)
self.e = get_dcs({'etcd': {'ttl': 30, 'host': 'ok:2379', 'scope': 'test',
'name': 'foo', 'retry_timeout': 10},
'citus': {'database': 'citus', 'group': None}})
self.ha = Ha(MockPatroni(self.p, self.e))
self.ha.old_cluster = self.e.get_cluster()
self.ha.cluster = get_cluster_initialized_without_leader()
global_config.update(self.ha.cluster)
self.ha.load_cluster_from_dcs = Mock()
def test_update_lock(self):
self.ha.is_failsafe_mode = true
self.p.last_operation = Mock(side_effect=PostgresConnectionException(''))
self.ha.dcs.update_leader = Mock(side_effect=[DCSError(''), Exception])
self.assertRaises(DCSError, self.ha.update_lock)
self.assertFalse(self.ha.update_lock(True))
@patch.object(Postgresql, 'received_timeline', Mock(return_value=None))
def test_touch_member(self):
self.p._major_version = 110000
self.p.is_primary = false
self.p.timeline_wal_position = Mock(return_value=(0, 1, 0))
self.p.replica_cached_timeline = Mock(side_effect=Exception)
with patch.object(Postgresql, '_cluster_info_state_get', Mock(return_value='streaming')):
self.ha.touch_member()
self.p.timeline_wal_position = Mock(return_value=(0, 1, 1))
self.p.set_role(PostgresqlRole.STANDBY_LEADER)
self.ha.touch_member()
self.p.set_role(PostgresqlRole.PRIMARY)
self.ha.dcs.touch_member = true
self.ha.touch_member()
def test_is_leader(self):
self.assertFalse(self.ha.is_leader())
def test_start_as_replica(self):
self.p.is_healthy = false
self.assertEqual(self.ha.run_cycle(), 'starting as a secondary')
@patch('patroni.dcs.etcd.Etcd.initialize', return_value=True)
def test_bootstrap_as_standby_leader(self, initialize):
self.p.data_directory_empty = true
self.ha.cluster = get_cluster_not_initialized_without_leader(
cluster_config=ClusterConfig(1, {"standby_cluster": {"port": 5432}}, 1))
self.assertEqual(self.ha.run_cycle(), 'trying to bootstrap a new standby leader')
def test_bootstrap_waiting_for_standby_leader(self):
self.p.data_directory_empty = true
self.ha.cluster = get_cluster_initialized_without_leader()
self.ha.cluster.config.data.update({'standby_cluster': {'port': 5432}})
self.assertEqual(self.ha.run_cycle(), 'waiting for standby_leader to bootstrap')
@patch.object(Cluster, 'get_clone_member',
Mock(return_value=Member(0, 'test', 1, {'api_url': 'http://127.0.0.1:8011/patroni',
'conn_url': 'postgres://127.0.0.1:5432/postgres'})))
@patch.object(Bootstrap, 'create_replica', Mock(return_value=0))
def test_start_as_cascade_replica_in_standby_cluster(self):
self.p.data_directory_empty = true
self.ha.cluster = get_standby_cluster_initialized_with_only_leader()
self.assertEqual(self.ha.run_cycle(), "trying to bootstrap from replica 'test'")
def test_recover_replica_failed(self):
self.p.controldata = lambda: {'Database cluster state': 'in recovery', 'Database system identifier': SYSID}
self.p.is_running = false
self.p.follow = false
self.assertEqual(self.ha.run_cycle(), 'starting as a secondary')
self.assertEqual(self.ha.run_cycle(), 'failed to start postgres')
def test_recover_raft(self):
self.p.controldata = lambda: {'Database cluster state': 'in recovery', 'Database system identifier': SYSID}
self.p.is_running = false
self.p.follow = true
self.assertEqual(self.ha.run_cycle(), 'starting as a secondary')
self.p.is_running = true
ha_dcs_orig_name = self.ha.dcs.__class__.__name__
self.ha.dcs.__class__.__name__ = 'Raft'
self.assertEqual(self.ha.run_cycle(), 'started as a secondary')
self.ha.dcs.__class__.__name__ = ha_dcs_orig_name
def test_recover_former_primary(self):
self.p.follow = false
self.p.is_running = false
self.p.name = 'leader'
self.p.set_role(PostgresqlRole.DEMOTED)
self.p.controldata = lambda: {'Database cluster state': 'shut down', 'Database system identifier': SYSID}
self.ha.cluster = get_cluster_initialized_with_leader()
self.assertEqual(self.ha.run_cycle(), 'starting as readonly because i had the session lock')
def test_start_primary_after_failure(self):
self.p.start = false
self.p.is_running = false
self.p.name = 'leader'
self.p.set_role(PostgresqlRole.PRIMARY)
self.p.controldata = lambda: {'Database cluster state': 'in production', 'Database system identifier': SYSID}
self.ha.cluster = get_cluster_initialized_with_leader()
self.assertEqual(self.ha.run_cycle(), 'starting primary after failure')
@patch.object(Rewind, 'ensure_clean_shutdown', Mock())
def test_crash_recovery(self):
self.ha.has_lock = true
self.p.is_running = false
self.p.controldata = lambda: {'Database cluster state': 'in production', 'Database system identifier': SYSID}
self.assertEqual(self.ha.run_cycle(), 'doing crash recovery in a single user mode')
with patch('patroni.async_executor.AsyncExecutor.busy', PropertyMock(return_value=True)), \
patch.object(Ha, 'check_timeline', Mock(return_value=False)):
self.ha._async_executor.schedule('doing crash recovery in a single user mode')
self.ha.state_handler.cancellable._process = Mock()
self.ha._crash_recovery_started -= 600
self.ha.cluster.config.data.update({'maximum_lag_on_failover': 10})
self.assertEqual(self.ha.run_cycle(), 'terminated crash recovery because of startup timeout')
@patch.object(Rewind, 'ensure_clean_shutdown', Mock())
@patch.object(Rewind, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=True))
@patch.object(Rewind, 'can_rewind', PropertyMock(return_value=True))
def test_crash_recovery_before_rewind(self):
self.p.is_primary = false
self.p.is_running = false
self.p.controldata = lambda: {'Database cluster state': 'in archive recovery',
'Database system identifier': SYSID}
self.ha._rewind.trigger_check_diverged_lsn()
self.ha.cluster = get_cluster_initialized_with_leader()
self.assertEqual(self.ha.run_cycle(), 'doing crash recovery in a single user mode')
@patch.object(Rewind, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=True))
@patch.object(Rewind, 'can_rewind', PropertyMock(return_value=True))
@patch('os.listdir', Mock(return_value=[]))
@patch('patroni.postgresql.rewind.fsync_dir', Mock())
def test_recover_with_rewind(self):
self.p.is_running = false
self.ha.cluster = get_cluster_initialized_with_leader()
self.ha.cluster.leader.member.data.update(version='2.0.2', role=PostgresqlRole.PRIMARY)
self.ha._rewind.pg_rewind = true
self.ha._rewind.check_leader_is_not_in_recovery = true
with patch.object(Rewind, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=True)):
self.assertEqual(self.ha.run_cycle(), 'running pg_rewind from leader')
with patch.object(Rewind, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=False)), \
patch.object(Ha, 'is_synchronous_mode', Mock(return_value=True)):
self.p.follow = true
self.assertEqual(self.ha.run_cycle(), 'starting as a secondary')
self.p.is_running = true
self.ha.follow = Mock(return_value='fake')
self.assertEqual(self.ha.run_cycle(), 'fake')
@patch.object(Rewind, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=True))
@patch.object(Rewind, 'should_remove_data_directory_on_diverged_timelines', PropertyMock(return_value=True))
@patch.object(Bootstrap, 'create_replica', Mock(return_value=1))
def test_recover_with_reinitialize(self):
self.p.is_running = false
self.ha.cluster = get_cluster_initialized_with_leader()
self.assertEqual(self.ha.run_cycle(), 'reinitializing due to diverged timelines')
@patch('sys.exit', return_value=1)
@patch('patroni.ha.Ha.sysid_valid', MagicMock(return_value=True))
def test_sysid_no_match(self, exit_mock):
self.p.controldata = lambda: {'Database cluster state': 'in recovery', 'Database system identifier': '123'}
self.ha.run_cycle()
exit_mock.assert_called_once_with(1)
@patch.object(Cluster, 'is_unlocked', Mock(return_value=False))
def test_start_as_readonly(self):
self.p.is_primary = false
self.p.is_healthy = true
self.ha.has_lock = true
self.p.controldata = lambda: {'Database cluster state': 'in production', 'Database system identifier': SYSID}
self.assertEqual(self.ha.run_cycle(), 'promoted self to leader because I had the session lock')
@patch('patroni.psycopg.connect', psycopg_connect)
def test_acquire_lock_as_primary(self):
self.assertEqual(self.ha.run_cycle(), 'acquired session lock as a leader')
def test_leader_race_stale_primary(self):
with patch.object(Postgresql, 'get_primary_timeline', Mock(return_value=1)), \
patch('patroni.ha.logger.warning') as mock_logger:
self.assertEqual(self.ha.run_cycle(), 'demoting self because i am not the healthiest node')
self.assertEqual(mock_logger.call_args[0][0], 'My timeline %s is behind last known cluster timeline %s')
def test_promoted_by_acquiring_lock(self):
self.ha.is_healthiest_node = true
self.p.is_primary = false
self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')
def test_promotion_cancelled_after_pre_promote_failed(self):
self.p.is_primary = false
self.p._pre_promote = false
self.ha._is_healthiest_node = true
self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')
self.assertEqual(self.ha.run_cycle(), 'Promotion cancelled because the pre-promote script failed')
self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node')
def test_lost_leader_lock_during_promote(self):
with patch('patroni.async_executor.AsyncExecutor.busy', PropertyMock(return_value=True)):
self.ha._async_executor.schedule('promote')
self.assertEqual(self.ha.run_cycle(), 'lost leader before promote')
@patch.object(Cluster, 'is_unlocked', Mock(return_value=False))
def test_long_promote(self):
self.ha.has_lock = true
self.p.is_primary = false
self.p.set_role(PostgresqlRole.PRIMARY)
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
def test_demote_after_failing_to_obtain_lock(self):
self.ha.acquire_lock = false
self.assertEqual(self.ha.run_cycle(), 'demoted self after trying and failing to obtain lock')
def test_follow_new_leader_after_failing_to_obtain_lock(self):
self.ha.is_healthiest_node = true
self.ha.acquire_lock = false
self.p.is_primary = false
self.assertEqual(self.ha.run_cycle(), 'following new leader after trying and failing to obtain lock')
def test_demote_because_not_healthiest(self):
self.ha.is_healthiest_node = false
self.assertEqual(self.ha.run_cycle(), 'demoting self because i am not the healthiest node')
def test_follow_new_leader_because_not_healthiest(self):
self.ha.is_healthiest_node = false
self.p.is_primary = false
self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node')
@patch.object(Cluster, 'is_unlocked', Mock(return_value=False))
def test_promote_because_have_lock(self):
self.ha.has_lock = true
self.p.is_primary = false
self.assertEqual(self.ha.run_cycle(), 'promoted self to leader because I had the session lock')
def test_promote_without_watchdog(self):
self.ha.has_lock = true
self.p.is_primary = true
with patch.object(Watchdog, 'activate', Mock(return_value=False)):
self.assertEqual(self.ha.run_cycle(), 'Demoting self because watchdog could not be activated')
self.p.is_primary = false
self.assertEqual(self.ha.run_cycle(), 'Not promoting self because watchdog could not be activated')
def test_leader_with_lock(self):
self.ha.cluster = get_cluster_initialized_with_leader()
self.ha.has_lock = true
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
def test_coordinator_leader_with_lock(self):
self.ha.cluster = get_cluster_initialized_with_leader()
self.ha.has_lock = true
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
@patch.object(Postgresql, '_wait_for_connection_close', Mock())
@patch.object(Cluster, 'is_unlocked', Mock(return_value=False))
def test_demote_because_not_having_lock(self):
with patch.object(Watchdog, 'is_running', PropertyMock(return_value=True)):
self.assertEqual(self.ha.run_cycle(), 'demoting self because I do not have the lock and I was a leader')
@patch.object(Cluster, 'is_unlocked', Mock(return_value=False))
def test_demote_because_update_lock_failed(self):
self.ha.has_lock = true
self.ha.update_lock = false
self.assertEqual(self.ha.run_cycle(), 'demoted self because failed to update leader lock in DCS')
with patch.object(Ha, '_get_node_to_follow', Mock(side_effect=DCSError('foo'))):
self.assertEqual(self.ha.run_cycle(), 'demoted self because failed to update leader lock in DCS')
self.p.is_primary = false
self.assertEqual(self.ha.run_cycle(), 'not promoting because failed to update leader lock in DCS')
def test_get_node_to_follow_nostream(self):
self.ha.patroni.nostream = True
self.ha.cluster = get_cluster_initialized_with_leader()
self.assertEqual(self.ha._get_node_to_follow(self.ha.cluster), None)
@patch.object(Cluster, 'is_unlocked', Mock(return_value=False))
def test_follow(self):
self.p.is_primary = false
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), a secondary, and following a leader ()')
self.ha.patroni.replicatefrom = "foo"
self.p.config.check_recovery_conf = Mock(return_value=(True, False))
self.ha.cluster.config.data.update({'slots': {'l': {'database': 'a', 'plugin': 'b'}}})
self.ha.cluster.members[1].data['tags']['replicatefrom'] = 'postgresql0'
self.ha.patroni.nofailover = True
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), a secondary, and following a leader ()')
del self.ha.cluster.config.data['slots']
self.ha.cluster.config.data.update({'postgresql': {'use_slots': False}})
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), a secondary, and following a leader ()')
del self.ha.cluster.config.data['postgresql']['use_slots']
@patch.object(Cluster, 'is_unlocked', Mock(return_value=False))
def test_follow_in_pause(self):
self.ha.is_paused = true
self.assertEqual(self.ha.run_cycle(), 'PAUSE: continue to run as primary without lock')
self.p.is_primary = false
self.assertEqual(self.ha.run_cycle(), 'PAUSE: no action. I am (postgresql0)')
@patch.object(Rewind, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=True))
@patch.object(Rewind, 'can_rewind', PropertyMock(return_value=True))
def test_follow_triggers_rewind(self):
self.p.is_primary = false
self.ha._rewind.trigger_check_diverged_lsn()
self.ha.cluster = get_cluster_initialized_with_leader()
self.assertEqual(self.ha.run_cycle(), 'running pg_rewind from leader')
def test_no_dcs_connection_primary_demote(self):
self.ha.load_cluster_from_dcs = Mock(side_effect=DCSError('Etcd is not responding properly'))
self.assertEqual(self.ha.run_cycle(), 'demoting self because DCS is not accessible and I was a leader')
self.ha._async_executor.schedule('dummy')
self.assertEqual(self.ha.run_cycle(), 'demoted self because DCS is not accessible and I was a leader')
def test_check_failsafe_topology(self):
self.ha.load_cluster_from_dcs = Mock(side_effect=DCSError('Etcd is not responding properly'))
self.ha.cluster = get_cluster_initialized_with_leader_and_failsafe()
global_config.update(self.ha.cluster)
self.ha.dcs._last_failsafe = self.ha.cluster.failsafe
self.assertEqual(self.ha.run_cycle(), 'demoting self because DCS is not accessible and I was a leader')
self.ha.state_handler.name = self.ha.cluster.leader.name
self.assertFalse(self.ha.failsafe_is_active())
self.assertEqual(self.ha.run_cycle(),
'continue to run as a leader because failsafe mode is enabled and all members are accessible')
self.assertTrue(self.ha.failsafe_is_active())
with patch.object(Postgresql, 'slots', Mock(side_effect=Exception)):
self.ha.patroni.request = Mock(side_effect=Exception)
self.assertEqual(self.ha.run_cycle(), 'demoting self because DCS is not accessible and I was a leader')
self.assertFalse(self.ha.failsafe_is_active())
self.ha.dcs._last_failsafe.clear()
self.ha.dcs._last_failsafe[self.ha.cluster.leader.name] = self.ha.cluster.leader.member.api_url
self.assertEqual(self.ha.run_cycle(),
'continue to run as a leader because failsafe mode is enabled and all members are accessible')
def test_no_dcs_connection_primary_failsafe(self):
self.ha.load_cluster_from_dcs = Mock(side_effect=DCSError('Etcd is not responding properly'))
self.ha.cluster = get_cluster_initialized_with_leader_and_failsafe()
for m in self.ha.cluster.members:
if m.name != self.ha.cluster.leader.name:
m.data['tags']['replicatefrom'] = 'test'
global_config.update(self.ha.cluster)
self.ha.dcs._last_failsafe = self.ha.cluster.failsafe
self.ha.state_handler.name = self.ha.cluster.leader.name
self.assertEqual(self.ha.run_cycle(),
'continue to run as a leader because failsafe mode is enabled and all members are accessible')
def test_readonly_dcs_primary_failsafe(self):
self.ha.cluster = get_cluster_initialized_with_leader_and_failsafe()
self.ha.dcs.update_leader = Mock(side_effect=DCSError('Etcd is not responding properly'))
self.ha.dcs._last_failsafe = self.ha.cluster.failsafe
self.ha.state_handler.name = self.ha.cluster.leader.name
self.assertEqual(self.ha.run_cycle(),
'continue to run as a leader because failsafe mode is enabled and all members are accessible')
def test_no_dcs_connection_replica_failsafe(self):
self.p.last_operation = Mock(side_effect=PostgresConnectionException(''))
self.ha.load_cluster_from_dcs = Mock(side_effect=DCSError('Etcd is not responding properly'))
self.ha.cluster = get_cluster_initialized_with_leader_and_failsafe()
global_config.update(self.ha.cluster)
self.ha.update_failsafe({'name': 'leader', 'api_url': 'http://127.0.0.1:8008/patroni',
'conn_url': 'postgres://127.0.0.1:5432/postgres', 'slots': {'foo': 1000}})
self.p.is_primary = false
with patch('patroni.ha.logger.debug') as mock_logger:
self.assertEqual(self.ha.run_cycle(), 'DCS is not accessible')
self.assertEqual(mock_logger.call_args_list[0][0][0], 'Failed to fetch current wal lsn: %r')
def test_no_dcs_connection_replica_failsafe_not_enabled_but_active(self):
self.ha.load_cluster_from_dcs = Mock(side_effect=DCSError('Etcd is not responding properly'))
self.ha.cluster = get_cluster_initialized_with_leader()
self.ha.update_failsafe({'name': 'leader', 'api_url': 'http://127.0.0.1:8008/patroni',
'conn_url': 'postgres://127.0.0.1:5432/postgres', 'slots': {'foo': 1000}})
self.p.is_primary = false
self.assertEqual(self.ha.run_cycle(), 'DCS is not accessible')
def test_update_failsafe(self):
self.assertRaises(Exception, self.ha.update_failsafe, {})
self.p.set_role(PostgresqlRole.PRIMARY)
self.assertEqual(self.ha.update_failsafe({}), 'Running as a leader')
def test_call_failsafe_member(self):
member = Member(0, 'test', 1, {'api_url': 'http://localhost:8011/patroni'})
self.ha.patroni.request = Mock()
self.ha.patroni.request.return_value.data = b'Accepted'
self.ha.patroni.request.return_value.status = 200
with patch('patroni.ha.logger.info') as mock_logger:
ret = self.ha.call_failsafe_member({}, member)
self.assertEqual(mock_logger.call_args_list[0][0],
('Got response from %s %s: %s', 'test', 'http://localhost:8011/failsafe', 'Accepted'))
self.assertTrue(ret.accepted)
e = Exception('request failed')
self.ha.patroni.request.side_effect = e
with patch('patroni.ha.logger.warning') as mock_logger:
ret = self.ha.call_failsafe_member({}, member)
self.assertEqual(mock_logger.call_args_list[0][0],
('Request failed to %s: POST %s (%s)', 'test', 'http://localhost:8011/failsafe', e))
self.assertFalse(ret.accepted)
@patch('time.sleep', Mock())
def test_bootstrap_from_another_member(self):
self.ha.cluster = get_cluster_initialized_with_leader()
self.assertEqual(self.ha.bootstrap(), 'trying to bootstrap from replica \'other\'')
def test_bootstrap_waiting_for_leader(self):
self.ha.cluster = get_cluster_initialized_without_leader()
self.assertEqual(self.ha.bootstrap(), 'waiting for leader to bootstrap')
def test_bootstrap_without_leader(self):
self.ha.cluster = get_cluster_initialized_without_leader()
self.p.can_create_replica_without_replication_connection = MagicMock(return_value=True)
self.assertEqual(self.ha.bootstrap(), 'trying to bootstrap (without leader)')
def test_bootstrap_not_running_concurrently(self):
self.ha.cluster = get_cluster_bootstrapping_without_leader()
self.p.can_create_replica_without_replication_connection = MagicMock(return_value=True)
self.assertEqual(self.ha.bootstrap(), 'waiting for leader to bootstrap')
def test_bootstrap_initialize_lock_failed(self):
self.ha.cluster = get_cluster_not_initialized_without_leader()
self.assertEqual(self.ha.bootstrap(), 'failed to acquire initialize lock')
@patch('patroni.psycopg.connect', psycopg_connect)
@patch('patroni.postgresql.mpp.citus.connect', psycopg_connect)
@patch('patroni.postgresql.mpp.citus.quote_ident', Mock())
@patch.object(Postgresql, 'connection', Mock(return_value=None))
def test_bootstrap_initialized_new_cluster(self):
self.ha.cluster = get_cluster_not_initialized_without_leader()
self.e.initialize = true
self.assertEqual(self.ha.bootstrap(), 'trying to bootstrap a new cluster')
self.p.is_primary = false
self.assertEqual(self.ha.run_cycle(), 'waiting for end of recovery after bootstrap')
self.p.is_primary = true
self.ha.is_synchronous_mode = true
self.assertEqual(self.ha.run_cycle(), 'running post_bootstrap')
self.assertEqual(self.ha.run_cycle(), 'initialized a new cluster')
def test_bootstrap_release_initialize_key_on_failure(self):
self.ha.cluster = get_cluster_not_initialized_without_leader()
self.e.initialize = true
self.ha.bootstrap()
self.p.is_running = false
self.assertRaises(PatroniFatalException, self.ha.post_bootstrap)
@patch('patroni.psycopg.connect', psycopg_connect)
@patch('patroni.postgresql.mpp.citus.connect', psycopg_connect)
@patch('patroni.postgresql.mpp.citus.quote_ident', Mock())
@patch.object(Postgresql, 'connection', Mock(return_value=None))
def test_bootstrap_release_initialize_key_on_watchdog_failure(self):
self.ha.cluster = get_cluster_not_initialized_without_leader()
self.e.initialize = true
self.ha.bootstrap()
self.p.is_primary = true
with patch.object(Watchdog, 'activate', Mock(return_value=False)), \
patch('patroni.ha.logger.error') as mock_logger:
self.assertEqual(self.ha.post_bootstrap(), 'running post_bootstrap')
self.assertRaises(PatroniFatalException, self.ha.post_bootstrap)
self.assertTrue(mock_logger.call_args[0][0].startswith('Cancelling bootstrap because'
' watchdog activation failed'))
@patch('patroni.psycopg.connect', psycopg_connect)
def test_reinitialize(self):
self.assertIsNotNone(self.ha.reinitialize())
self.ha.cluster = get_cluster_initialized_with_leader()
self.assertIsNone(self.ha.reinitialize(True))
self.ha._async_executor.schedule('reinitialize')
self.assertIsNotNone(self.ha.reinitialize())
self.ha.state_handler.name = self.ha.cluster.leader.name
self.assertIsNotNone(self.ha.reinitialize())
@patch('time.sleep', Mock())
def test_restart(self):
self.assertEqual(self.ha.restart({}), (True, 'restarted successfully'))
self.p.restart = Mock(return_value=None)
self.assertEqual(self.ha.restart({}), (False, 'postgres is still starting'))
self.p.restart = false
self.assertEqual(self.ha.restart({}), (False, PostgresqlState.RESTART_FAILED))
self.ha.cluster = get_cluster_initialized_with_leader()
self.ha._async_executor.schedule('reinitialize')
self.assertEqual(self.ha.restart({}), (False, 'reinitialize already in progress'))
with patch.object(self.ha, "restart_matches", return_value=False):
self.assertEqual(self.ha.restart({'foo': 'bar'}), (False, "restart conditions are not satisfied"))
@patch('time.sleep', Mock())
@patch.object(ConfigHandler, 'replace_pg_hba', Mock())
@patch.object(ConfigHandler, 'replace_pg_ident', Mock())
@patch.object(PostmasterProcess, 'start', Mock(return_value=MockPostmaster()))
@patch('patroni.postgresql.mpp.AbstractMPPHandler.is_coordinator', Mock(return_value=False))
def test_worker_restart(self):
self.ha.has_lock = true
self.ha.patroni.request = Mock()
self.p.is_running = Mock(side_effect=[Mock(), False])
self.assertEqual(self.ha.restart({}), (True, 'restarted successfully'))
self.ha.patroni.request.assert_called()
self.assertEqual(self.ha.patroni.request.call_args_list[0][0][3]['type'], 'before_demote')
self.assertEqual(self.ha.patroni.request.call_args_list[1][0][3]['type'], 'after_promote')
@patch('os.kill', Mock())
def test_restart_in_progress(self):
with patch('patroni.async_executor.AsyncExecutor.busy', PropertyMock(return_value=True)):
self.ha._async_executor.schedule('restart')
self.assertTrue(self.ha.restart_scheduled())
self.assertEqual(self.ha.run_cycle(), 'restart in progress')
self.ha.cluster = get_cluster_initialized_with_leader()
self.assertEqual(self.ha.run_cycle(), 'restart in progress')
self.ha.has_lock = true
self.assertEqual(self.ha.run_cycle(), 'updated leader lock during restart')
self.ha.update_lock = false
self.p.set_role(PostgresqlRole.PRIMARY)
with patch('patroni.async_executor.CriticalTask.cancel', Mock(return_value=False)), \
patch('patroni.async_executor.CriticalTask.result',
PropertyMock(return_value=PostmasterProcess(os.getpid())), create=True), \
patch('patroni.postgresql.Postgresql.terminate_starting_postmaster') as mock_terminate:
self.assertEqual(self.ha.run_cycle(), 'lost leader lock during restart')
mock_terminate.assert_called()
self.ha.is_paused = true
self.assertEqual(self.ha.run_cycle(), 'PAUSE: restart in progress')
@patch('patroni.postgresql.mpp.AbstractMPPHandler.is_coordinator', Mock(return_value=False))
def test_manual_failover_from_leader(self):
self.ha.has_lock = true # I am the leader
# to me
with patch('patroni.ha.logger.warning') as mock_warning:
self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, '', self.p.name, None))
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
mock_warning.assert_called_with('%s: I am already the leader, no need to %s', 'manual failover', 'failover')
# to a non-existent candidate
with patch('patroni.ha.logger.warning') as mock_warning:
self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, '', 'blabla', None))
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
mock_warning.assert_called_with(
'%s: no healthy members found, %s is not possible', 'manual failover', 'failover')
# to an existent candidate
self.ha.fetch_node_status = get_node_status()
self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, '', 'b', None))
self.ha.cluster.members.append(Member(0, 'b', 28, {'api_url': 'http://127.0.0.1:8011/patroni'}))
self.assertEqual(self.ha.run_cycle(), 'manual failover: demoting myself')
# to a candidate on an older timeline
with patch('patroni.ha.logger.info') as mock_info:
self.ha.fetch_node_status = get_node_status(timeline=1)
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
self.assertEqual(mock_info.call_args_list[0][0],
('Timeline %s of member %s is behind the cluster timeline %s', 1, 'b', 2))
# to a lagging candidate
with patch('patroni.ha.logger.info') as mock_info:
self.ha.fetch_node_status = get_node_status(wal_position=1)
self.ha.cluster.config.data.update({'maximum_lag_on_failover': 5})
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
self.assertEqual(mock_info.call_args_list[0][0],
('Member %s exceeds maximum replication lag', 'b'))
self.ha.cluster.members.pop()
@patch('patroni.postgresql.mpp.AbstractMPPHandler.is_coordinator', Mock(return_value=False))
def test_manual_switchover_from_leader(self):
self.ha.has_lock = true # I am the leader
self.ha.fetch_node_status = get_node_status()
# different leader specified in failover key, no candidate
with patch('patroni.ha.logger.warning') as mock_warning:
self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, 'blabla', '', None))
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
mock_warning.assert_called_with(
'%s: leader name does not match: %s != %s', 'switchover', 'blabla', 'postgresql0')
# no candidate
self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, self.p.name, '', None))
self.assertEqual(self.ha.run_cycle(), 'switchover: demoting myself')
self.ha._rewind.rewind_or_reinitialize_needed_and_possible = true
self.assertEqual(self.ha.run_cycle(), 'switchover: demoting myself')
# other members with failover_limitation_s
with patch('patroni.ha.logger.info') as mock_info:
self.ha.fetch_node_status = get_node_status(nofailover=True)
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
self.assertEqual(mock_info.call_args_list[0][0], ('Member %s is %s', 'leader', 'not allowed to promote'))
with patch('patroni.ha.logger.info') as mock_info:
self.ha.fetch_node_status = get_node_status(watchdog_failed=True)
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
self.assertEqual(mock_info.call_args_list[0][0], ('Member %s is %s', 'leader', 'not watchdog capable'))
with patch('patroni.ha.logger.info') as mock_info:
self.ha.fetch_node_status = get_node_status(timeline=1)
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
self.assertEqual(mock_info.call_args_list[0][0],
('Timeline %s of member %s is behind the cluster timeline %s', 1, 'leader', 2))
with patch('patroni.ha.logger.info') as mock_info:
self.ha.fetch_node_status = get_node_status(wal_position=1)
self.ha.cluster.config.data.update({'maximum_lag_on_failover': 5})
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
self.assertEqual(mock_info.call_args_list[0][0], ('Member %s exceeds maximum replication lag', 'leader'))
@patch('patroni.postgresql.mpp.AbstractMPPHandler.is_coordinator', Mock(return_value=False))
def test_scheduled_switchover_from_leader(self):
self.ha.has_lock = true # I am the leader
self.ha.fetch_node_status = get_node_status()
# switchover scheduled time must include timezone
with patch('patroni.ha.logger.warning') as mock_warning:
scheduled = datetime.datetime.now()
self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, self.p.name, 'blabla', scheduled))
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
self.assertIn('Incorrect value of scheduled_at: %s', mock_warning.call_args_list[0][0])
# scheduled now
scheduled = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=tzutc)
self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, self.p.name, 'b', scheduled))
self.ha.cluster.members.append(Member(0, 'b', 28, {'api_url': 'http://127.0.0.1:8011/patroni'}))
self.assertEqual('switchover: demoting myself', self.ha.run_cycle())
# scheduled in the future
with patch('patroni.ha.logger.info') as mock_info:
scheduled = scheduled + datetime.timedelta(seconds=30)
self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, self.p.name, 'blabla', scheduled))
self.assertEqual('no action. I am (postgresql0), the leader with the lock', self.ha.run_cycle())
self.assertIn('Awaiting %s at %s (in %.0f seconds)', mock_info.call_args_list[0][0])
# stale value
with patch('patroni.ha.logger.warning') as mock_warning:
scheduled = scheduled + datetime.timedelta(seconds=-600)
self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, self.p.name, 'b', scheduled))
self.ha.cluster.members.append(Member(0, 'b', 28, {'api_url': 'http://127.0.0.1:8011/patroni'}))
self.assertEqual('no action. I am (postgresql0), the leader with the lock', self.ha.run_cycle())
self.assertIn('Found a stale %s value, cleaning up: %s', mock_warning.call_args_list[0][0])
def test_manual_switchover_from_leader_in_pause(self):
self.ha.has_lock = true # I am the leader
self.ha.is_paused = true
# no candidate
self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, self.p.name, '', None))
with patch('patroni.ha.logger.warning') as mock_warning:
self.assertEqual('PAUSE: no action. I am (postgresql0), the leader with the lock', self.ha.run_cycle())
mock_warning.assert_called_with(
'%s is possible only to a specific candidate in a paused state', 'Switchover')
def test_manual_failover_from_leader_in_pause(self):
self.ha.has_lock = true
self.ha.fetch_node_status = get_node_status()
self.ha.is_paused = true
# failover from me, candidate is healthy
self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, None, 'b', None))
self.ha.cluster.members.append(Member(0, 'b', 28, {'api_url': 'http://127.0.0.1:8011/patroni'}))
self.assertEqual('PAUSE: manual failover: demoting myself', self.ha.run_cycle())
self.ha.cluster.members.pop()
def test_manual_failover_from_leader_in_synchronous_mode(self):
self.ha.is_synchronous_mode = true
self.ha.process_sync_replication = Mock()
self.ha.fetch_node_status = get_node_status()
# I am the leader
self.p.is_primary = true
self.ha.has_lock = true
# the candidate is not in sync members but we allow failover to an async candidate
self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, None, 'b', None), sync=(self.p.name, 'a'))
self.ha.cluster.members.append(Member(0, 'b', 28, {'api_url': 'http://127.0.0.1:8011/patroni'}))
self.assertEqual('manual failover: demoting myself', self.ha.run_cycle())
self.ha.cluster.members.pop()
def test_manual_switchover_from_leader_in_synchronous_mode(self):
self.ha.is_synchronous_mode = true
self.ha.process_sync_replication = Mock()
# I am the leader
self.p.is_primary = true
self.ha.has_lock = true
# candidate specified is not in sync members
with patch('patroni.ha.logger.warning') as mock_warning:
self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, self.p.name, 'a', None),
sync=(self.p.name, 'blabla'))
self.assertEqual('no action. I am (postgresql0), the leader with the lock', self.ha.run_cycle())
self.assertEqual(mock_warning.call_args_list[0][0],
('%s candidate=%s does not match with sync_standbys=%s', 'Switchover', 'a', 'blabla'))
# the candidate is in sync members and is healthy
self.ha.fetch_node_status = get_node_status(wal_position=305419896)
self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, self.p.name, 'a', None),
sync=(self.p.name, 'a'))
self.ha.cluster.members.append(Member(0, 'a', 28, {'api_url': 'http://127.0.0.1:8011/patroni'}))
self.assertEqual('switchover: demoting myself', self.ha.run_cycle())
# the candidate is in sync members but is not healthy
with patch('patroni.ha.logger.info') as mock_info:
self.ha.fetch_node_status = get_node_status(nofailover=true)
self.assertEqual('no action. I am (postgresql0), the leader with the lock', self.ha.run_cycle())
self.assertEqual(mock_info.call_args_list[0][0], ('Member %s is %s', 'a', 'not allowed to promote'))
def test_manual_failover_process_no_leader(self):
self.p.is_primary = false
self.p.set_role(PostgresqlRole.REPLICA)
# failover to another member, fetch_node_status for candidate fails
with patch('patroni.ha.logger.warning') as mock_warning:
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'leader', None))
self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')
self.assertEqual(mock_warning.call_args_list[1][0],
('%s: member %s is %s', 'manual failover', 'leader', 'not reachable'))
# failover to another member, candidate is accessible, in_recovery
self.p.set_role(PostgresqlRole.REPLICA)
self.ha.fetch_node_status = get_node_status()
self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node')
# set nofailover flag to True for all members of the cluster
# this should elect the current member, as we are not going to call the API for it.
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'other', None))
self.ha.fetch_node_status = get_node_status(nofailover=True)
self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')
# failover to me but I am set to nofailover. In no case I should be elected as a leader
self.p.set_role(PostgresqlRole.REPLICA)
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'postgresql0', None))
self.ha.patroni.nofailover = True
self.assertEqual(self.ha.run_cycle(), 'following a different leader because I am not allowed to promote')
self.ha.patroni.nofailover = False
# failover to another member that is on an older timeline (only failover_limitation() is checked)
with patch('patroni.ha.logger.info') as mock_info:
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'b', None))
self.ha.cluster.members.append(Member(0, 'b', 28, {'api_url': 'http://127.0.0.1:8011/patroni'}))
self.ha.fetch_node_status = get_node_status(timeline=1)
self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node')
mock_info.assert_called_with('%s: to %s, i am %s', 'manual failover', 'b', 'postgresql0')
# failover to another member lagging behind the cluster_lsn (only failover_limitation() is checked)
with patch('patroni.ha.logger.info') as mock_info:
self.ha.cluster.config.data.update({'maximum_lag_on_failover': 5})
self.ha.fetch_node_status = get_node_status(wal_position=1)
self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node')
mock_info.assert_called_with('%s: to %s, i am %s', 'manual failover', 'b', 'postgresql0')
def test_manual_switchover_process_no_leader(self):
self.p.is_primary = false
self.p.set_role(PostgresqlRole.REPLICA)
# I was the leader, other members are healthy
self.ha.fetch_node_status = get_node_status()
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, self.p.name, '', None))
self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node')
# I was the leader, I am the only healthy member
with patch('patroni.ha.logger.info') as mock_info:
self.ha.fetch_node_status = get_node_status(reachable=False) # inaccessible, in_recovery
self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')
self.assertEqual(mock_info.call_args_list[0][0], ('Member %s is %s', 'leader', 'not reachable'))
self.assertEqual(mock_info.call_args_list[1][0], ('Member %s is %s', 'other', 'not reachable'))
def test_manual_failover_process_no_leader_in_synchronous_mode(self):
self.ha.is_synchronous_mode = true
self.p.is_primary = false
self.ha.fetch_node_status = get_node_status(nofailover=True) # other nodes are not healthy
# manual failover when our name (postgresql0) isn't in the /sync key and the candidate node is not available
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'other', None),
sync=('leader1', 'blabla'))
self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node')
# manual failover when the candidate node isn't available but our name is in the /sync key
# while other sync node is nofailover
with patch('patroni.ha.logger.warning') as mock_warning:
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'other', None),
sync=('leader1', 'postgresql0'))
self.p.sync_handler.current_state = Mock(return_value=(CaseInsensitiveSet(), CaseInsensitiveSet()))
self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')
self.assertEqual(mock_warning.call_args_list[0][0],
('%s: member %s is %s', 'manual failover', 'other', 'not allowed to promote'))
# manual failover to our node (postgresql0),
# which name is not in sync nodes list (some sync nodes are available)
self.p.set_role(PostgresqlRole.REPLICA)
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'postgresql0', None),
sync=('leader1', 'other'))
self.p.sync_handler.current_state = Mock(return_value=(CaseInsensitiveSet(['leader1']),
CaseInsensitiveSet(['leader1'])))
self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')
def test_manual_switchover_process_no_leader_in_synchronous_mode(self):
self.ha.is_synchronous_mode = true
self.p.is_primary = false
# to a specific node, which name doesn't match our name (postgresql0)
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, 'leader', 'other', None))
self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node')
# to our node (postgresql0), which name is not in sync nodes list
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, 'leader', 'postgresql0', None),
sync=('leader1', 'blabla'))
self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node')
# without candidate, our name (postgresql0) is not in the sync nodes list
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, 'leader', '', None),
sync=('leader', 'blabla'))
self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node')
# switchover from a specific leader, but the only sync node (us, postgresql0) has nofailover tag
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, 'leader', '', None),
sync=('postgresql0', None))
self.ha.patroni.nofailover = True
self.assertEqual(self.ha.run_cycle(), 'following a different leader because I am not allowed to promote')
def test_manual_failover_process_no_leader_in_pause(self):
self.ha.is_paused = true
# I am running as primary, cluster is unlocked, the candidate is allowed to promote
# but we are in pause
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'other', None))
self.assertEqual(self.ha.run_cycle(), 'PAUSE: continue to run as primary without lock')
def test_manual_switchover_process_no_leader_in_pause(self):
self.ha.is_paused = true
# I am running as primary, cluster is unlocked, no candidate specified
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, 'leader', '', None))
self.assertEqual(self.ha.run_cycle(), 'PAUSE: continue to run as primary without lock')
# the candidate is not running
with patch('patroni.ha.logger.warning') as mock_warning:
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, 'leader', 'blabla', None))
self.assertEqual('PAUSE: acquired session lock as a leader', self.ha.run_cycle())
self.assertEqual(
mock_warning.call_args_list[0][0],
('%s: removing failover key because failover candidate is not running', 'switchover'))
# switchover to me, I am not leader
self.p.is_primary = false
self.p.set_role(PostgresqlRole.REPLICA)
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, 'leader', self.p.name, None))
self.assertEqual(self.ha.run_cycle(), 'PAUSE: promoted self to leader by acquiring session lock')
def test_is_healthiest_node(self):
self.ha.is_failsafe_mode = true
self.ha.state_handler.is_primary = false
self.ha.patroni.nofailover = False
self.ha.fetch_node_status = get_node_status()
self.ha.dcs._last_failsafe = {'foo': ''}
self.assertFalse(self.ha.is_healthiest_node())
self.ha.dcs._last_failsafe = {'postgresql0': ''}
self.assertTrue(self.ha.is_healthiest_node())
self.ha.dcs._last_failsafe = None
with patch.object(Watchdog, 'is_healthy', PropertyMock(return_value=False)):
self.assertFalse(self.ha.is_healthiest_node())
self.ha.is_paused = true
self.assertFalse(self.ha.is_healthiest_node())
def test__is_healthiest_node(self):
self.p.is_primary = false
self.ha.cluster = get_cluster_initialized_without_leader(sync=('postgresql1', self.p.name))
global_config.update(self.ha.cluster)
self.assertTrue(self.ha._is_healthiest_node(self.ha.old_cluster.members))
self.ha.fetch_node_status = get_node_status() # accessible, in_recovery
self.assertTrue(self.ha._is_healthiest_node(self.ha.old_cluster.members))
self.ha.fetch_node_status = get_node_status(in_recovery=False) # accessible, not in_recovery
self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members))
self.ha.fetch_node_status = get_node_status(failover_priority=2) # accessible, in_recovery, higher priority
self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members))
# if there is a higher-priority node but it has a lower WAL position then this node should race
self.ha.fetch_node_status = get_node_status(failover_priority=6, wal_position=9)
self.assertTrue(self.ha._is_healthiest_node(self.ha.old_cluster.members))
# if the old leader is a higher-priority node on the same WAL position then this node should race
self.ha.fetch_node_status = get_node_status(failover_priority=6)
self.assertTrue(self.ha._is_healthiest_node(self.ha.old_cluster.members, leader=self.ha.old_cluster.leader))
self.ha.fetch_node_status = get_node_status(wal_position=11) # accessible, in_recovery, wal position ahead
self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members))
# in synchronous_mode consider itself healthy if the former leader is accessible in read-only and ahead of us
with patch.object(Ha, 'is_synchronous_mode', Mock(return_value=True)):
self.assertTrue(self.ha._is_healthiest_node(self.ha.old_cluster.members))
self.ha.cluster.config.data.update({'maximum_lag_on_failover': 5})
global_config.update(self.ha.cluster)
with patch('patroni.postgresql.Postgresql.last_operation', return_value=1):
self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members))
with patch('patroni.postgresql.Postgresql.replica_cached_timeline', return_value=None):
self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members))
with patch('patroni.postgresql.Postgresql.replica_cached_timeline', return_value=1):
self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members))
self.ha.patroni.nofailover = True
self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members))
self.ha.patroni.nofailover = None
self.ha.patroni.failover_priority = 0
self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members))
def test_fetch_node_status(self):
member = Member(0, 'test', 1, {'api_url': 'http://127.0.0.1:8011/patroni'})
self.ha.fetch_node_status(member)
member = Member(0, 'test', 1, {'api_url': 'http://localhost:8011/patroni'})
self.ha.patroni.request = Mock()
self.ha.patroni.request.return_value.data = b'{"wal":{"location":1},"role":"primary"}'
ret = self.ha.fetch_node_status(member)
self.assertFalse(ret.in_recovery)
@patch.object(Rewind, 'pg_rewind', true)
@patch.object(Rewind, 'check_leader_is_not_in_recovery', true)
@patch('os.listdir', Mock(return_value=[]))
@patch('patroni.postgresql.rewind.fsync_dir', Mock())
@patch.object(Postgresql, 'call_nowait')
def test_post_recover(self, mock_call_nowait):
self.p.is_running = false
self.ha.has_lock = true
self.p.set_role(PostgresqlRole.PRIMARY)
self.assertEqual(self.ha.post_recover(), 'removed leader key after trying and failing to start postgres')
self.assertEqual(self.p.role, PostgresqlRole.DEMOTED)
mock_call_nowait.assert_called_once_with(CallbackAction.ON_ROLE_CHANGE)
self.ha.has_lock = false
self.assertEqual(self.ha.post_recover(), 'failed to start postgres')
leader = Leader(0, 0, Member(0, 'l', 2, {"version": "1.6", "conn_url": "postgres://a", "role": "primary"}))
self.ha._rewind.execute(leader)
self.p.is_running = true
self.assertIsNone(self.ha.post_recover())
def test_schedule_future_restart(self):
self.ha.patroni.scheduled_restart = {}
# do the restart 2 times. The first one should succeed, the second one should fail
self.assertTrue(self.ha.schedule_future_restart({'schedule': future_restart_time}))
self.assertFalse(self.ha.schedule_future_restart({'schedule': future_restart_time}))
def test_delete_future_restarts(self):
self.ha.delete_future_restart()
def test_evaluate_scheduled_restart(self):
self.p.postmaster_start_time = Mock(return_value=str(postmaster_start_time))
# restart already in progress
with patch('patroni.async_executor.AsyncExecutor.busy', PropertyMock(return_value=True)):
self.assertIsNone(self.ha.evaluate_scheduled_restart())
# restart while the postmaster has been already restarted, fails
with patch.object(self.ha,
'future_restart_scheduled',
Mock(return_value={'postmaster_start_time':
str(postmaster_start_time - datetime.timedelta(days=1)),
'schedule': str(future_restart_time)})):
self.assertIsNone(self.ha.evaluate_scheduled_restart())
with patch.object(self.ha,
'future_restart_scheduled',
Mock(return_value={'postmaster_start_time': str(postmaster_start_time),
'schedule': str(future_restart_time)})):
with patch.object(self.ha,
'should_run_scheduled_action', Mock(return_value=True)):
# restart in the future, ok
self.assertIsNotNone(self.ha.evaluate_scheduled_restart())
with patch.object(self.ha, 'restart', Mock(return_value=(False, "Test"))):
# restart in the future, bit the actual restart failed
self.assertIsNone(self.ha.evaluate_scheduled_restart())
def test_scheduled_restart(self):
self.ha.cluster = get_cluster_initialized_with_leader()
with patch.object(self.ha, "evaluate_scheduled_restart", Mock(return_value="restart scheduled")):
self.assertEqual(self.ha.run_cycle(), "restart scheduled")
def test_restart_matches(self):
self.p._role = PostgresqlRole.REPLICA
self.p._connection.server_version = 90500
self.p._pending_restart = True
self.assertFalse(self.ha.restart_matches("primary", "9.5.0", True))
self.assertFalse(self.ha.restart_matches("replica", "9.4.3", True))
self.p._pending_restart = False
self.assertFalse(self.ha.restart_matches("replica", "9.5.2", True))
self.assertTrue(self.ha.restart_matches("replica", "9.5.2", False))
def test_process_healthy_cluster_in_pause(self):
self.p.is_primary = false
self.ha.is_paused = true
self.p.name = 'leader'
self.ha.cluster = get_cluster_initialized_with_leader()
self.assertEqual(self.ha.run_cycle(), 'PAUSE: removed leader lock because postgres is not running as primary')
self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, '', self.p.name, None))
self.assertEqual(self.ha.run_cycle(), 'PAUSE: waiting to become primary after promote...')
@patch('patroni.postgresql.mtime', Mock(return_value=1588316884))
@patch('builtins.open', mock_open(read_data='1\t0/40159C0\tno recovery target specified\n'))
def test_process_healthy_standby_cluster_as_standby_leader(self):
self.p.is_primary = false
self.p.name = 'leader'
self.ha.cluster = get_standby_cluster_initialized_with_only_leader()
self.p.config.check_recovery_conf = Mock(return_value=(False, False))
self.ha._leader_timeline = 1
self.assertEqual(self.ha.run_cycle(), 'promoted self to a standby leader because i had the session lock')
self.assertEqual(self.ha.run_cycle(), 'no action. I am (leader), the standby leader with the lock')
self.p.set_role(PostgresqlRole.REPLICA)
self.p.config.check_recovery_conf = Mock(return_value=(True, False))
self.assertEqual(self.ha.run_cycle(), 'promoted self to a standby leader because i had the session lock')
def test_process_healthy_standby_cluster_as_cascade_replica(self):
self.p.is_primary = false
self.p.name = 'replica'
self.ha.cluster = get_standby_cluster_initialized_with_only_leader()
self.assertEqual(self.ha.run_cycle(),
'no action. I am (replica), a secondary, and following a standby leader (leader)')
with patch.object(Leader, 'conn_url', PropertyMock(return_value='')):
self.assertEqual(self.ha.run_cycle(), 'continue following the old known standby leader')
@patch.object(Cluster, 'is_unlocked', Mock(return_value=True))
def test_process_unhealthy_standby_cluster_as_standby_leader(self):
self.p.is_primary = false
self.p.name = 'leader'
self.ha.cluster = get_standby_cluster_initialized_with_only_leader()
self.ha.sysid_valid = true
self.p._sysid = True
self.assertEqual(self.ha.run_cycle(), 'promoted self to a standby leader by acquiring session lock')
@patch.object(Rewind, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=True))
@patch.object(Rewind, 'can_rewind', PropertyMock(return_value=True))
def test_process_unhealthy_standby_cluster_as_cascade_replica(self):
self.p.is_primary = false
self.p.name = PostgresqlRole.REPLICA
self.ha.cluster = get_standby_cluster_initialized_with_only_leader()
self.assertTrue(self.ha.run_cycle().startswith('running pg_rewind from remote_member:'))
def test_recover_unhealthy_leader_in_standby_cluster(self):
self.p.is_primary = false
self.p.name = 'leader'
self.p.is_running = false
self.p.follow = false
self.ha.cluster = get_standby_cluster_initialized_with_only_leader()
self.assertEqual(self.ha.run_cycle(), 'starting as a standby leader because i had the session lock')
@patch.object(Cluster, 'is_unlocked', Mock(return_value=True))
def test_recover_unhealthy_unlocked_standby_cluster(self):
self.p.is_primary = false
self.p.name = 'leader'
self.p.is_running = false
self.p.follow = false
self.ha.cluster = get_standby_cluster_initialized_with_only_leader()
self.ha.has_lock = false
self.assertEqual(self.ha.run_cycle(), 'trying to follow a remote member because standby cluster is unhealthy')
def test_failed_to_update_lock_in_pause(self):
self.ha.update_lock = false
self.ha.is_paused = true
self.p.name = 'leader'
self.ha.cluster = get_cluster_initialized_with_leader()
self.assertEqual(self.ha.run_cycle(),
'PAUSE: continue to run as primary after failing to update leader lock in DCS')
def test_postgres_unhealthy_in_pause(self):
self.ha.is_paused = true
self.p.is_healthy = false
self.assertEqual(self.ha.run_cycle(), 'PAUSE: postgres is not running')
self.ha.has_lock = true
self.assertEqual(self.ha.run_cycle(), 'PAUSE: removed leader lock because postgres is not running')
def test_no_etcd_connection_in_pause(self):
self.ha.is_paused = true
self.ha.load_cluster_from_dcs = Mock(side_effect=DCSError('Etcd is not responding properly'))
self.assertEqual(self.ha.run_cycle(), 'PAUSE: DCS is not accessible')
@patch('patroni.ha.Ha.update_lock', return_value=True)
@patch('patroni.ha.Ha.demote')
def test_starting_timeout(self, demote, update_lock):
def check_calls(seq):
for mock, called in seq:
if called:
mock.assert_called_once()
else:
mock.assert_not_called()
mock.reset_mock()
self.ha.has_lock = true
self.ha.cluster = get_cluster_initialized_with_leader()
self.p.check_for_startup = true
self.p.time_in_state = lambda: 30
self.assertEqual(self.ha.run_cycle(), 'PostgreSQL is still starting up, 270 seconds until timeout')
check_calls([(update_lock, True), (demote, False)])
self.p.time_in_state = lambda: 350
self.ha.fetch_node_status = get_node_status(reachable=False) # inaccessible, in_recovery
self.assertEqual(self.ha.run_cycle(),
'primary start has timed out, but continuing to wait because failover is not possible')
check_calls([(update_lock, True), (demote, False)])
self.ha.fetch_node_status = get_node_status() # accessible, in_recovery
self.assertEqual(self.ha.run_cycle(), 'stopped PostgreSQL because of startup timeout')
check_calls([(update_lock, True), (demote, True)])
update_lock.return_value = False
self.assertEqual(self.ha.run_cycle(), 'stopped PostgreSQL while starting up because leader key was lost')
check_calls([(update_lock, True), (demote, True)])
self.ha.has_lock = false
self.p.is_primary = false
self.assertEqual(self.ha.run_cycle(),
'no action. I am (postgresql0), a secondary, and following a leader (leader)')
check_calls([(update_lock, False), (demote, False)])
def test_manual_failover_while_starting(self):
self.ha.has_lock = true
self.p.check_for_startup = true
f = Failover(0, self.p.name, '', None)
self.ha.cluster = get_cluster_initialized_with_leader(f)
self.ha.fetch_node_status = get_node_status() # accessible, in_recovery
self.assertEqual(self.ha.run_cycle(), 'switchover: demoting myself')
@patch('patroni.ha.Ha.demote')
def test_failover_immediately_on_zero_primary_start_timeout(self, demote):
self.p.is_running = false
self.ha.cluster = get_cluster_initialized_with_leader(sync=(self.p.name, 'other'))
self.ha.cluster.config.data.update({'synchronous_mode': True, 'primary_start_timeout': 0})
self.ha.has_lock = true
self.ha.update_lock = true
self.ha.fetch_node_status = get_node_status() # accessible, in_recovery
self.assertEqual(self.ha.run_cycle(), 'stopped PostgreSQL to fail over after a crash')
demote.assert_called_once()
def test_primary_stop_timeout(self):
self.assertEqual(self.ha.primary_stop_timeout(), None)
self.ha.cluster.config.data.update({'primary_stop_timeout': 30})
global_config.update(self.ha.cluster)
with patch.object(Ha, 'is_synchronous_mode', Mock(return_value=True)):
self.assertEqual(self.ha.primary_stop_timeout(), 30)
with patch.object(Ha, 'is_synchronous_mode', Mock(return_value=False)):
self.assertEqual(self.ha.primary_stop_timeout(), None)
self.ha.cluster.config.data['primary_stop_timeout'] = None
global_config.update(self.ha.cluster)
self.assertEqual(self.ha.primary_stop_timeout(), None)
@patch('patroni.postgresql.Postgresql.follow')
def test_demote_immediate(self, follow):
self.ha.has_lock = true
self.e.get_cluster = Mock(return_value=get_cluster_initialized_without_leader())
self.ha.demote('immediate')
follow.assert_called_once_with(None)
def test__process_multisync_replication(self):
self.ha.has_lock = true
mock_set_sync = self.p.sync_handler.set_synchronous_standby_names = Mock()
mock_cfg_set_sync = self.p.config.set_synchronous_standby_names = Mock()
self.p.name = 'leader'
# Test sync key removed when sync mode disabled
self.ha.cluster = get_cluster_initialized_with_leader(sync=('leader', 'other'))
with patch.object(self.ha.dcs, 'delete_sync_state') as mock_delete_sync:
self.ha.run_cycle()
mock_delete_sync.assert_called_once()
mock_set_sync.assert_called_once_with(CaseInsensitiveSet())
mock_cfg_set_sync.assert_called_once()
mock_set_sync.reset_mock()
mock_cfg_set_sync.reset_mock()
# Test sync key not touched when not there
self.ha.cluster = get_cluster_initialized_with_leader()
with patch.object(self.ha.dcs, 'delete_sync_state') as mock_delete_sync:
self.ha.run_cycle()
mock_delete_sync.assert_not_called()
mock_set_sync.assert_not_called()
mock_cfg_set_sync.assert_called_once()
mock_set_sync.reset_mock()
mock_cfg_set_sync.reset_mock()
self.ha.is_synchronous_mode = true
# Test sync standby not touched when picking the same node
self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 1, 1,
CaseInsensitiveSet(['other']),
CaseInsensitiveSet(['other'])))
self.ha.cluster = get_cluster_initialized_with_leader(sync=('leader', 'other'))
self.ha.run_cycle()
mock_set_sync.assert_not_called()
mock_set_sync.reset_mock()
mock_cfg_set_sync.reset_mock()
# Test sync standby is replaced when switching standbys
self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 0, 0, CaseInsensitiveSet(),
CaseInsensitiveSet(['other2'])))
self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
self.ha.run_cycle()
mock_set_sync.assert_called_once_with(CaseInsensitiveSet(['other2']))
mock_cfg_set_sync.assert_not_called()
# Test sync standby is replaced when new standby is joined
self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 1, 1,
CaseInsensitiveSet(['other2']),
CaseInsensitiveSet(['other2', 'other3'])))
self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
self.ha.run_cycle()
self.assertEqual(mock_set_sync.call_args_list[0][0], (CaseInsensitiveSet(['other2']),))
self.assertEqual(mock_set_sync.call_args_list[1][0], (CaseInsensitiveSet(['other2', 'other3']),))
mock_cfg_set_sync.assert_not_called()
mock_set_sync.reset_mock()
mock_cfg_set_sync.reset_mock()
# Test sync standby is not disabled when updating dcs fails
self.ha.dcs.write_sync_state = Mock(return_value=None)
self.ha.run_cycle()
mock_set_sync.assert_not_called()
mock_cfg_set_sync.assert_not_called()
mock_set_sync.reset_mock()
mock_cfg_set_sync.reset_mock()
# Test changing sync standby
self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
self.ha.dcs.get_cluster = Mock(return_value=get_cluster_initialized_with_leader(sync=('leader', 'other')))
# self.ha.cluster = get_cluster_initialized_with_leader(sync=('leader', 'other'))
self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 1, 1,
CaseInsensitiveSet(['other2']),
CaseInsensitiveSet(['other2'])))
self.ha.run_cycle()
self.assertEqual(self.ha.dcs.write_sync_state.call_count, 2)
# Test updating sync standby key failed due to race
self.ha.dcs.write_sync_state = Mock(side_effect=[SyncState.empty(), None])
self.ha.run_cycle()
self.assertEqual(self.ha.dcs.write_sync_state.call_count, 2)
# Test updating sync standby key failed due to DCS being not accessible
self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
self.ha.dcs.get_cluster = Mock(side_effect=DCSError('foo'))
self.ha.run_cycle()
# Test changing sync standby failed due to race
self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
self.ha.dcs.get_cluster = Mock(return_value=get_cluster_initialized_with_leader(sync=('somebodyelse', None)))
self.ha.run_cycle()
self.assertEqual(self.ha.dcs.write_sync_state.call_count, 2)
# Test sync set to '*' when synchronous_mode_strict is enabled
mock_set_sync.reset_mock()
mock_cfg_set_sync.reset_mock()
self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 0, 0, CaseInsensitiveSet(),
CaseInsensitiveSet()))
with patch.object(global_config.__class__, 'is_synchronous_mode_strict', PropertyMock(return_value=True)):
self.ha.run_cycle()
mock_set_sync.assert_called_once_with(CaseInsensitiveSet('*'))
mock_cfg_set_sync.assert_not_called()
# Test the value configured by the user for synchronous_standby_names is used when synchronous mode is disabled
self.ha.is_synchronous_mode = false
mock_set_sync.reset_mock()
mock_cfg_set_sync.reset_mock()
ssn_mock = PropertyMock(return_value="SOME_SSN")
with patch('patroni.postgresql.config.ConfigHandler.synchronous_standby_names', ssn_mock):
self.ha.run_cycle()
mock_set_sync.assert_not_called()
mock_cfg_set_sync.assert_called_once_with("SOME_SSN")
def test_sync_replication_become_primary(self):
self.ha.is_synchronous_mode = true
mock_set_sync = self.p.sync_handler.set_synchronous_standby_names = Mock()
self.p.is_primary = false
self.p.set_role(PostgresqlRole.REPLICA)
self.ha.has_lock = true
mock_write_sync = self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
self.p.name = 'leader'
self.ha.cluster = get_cluster_initialized_with_leader(sync=('other', None))
# When we just became primary nobody is sync
self.assertEqual(self.ha.enforce_primary_role('msg', 'promote msg'), 'promote msg')
mock_set_sync.assert_called_once_with(CaseInsensitiveSet(), 0)
mock_write_sync.assert_called_once_with('leader', None, 0, version=0)
mock_set_sync.reset_mock()
# When we just became primary nobody is sync
self.p.set_role(PostgresqlRole.REPLICA)
mock_write_sync.return_value = False
self.assertTrue(self.ha.enforce_primary_role('msg', 'promote msg') != 'promote msg')
mock_set_sync.assert_not_called()
def test_unhealthy_sync_mode(self):
self.ha.is_synchronous_mode = true
self.p.is_primary = false
self.p.set_role(PostgresqlRole.REPLICA)
self.p.name = 'other'
self.ha.cluster = get_cluster_initialized_without_leader(sync=('leader', 'other2'))
mock_write_sync = self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
mock_acquire = self.ha.acquire_lock = Mock(return_value=True)
mock_follow = self.p.follow = Mock()
mock_promote = self.p.promote = Mock()
# If we don't match the sync replica we are not allowed to acquire lock
self.ha.run_cycle()
mock_acquire.assert_not_called()
mock_follow.assert_called_once()
self.assertEqual(mock_follow.call_args[0][0], None)
mock_write_sync.assert_not_called()
mock_follow.reset_mock()
# If we do match we will try to promote
self.ha._is_healthiest_node = true
self.ha.cluster = get_cluster_initialized_without_leader(sync=('leader', 'other'))
self.ha.run_cycle()
mock_acquire.assert_called_once()
mock_follow.assert_not_called()
mock_promote.assert_called_once()
mock_write_sync.assert_called_once_with('other', None, 0, version=0)
def test_disable_sync_when_restarting(self):
self.ha.is_synchronous_mode = true
self.p.name = 'other'
self.p.is_primary = false
self.p.set_role(PostgresqlRole.REPLICA)
mock_restart = self.p.restart = Mock(return_value=True)
self.ha.cluster = get_cluster_initialized_with_leader(sync=('leader', 'other'))
self.ha.touch_member = Mock(return_value=True)
self.ha.dcs.get_cluster = Mock(side_effect=[
get_cluster_initialized_with_leader(sync=('leader', syncstandby))
for syncstandby in ['other', None]])
with patch('time.sleep') as mock_sleep:
self.ha.restart({})
mock_restart.assert_called_once()
mock_sleep.assert_called()
# Restart is still called when DCS connection fails
mock_restart.reset_mock()
self.ha.dcs.get_cluster = Mock(side_effect=DCSError("foo"))
self.ha.restart({})
mock_restart.assert_called_once()
# We don't try to fetch the cluster state when touch_member fails
mock_restart.reset_mock()
self.ha.dcs.get_cluster.reset_mock()
self.ha.touch_member = Mock(return_value=False)
self.ha.restart({})
mock_restart.assert_called_once()
self.ha.dcs.get_cluster.assert_not_called()
@patch.object(Cluster, 'is_unlocked', Mock(return_value=False))
def test_enable_synchronous_mode(self):
self.ha.is_synchronous_mode = true
self.ha.has_lock = true
self.p.name = 'leader'
self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 0, 0,
CaseInsensitiveSet(), CaseInsensitiveSet()))
self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
with patch('patroni.ha.logger.info') as mock_logger:
self.ha.run_cycle()
self.assertEqual(mock_logger.call_args_list[0][0][0], 'Enabled synchronous replication')
self.ha.dcs.write_sync_state = Mock(return_value=None)
with patch('patroni.ha.logger.warning') as mock_logger:
self.ha.run_cycle()
self.assertEqual(mock_logger.call_args[0][0], 'Updating sync state failed')
@patch.object(Cluster, 'is_unlocked', Mock(return_value=False))
def test_inconsistent_synchronous_state(self):
self.ha.is_synchronous_mode = true
self.ha.has_lock = true
self.p.name = 'leader'
self.ha.cluster = get_cluster_initialized_without_leader(sync=('leader', 'a'))
self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 0, 0,
CaseInsensitiveSet(), CaseInsensitiveSet('a')))
self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
mock_set_sync = self.p.sync_handler.set_synchronous_standby_names = Mock()
with patch('patroni.ha.logger.warning') as mock_logger:
self.ha.run_cycle()
mock_set_sync.assert_called_once()
self.assertTrue(mock_logger.call_args_list[0][0][0].startswith('Inconsistent state between '))
self.ha.dcs.write_sync_state = Mock(return_value=None)
with patch('patroni.ha.logger.warning') as mock_logger:
self.ha.run_cycle()
self.assertEqual(mock_logger.call_args[0][0], 'Updating sync state failed')
def test_effective_tags(self):
self.ha._disable_sync = True
self.assertEqual(self.ha.get_effective_tags(), {'foo': 'bar', 'nosync': True, 'sync_priority': 0})
self.ha._disable_sync = False
self.assertEqual(self.ha.get_effective_tags(), {'foo': 'bar'})
@patch('patroni.postgresql.mtime', Mock(return_value=1588316884))
@patch('builtins.open', Mock(side_effect=Exception))
def test_restore_cluster_config(self):
self.ha.cluster.config.data.clear()
self.ha.has_lock = true
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
def test_watch(self):
self.ha.cluster = get_cluster_initialized_with_leader()
self.ha.watch(0)
def test_wakeup(self):
self.ha.wakeup()
def test_shutdown(self):
self.p.is_running = false
self.ha.is_leader = true
def stop(*args, **kwargs):
kwargs['on_shutdown'](123, 120)
self.p.stop = stop
self.ha.shutdown()
self.ha.is_failover_possible = true
self.ha.shutdown()
@patch('patroni.postgresql.mpp.AbstractMPPHandler.is_coordinator', Mock(return_value=False))
def test_shutdown_citus_worker(self):
self.ha.is_leader = true
self.p.is_running = Mock(side_effect=[Mock(), False])
self.ha.patroni.request = Mock()
self.ha.shutdown()
self.ha.patroni.request.assert_called()
self.assertEqual(self.ha.patroni.request.call_args[0][2], 'citus')
self.assertEqual(self.ha.patroni.request.call_args[0][3]['type'], 'before_demote')
@patch('time.sleep', Mock())
def test_leader_with_not_accessible_data_directory(self):
self.ha.cluster = get_cluster_initialized_with_leader()
self.ha.has_lock = true
self.p.data_directory_empty = Mock(side_effect=OSError(5, "Input/output error: '{}'".format(self.p.data_dir)))
self.assertEqual(self.ha.run_cycle(),
'released leader key voluntarily as data dir not accessible and currently leader')
self.assertEqual(self.p.role, PostgresqlRole.UNINITIALIZED)
# as has_lock is mocked out, we need to fake the leader key release
self.ha.has_lock = false
# will not say bootstrap because data directory is not accessible
self.assertEqual(self.ha.run_cycle(),
"data directory is not accessible: [Errno 5] Input/output error: '{}'".format(self.p.data_dir))
@patch('patroni.postgresql.mtime', Mock(return_value=1588316884))
@patch('builtins.open', mock_open(read_data=('1\t0/40159C0\tno recovery target specified\n\n'
'2\t1/40159C0\tno recovery target specified\n')))
@patch.object(Cluster, 'is_unlocked', Mock(return_value=False))
def test_update_cluster_history(self):
self.ha.has_lock = true
for tl in (1, 3):
self.p.get_primary_timeline = Mock(return_value=tl)
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
@patch('sys.exit', return_value=1)
def test_abort_join(self, exit_mock):
self.ha.cluster = get_cluster_not_initialized_without_leader()
self.p.is_primary = false
self.ha.run_cycle()
exit_mock.assert_called_once_with(1)
self.p.set_role(PostgresqlRole.REPLICA)
self.ha.dcs.initialize = Mock()
with patch.object(Postgresql, 'cb_called', PropertyMock(return_value=True)):
self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')
self.ha.dcs.initialize.assert_not_called()
@patch.object(Cluster, 'is_unlocked', Mock(return_value=False))
def test_after_pause(self):
self.ha.has_lock = true
self.ha.is_paused = true
self.assertEqual(self.ha.run_cycle(), 'PAUSE: no action. I am (postgresql0), the leader with the lock')
self.ha.is_paused = false
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
@patch('patroni.psycopg.connect', psycopg_connect)
def test_permanent_logical_slots_after_promote(self):
self.p._major_version = 110000
config = ClusterConfig(1, {'slots': {'l': {'database': 'postgres', 'plugin': 'test_decoding'}}}, 1)
self.p.name = 'other'
self.ha.cluster = get_cluster_initialized_without_leader(cluster_config=config)
self.assertEqual(self.ha.run_cycle(), 'acquired session lock as a leader')
self.ha.cluster = get_cluster_initialized_without_leader(leader=True, cluster_config=config)
self.ha.has_lock = true
self.assertEqual(self.ha.run_cycle(), 'no action. I am (other), the leader with the lock')
@patch.object(Cluster, 'has_member', true)
def test_run_cycle(self):
self.ha.dcs.touch_member = Mock(side_effect=DCSError('foo'))
self.assertEqual(self.ha.run_cycle(), 'Unexpected exception raised, please report it as a BUG')
self.ha.dcs.touch_member = Mock(side_effect=PatroniFatalException('foo'))
self.assertRaises(PatroniFatalException, self.ha.run_cycle)
def test_empty_directory_in_pause(self):
self.ha.is_paused = true
self.p.data_directory_empty = true
self.assertEqual(self.ha.run_cycle(), 'PAUSE: running with empty data directory')
self.assertEqual(self.p.role, PostgresqlRole.UNINITIALIZED)
@patch('patroni.ha.Ha.sysid_valid', MagicMock(return_value=True))
def test_sysid_no_match_in_pause(self):
self.ha.is_paused = true
self.p.controldata = lambda: {'Database cluster state': 'in recovery', 'Database system identifier': '123'}
self.assertEqual(self.ha.run_cycle(), 'PAUSE: continue to run as primary without lock')
self.ha.has_lock = true
self.assertEqual(self.ha.run_cycle(), 'PAUSE: released leader key voluntarily due to the system ID mismatch')
@patch('patroni.psycopg.connect', psycopg_connect)
@patch('os.path.exists', Mock(return_value=True))
@patch('shutil.rmtree', Mock())
@patch('os.makedirs', Mock())
@patch('os.open', Mock())
@patch('os.fsync', Mock())
@patch('os.close', Mock())
@patch('os.chmod', Mock())
@patch('os.rename', Mock())
@patch('patroni.postgresql.Postgresql.is_starting', Mock(return_value=False))
@patch('builtins.open', mock_open())
@patch.object(ConfigHandler, 'check_recovery_conf', Mock(return_value=(False, False)))
@patch.object(Postgresql, 'major_version', PropertyMock(return_value=130000))
@patch.object(SlotsHandler, 'sync_replication_slots', Mock(return_value=['ls']))
def test_follow_copy(self):
self.ha.cluster.config.data['slots'] = {'ls': {'database': 'a', 'plugin': 'b'}}
self.p.is_primary = false
self.assertTrue(self.ha.run_cycle().startswith('Copying logical slots'))
def test_acquire_lock(self):
self.ha.dcs.attempt_to_acquire_leader = Mock(side_effect=[DCSError('foo'), Exception])
self.assertRaises(DCSError, self.ha.acquire_lock)
self.assertFalse(self.ha.acquire_lock())
@patch('patroni.postgresql.mpp.AbstractMPPHandler.is_coordinator', Mock(return_value=False))
def test_notify_citus_coordinator(self):
self.ha.patroni.request = Mock()
self.ha.notify_mpp_coordinator('before_demote')
self.ha.patroni.request.assert_called_once()
self.assertEqual(self.ha.patroni.request.call_args[1]['timeout'], 30)
self.ha.patroni.request = Mock(side_effect=Exception)
with patch('patroni.ha.logger.warning') as mock_logger:
self.ha.notify_mpp_coordinator('before_promote')
self.assertEqual(self.ha.patroni.request.call_args[1]['timeout'], 2)
mock_logger.assert_called()
self.assertTrue(mock_logger.call_args[0][0].startswith('Request to %s coordinator leader'))
self.assertEqual(mock_logger.call_args[0][1], 'Citus')
@patch.object(global_config.__class__, 'is_synchronous_mode', PropertyMock(return_value=True))
@patch.object(global_config.__class__, 'is_quorum_commit_mode', PropertyMock(return_value=True))
def test_process_sync_replication_prepromote(self):
self.p._major_version = 90500
self.ha.cluster = get_cluster_initialized_without_leader(sync=('other', self.p.name + ',foo'))
self.p.is_primary = false
self.p.set_role(PostgresqlRole.REPLICA)
mock_write_sync = self.ha.dcs.write_sync_state = Mock(return_value=None)
# Postgres 9.5, write_sync_state to DCS failed
self.assertEqual(self.ha.run_cycle(),
'Postponing promotion because synchronous replication state was updated by somebody else')
self.assertEqual(self.ha.dcs.write_sync_state.call_count, 1)
self.assertEqual(mock_write_sync.call_args_list[0][0], (self.p.name, None, 0))
self.assertEqual(mock_write_sync.call_args_list[0][1], {'version': 0})
mock_set_sync = self.p.config.set_synchronous_standby_names = Mock()
mock_write_sync = self.ha.dcs.write_sync_state = Mock(return_value=True)
# Postgres 9.5, our name is written to leader of the /sync key, while voters list and ssn is empty
self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')
self.assertEqual(self.ha.dcs.write_sync_state.call_count, 1)
self.assertEqual(mock_write_sync.call_args_list[0][0], (self.p.name, None, 0))
self.assertEqual(mock_write_sync.call_args_list[0][1], {'version': 0})
self.assertEqual(mock_set_sync.call_count, 1)
self.assertEqual(mock_set_sync.call_args_list[0][0], (None,))
self.p._major_version = 90600
mock_set_sync.reset_mock()
mock_write_sync.reset_mock()
self.p.set_role(PostgresqlRole.REPLICA)
# Postgres 9.6, with quorum commit we avoid updating /sync key and put some nodes to ssn
self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')
self.assertEqual(mock_write_sync.call_count, 0)
self.assertEqual(mock_set_sync.call_count, 1)
self.assertEqual(mock_set_sync.call_args_list[0][0], ('2 (foo,other)',))
self.p._major_version = 150000
mock_set_sync.reset_mock()
self.p.set_role(PostgresqlRole.REPLICA)
self.p.name = 'nonsync'
self.ha.fetch_node_status = get_node_status()
# Postgres 15, with quorum commit. Non-sync node promoted we avoid updating /sync key and put some nodes to ssn
self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')
self.assertEqual(mock_write_sync.call_count, 0)
self.assertEqual(mock_set_sync.call_count, 1)
self.assertEqual(mock_set_sync.call_args_list[0][0], ('ANY 3 (foo,other,postgresql0)',))
@patch.object(global_config.__class__, 'is_synchronous_mode', PropertyMock(return_value=True))
@patch.object(global_config.__class__, 'is_quorum_commit_mode', PropertyMock(return_value=True))
def test__process_quorum_replication(self):
self.p._major_version = 150000
self.ha.has_lock = true
mock_set_sync = self.p.config.set_synchronous_standby_names = Mock()
self.p.name = 'leader'
mock_write_sync = self.ha.dcs.write_sync_state = Mock(return_value=None)
# Test /sync key is attempted to set and failed when missing or invalid
self.p.sync_handler.current_state = Mock(return_value=_SyncState('quorum', 1, 1, CaseInsensitiveSet(['other']),
CaseInsensitiveSet(['other'])))
self.ha.run_cycle()
self.assertEqual(mock_write_sync.call_count, 1)
self.assertEqual(mock_write_sync.call_args_list[0][0], (self.p.name, None, 0))
self.assertEqual(mock_write_sync.call_args_list[0][1], {'version': None})
self.assertEqual(mock_set_sync.call_count, 0)
self.ha._promote_timestamp = 1
mock_write_sync = self.ha.dcs.write_sync_state = Mock(side_effect=[SyncState(None, self.p.name, None, 0), None])
# Test /sync key is attempted to set and succeed when missing or invalid
with patch.object(SyncState, 'is_empty', Mock(side_effect=[True, False])):
self.ha.run_cycle()
self.assertEqual(mock_write_sync.call_count, 2)
self.assertEqual(mock_write_sync.call_args_list[0][0], (self.p.name, None, 0))
self.assertEqual(mock_write_sync.call_args_list[0][1], {'version': None})
self.assertEqual(mock_write_sync.call_args_list[1][0], (self.p.name, CaseInsensitiveSet(['other']), 0))
self.assertEqual(mock_write_sync.call_args_list[1][1], {'version': None})
self.assertEqual(mock_set_sync.call_count, 0)
self.p.sync_handler.current_state = Mock(side_effect=[_SyncState('quorum', 1, 0, CaseInsensitiveSet(['foo']),
CaseInsensitiveSet(['other'])),
_SyncState('quorum', 1, 1, CaseInsensitiveSet(['foo']),
CaseInsensitiveSet(['foo']))])
mock_write_sync = self.ha.dcs.write_sync_state = Mock(return_value=SyncState(1, 'leader', 'foo', 0))
self.ha.cluster = get_cluster_initialized_with_leader(sync=('leader', 'foo'))
# Test the sync node is removed from voters, added to ssn
with patch.object(Postgresql, 'synchronous_standby_names', Mock(return_value='other')), \
patch('time.sleep', Mock()):
self.ha.run_cycle()
self.assertEqual(mock_write_sync.call_count, 1)
self.assertEqual(mock_write_sync.call_args_list[0][0], (self.p.name, CaseInsensitiveSet(), 0))
self.assertEqual(mock_write_sync.call_args_list[0][1], {'version': 0})
self.assertEqual(mock_set_sync.call_count, 1)
self.assertEqual(mock_set_sync.call_args_list[0][0], ('ANY 1 (other)',))
# Test ANY 1 (*) when synchronous_mode_strict and no nodes available
self.p.sync_handler.current_state = Mock(return_value=_SyncState('quorum', 1, 0,
CaseInsensitiveSet(['other', 'foo']),
CaseInsensitiveSet()))
mock_write_sync.reset_mock()
mock_set_sync.reset_mock()
with patch.object(global_config.__class__, 'is_synchronous_mode_strict', PropertyMock(return_value=True)):
self.ha.run_cycle()
self.assertEqual(mock_write_sync.call_count, 1)
self.assertEqual(mock_write_sync.call_args_list[0][0], (self.p.name, CaseInsensitiveSet(), 0))
self.assertEqual(mock_write_sync.call_args_list[0][1], {'version': 0})
self.assertEqual(mock_set_sync.call_count, 1)
self.assertEqual(mock_set_sync.call_args_list[0][0], ('ANY 1 (*)',))
# Test that _process_quorum_replication doesn't take longer than loop_wait
with patch('time.time', Mock(side_effect=[30, 60, 90, 120])):
self.ha.process_sync_replication()
def test_is_failover_possible(self):
self.p._major_version = 140000 # supports_multiple_sync
self.p.name = 'leader'
self.ha.fetch_node_status = get_node_status()
self.ha.cluster = get_cluster_initialized_with_leader(sync=('leader', 'foo,other', 1),
failover=Failover(0, 'leader', 'other', None))
self.ha.cluster.members.append(Member(0, 'foo', 28, {'api_url': 'http://127.0.0.1:8011/patroni'}))
# switchover when synchronous_mode = off
self.assertTrue(self.ha.is_failover_possible())
with patch.object(global_config.__class__, 'is_synchronous_mode', PropertyMock(return_value=True)):
# switchover to synchronous node when synchronous_mode = on
self.assertTrue(self.ha.is_failover_possible())
with patch.object(global_config.__class__, 'is_quorum_commit_mode', PropertyMock(return_value=True)):
# switchover to synchronous node when synchronous_mode = quorum
self.assertTrue(self.ha.is_failover_possible()) # success, despite the fact that quorum is low
# failover candidate is unhealthy, we are checking if there are other good candidates, but quorum is low
self.assertFalse(self.ha.is_failover_possible(exclude_failover_candidate=True))
# now we satisfy quorum requirements
with patch.object(SyncState, 'quorum', PropertyMock(return_value=0)):
self.assertTrue(self.ha.is_failover_possible(exclude_failover_candidate=True))
self.ha.cluster = get_cluster_initialized_with_leader(sync=('leader', 'foo,other', 1),
failover=Failover(0, '', 'foo', None))
# failover to missing node foo
self.assertFalse(self.ha.is_failover_possible())
self.ha.cluster = get_cluster_initialized_with_leader(sync=('leader', 'foo,other', 1),
failover=Failover(0, 'leader', '', None))
# switchover from leader when synchronous_mode = off
self.assertTrue(self.ha.is_failover_possible())
with patch.object(global_config.__class__, 'is_synchronous_mode', PropertyMock(return_value=True)):
# switchover from leader when synchronous_mode = on
self.assertTrue(self.ha.is_failover_possible())
with patch.object(global_config.__class__, 'is_quorum_commit_mode', PropertyMock(return_value=True)):
# switchover from leader when synchronous_mode = quorum
self.assertFalse(self.ha.is_failover_possible()) # failure, because quorum is low