patroni/tests/test_ha.py
Alexander Kukushkin a316105412 Fix bug with priority failover (#3297)
We should ignore the former leader with higher priority when it reports the same LSN as the current node.

This bug could be a contributing factor to issues described in #3295


In addition to that, mock the socket.getaddrinfo() call in test_api.py to avoid hitting DNS servers.
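The fix is exercised below in test__is_healthiest_node, which passes the former leader into _is_healthiest_node().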
2025-02-28 09:48:16 +01:00


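"""Unit tests for patroni.ha: they drive Ha.run_cycle() and its helpers against mocked PostgreSQL and DCS objects."""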
import datetime
import os
import sys
from unittest.mock import MagicMock, Mock, mock_open, patch, PropertyMock
import etcd
from patroni import global_config
from patroni.collections import CaseInsensitiveSet
from patroni.config import Config
from patroni.dcs import Cluster, ClusterConfig, Failover, get_dcs, Leader, Member, Status, SyncState, TimelineHistory
from patroni.dcs.etcd import AbstractEtcdClientWithFailover
from patroni.exceptions import DCSError, PatroniFatalException, PostgresConnectionException
from patroni.ha import _MemberStatus, Ha
from patroni.postgresql import Postgresql
from patroni.postgresql.bootstrap import Bootstrap
from patroni.postgresql.callback_executor import CallbackAction
from patroni.postgresql.cancellable import CancellableSubprocess
from patroni.postgresql.config import ConfigHandler
from patroni.postgresql.postmaster import PostmasterProcess
from patroni.postgresql.rewind import Rewind
from patroni.postgresql.slots import SlotsHandler
from patroni.postgresql.sync import _SyncState
from patroni.utils import tzutc
from patroni.watchdog import Watchdog
from . import MockPostmaster, PostgresInit, psycopg_connect, requests_get
from .test_etcd import etcd_read, etcd_write, socket_getaddrinfo
SYSID = '12345678901'
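# Helpers used to replace boolean-returning methods in the tests: they accept
# any arguments and always return a fixed value.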
def true(*args, **kwargs):
return True
def false(*args, **kwargs):
return False
def get_cluster(initialize, leader, members, failover, sync, cluster_config=None, failsafe=None):
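    """Build a Cluster with a single-entry timeline history and the given leader, members, failover and sync state."""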
t = datetime.datetime.now().isoformat()
history = TimelineHistory(1, '[[1,67197376,"no recovery target specified","' + t + '","foo"]]',
[(1, 67197376, 'no recovery target specified', t, 'foo')])
cluster_config = cluster_config or ClusterConfig(1, {'check_timeline': True, 'member_slots_ttl': 0}, 1)
return Cluster(initialize, cluster_config, leader, Status(10, None, []), members, failover, sync, history, failsafe)
def get_cluster_not_initialized_without_leader(cluster_config=None):
return get_cluster(None, None, [], None, SyncState.empty(), cluster_config)
def get_cluster_bootstrapping_without_leader(cluster_config=None):
return get_cluster("", None, [], None, SyncState.empty(), cluster_config)
def get_cluster_initialized_without_leader(leader=False, failover=None, sync=None, cluster_config=None, failsafe=False):
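    """Build a two-member cluster ('leader' and 'other'); the leader key is populated only when leader=True."""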
m1 = Member(0, 'leader', 28, {'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5435/postgres',
'api_url': 'http://127.0.0.1:8008/patroni', 'xlog_location': 4,
'role': 'primary', 'state': 'running'})
leader = Leader(0, 0, m1 if leader else Member(0, '', 28, {}))
m2 = Member(0, 'other', 28, {'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres',
'api_url': 'http://127.0.0.1:8011/patroni',
'state': 'running',
'pause': True,
'tags': {'clonefrom': True},
'scheduled_restart': {'schedule': "2100-01-01 10:53:07.560445+00:00",
'postgres_version': '99.0.0'}})
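    # sync, when given, is a (leader, sync_standby) name pair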
syncstate = SyncState(0 if sync else None, sync and sync[0], sync and sync[1], 0)
failsafe = {m.name: m.api_url for m in (m1, m2)} if failsafe else None
return get_cluster(SYSID, leader, [m1, m2], failover, syncstate, cluster_config, failsafe)
def get_cluster_initialized_with_leader(failover=None, sync=None):
return get_cluster_initialized_without_leader(leader=True, failover=failover, sync=sync)
def get_cluster_initialized_with_only_leader(failover=None, cluster_config=None):
leader = get_cluster_initialized_without_leader(leader=True, failover=failover).leader
return get_cluster(True, leader, [leader.member], failover, SyncState.empty(), cluster_config)
def get_standby_cluster_initialized_with_only_leader(failover=None, sync=None):
return get_cluster_initialized_with_only_leader(
cluster_config=ClusterConfig(1, {
"standby_cluster": {
"host": "localhost",
"port": 5432,
"primary_slot_name": "",
}}, 1)
)
def get_cluster_initialized_with_leader_and_failsafe():
return get_cluster_initialized_without_leader(leader=True, failsafe=True,
cluster_config=ClusterConfig(1, {'failsafe_mode': True}, 1))
def get_node_status(reachable=True, in_recovery=True, dcs_last_seen=0,
timeline=2, wal_position=10, nofailover=False,
watchdog_failed=False, failover_priority=1, sync_priority=1):
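    """Return a replacement for Ha.fetch_node_status that reports the given attributes for any queried member."""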
def fetch_node_status(e):
tags = {}
if nofailover:
tags['nofailover'] = True
tags['failover_priority'] = failover_priority
tags['sync_priority'] = sync_priority
return _MemberStatus(e, reachable, in_recovery, wal_position,
{'tags': tags, 'watchdog_failed': watchdog_failed,
'dcs_last_seen': dcs_last_seen, 'timeline': timeline})
return fetch_node_status
future_restart_time = datetime.datetime.now(tzutc) + datetime.timedelta(days=5)
postmaster_start_time = datetime.datetime.now(tzutc)
class MockPatroni(object):
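    """Minimal stand-in for the Patroni daemon: wires a real Config to the given Postgresql and DCS objects, with a mocked REST API."""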
def __init__(self, p, d):
os.environ[Config.PATRONI_CONFIG_VARIABLE] = """
restapi:
listen: 0.0.0.0:8008
bootstrap:
postgresql:
name: foo
data_dir: data/postgresql0
pg_rewind:
username: postgres
password: postgres
watchdog:
mode: off
zookeeper:
exhibitor:
hosts: [localhost]
port: 8181
"""
# We rely on sys.argv in Config, so it's necessary to reset
# all the extra values that are coming from py.test
sys.argv = sys.argv[:1]
self.config = Config(None)
self.version = '1.5.7'
self.postgresql = p
self.dcs = d
self.api = Mock()
self.tags = {'foo': 'bar'}
self.nofailover = None
self.replicatefrom = None
self.api.connection_string = 'http://127.0.0.1:8008'
self.clonefrom = None
self.nosync = False
self.nostream = False
self.scheduled_restart = {'schedule': future_restart_time,
'postmaster_start_time': str(postmaster_start_time)}
self.watchdog = Watchdog(self.config)
self.request = lambda *args, **kwargs: requests_get(args[0].api_url, *args[1:], **kwargs)
self.failover_priority = 1
self.sync_priority = 1
def run_async(self, func, args=()):
self.reset_scheduled_action()
if args:
func(*args)
else:
func()
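# The decorator stack below applies to every test in TestHa: it stubs out all
# interactions with a real PostgreSQL instance, etcd, subprocesses and time.sleep().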
@patch.object(Postgresql, 'is_running', Mock(return_value=MockPostmaster()))
@patch.object(Postgresql, 'is_primary', Mock(return_value=True))
@patch.object(Postgresql, 'timeline_wal_position', Mock(return_value=(1, 10, 1)))
@patch.object(Postgresql, '_cluster_info_state_get', Mock(return_value=10))
@patch.object(Postgresql, 'slots', Mock(return_value={'l': 100}))
@patch.object(Postgresql, 'data_directory_empty', Mock(return_value=False))
@patch.object(Postgresql, 'controldata', Mock(return_value={
'Database system identifier': SYSID,
'Database cluster state': 'shut down',
'Latest checkpoint location': '0/12345678',
"Latest checkpoint's TimeLineID": '2'}))
@patch.object(SlotsHandler, 'load_replication_slots', Mock(side_effect=Exception))
@patch.object(ConfigHandler, 'append_pg_hba', Mock())
@patch.object(ConfigHandler, 'write_pgpass', Mock(return_value={}))
@patch.object(ConfigHandler, 'write_recovery_conf', Mock())
@patch.object(ConfigHandler, 'write_postgresql_conf', Mock())
@patch.object(Postgresql, 'query', Mock())
@patch.object(Postgresql, 'checkpoint', Mock())
@patch.object(CancellableSubprocess, 'call', Mock(return_value=0))
@patch.object(Postgresql, 'get_replica_timeline', Mock(return_value=2))
@patch.object(Postgresql, 'get_primary_timeline', Mock(return_value=2))
@patch.object(Postgresql, 'get_major_version', Mock(return_value=140000))
@patch.object(Postgresql, 'resume_wal_replay', Mock())
@patch.object(ConfigHandler, 'restore_configuration_files', Mock())
@patch.object(etcd.Client, 'write', etcd_write)
@patch.object(etcd.Client, 'read', etcd_read)
@patch.object(etcd.Client, 'delete', Mock(side_effect=etcd.EtcdException))
@patch('patroni.postgresql.polling_loop', Mock(return_value=range(1)))
@patch('patroni.async_executor.AsyncExecutor.busy', PropertyMock(return_value=False))
@patch('patroni.async_executor.AsyncExecutor.run_async', run_async)
@patch('patroni.postgresql.rewind.Thread', Mock())
@patch('patroni.postgresql.mpp.citus.CitusHandler.start', Mock())
@patch('subprocess.call', Mock(return_value=0))
@patch('time.sleep', Mock())
class TestHa(PostgresInit):
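    """Exercises Ha.run_cycle() and its helpers against a wide range of mocked cluster states."""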
@patch('socket.getaddrinfo', socket_getaddrinfo)
@patch('patroni.dcs.dcs_modules', Mock(return_value=['patroni.dcs.etcd']))
@patch.object(etcd.Client, 'read', etcd_read)
@patch.object(AbstractEtcdClientWithFailover, '_get_machines_list', Mock(return_value=['http://remotehost:2379']))
@patch.object(Config, '_load_cache', Mock())
def setUp(self):
super(TestHa, self).setUp()
self.p.set_state('running')
self.p.set_role('replica')
self.p.postmaster_start_time = MagicMock(return_value=str(postmaster_start_time))
self.p.can_create_replica_without_replication_connection = MagicMock(return_value=False)
self.e = get_dcs({'etcd': {'ttl': 30, 'host': 'ok:2379', 'scope': 'test',
'name': 'foo', 'retry_timeout': 10},
'citus': {'database': 'citus', 'group': None}})
self.ha = Ha(MockPatroni(self.p, self.e))
self.ha.old_cluster = self.e.get_cluster()
self.ha.cluster = get_cluster_initialized_without_leader()
global_config.update(self.ha.cluster)
self.ha.load_cluster_from_dcs = Mock()
def test_update_lock(self):
self.ha.is_failsafe_mode = true
self.p.last_operation = Mock(side_effect=PostgresConnectionException(''))
self.ha.dcs.update_leader = Mock(side_effect=[DCSError(''), Exception])
self.assertRaises(DCSError, self.ha.update_lock)
self.assertFalse(self.ha.update_lock(True))
@patch.object(Postgresql, 'received_timeline', Mock(return_value=None))
def test_touch_member(self):
self.p._major_version = 110000
self.p.is_primary = false
self.p.timeline_wal_position = Mock(return_value=(0, 1, 0))
self.p.replica_cached_timeline = Mock(side_effect=Exception)
with patch.object(Postgresql, '_cluster_info_state_get', Mock(return_value='streaming')):
self.ha.touch_member()
self.p.timeline_wal_position = Mock(return_value=(0, 1, 1))
self.p.set_role('standby_leader')
self.ha.touch_member()
self.p.set_role('primary')
self.ha.dcs.touch_member = true
self.ha.touch_member()
def test_is_leader(self):
self.assertFalse(self.ha.is_leader())
def test_start_as_replica(self):
self.p.is_healthy = false
self.assertEqual(self.ha.run_cycle(), 'starting as a secondary')
@patch('patroni.dcs.etcd.Etcd.initialize', return_value=True)
def test_bootstrap_as_standby_leader(self, initialize):
self.p.data_directory_empty = true
self.ha.cluster = get_cluster_not_initialized_without_leader(
cluster_config=ClusterConfig(1, {"standby_cluster": {"port": 5432}}, 1))
self.assertEqual(self.ha.run_cycle(), 'trying to bootstrap a new standby leader')
def test_bootstrap_waiting_for_standby_leader(self):
self.p.data_directory_empty = true
self.ha.cluster = get_cluster_initialized_without_leader()
self.ha.cluster.config.data.update({'standby_cluster': {'port': 5432}})
self.assertEqual(self.ha.run_cycle(), 'waiting for standby_leader to bootstrap')
@patch.object(Cluster, 'get_clone_member',
Mock(return_value=Member(0, 'test', 1, {'api_url': 'http://127.0.0.1:8011/patroni',
'conn_url': 'postgres://127.0.0.1:5432/postgres'})))
@patch.object(Bootstrap, 'create_replica', Mock(return_value=0))
def test_start_as_cascade_replica_in_standby_cluster(self):
self.p.data_directory_empty = true
self.ha.cluster = get_standby_cluster_initialized_with_only_leader()
self.assertEqual(self.ha.run_cycle(), "trying to bootstrap from replica 'test'")
def test_recover_replica_failed(self):
self.p.controldata = lambda: {'Database cluster state': 'in recovery', 'Database system identifier': SYSID}
self.p.is_running = false
self.p.follow = false
self.assertEqual(self.ha.run_cycle(), 'starting as a secondary')
self.assertEqual(self.ha.run_cycle(), 'failed to start postgres')
def test_recover_raft(self):
self.p.controldata = lambda: {'Database cluster state': 'in recovery', 'Database system identifier': SYSID}
self.p.is_running = false
self.p.follow = true
self.assertEqual(self.ha.run_cycle(), 'starting as a secondary')
self.p.is_running = true
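        # temporarily masquerade as the Raft DCS implementation, which Ha special-cases during recovery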
ha_dcs_orig_name = self.ha.dcs.__class__.__name__
self.ha.dcs.__class__.__name__ = 'Raft'
self.assertEqual(self.ha.run_cycle(), 'started as a secondary')
self.ha.dcs.__class__.__name__ = ha_dcs_orig_name
def test_recover_former_primary(self):
self.p.follow = false
self.p.is_running = false
self.p.name = 'leader'
self.p.set_role('demoted')
self.p.controldata = lambda: {'Database cluster state': 'shut down', 'Database system identifier': SYSID}
self.ha.cluster = get_cluster_initialized_with_leader()
self.assertEqual(self.ha.run_cycle(), 'starting as readonly because i had the session lock')
def test_start_primary_after_failure(self):
self.p.start = false
self.p.is_running = false
self.p.name = 'leader'
self.p.set_role('primary')
self.p.controldata = lambda: {'Database cluster state': 'in production', 'Database system identifier': SYSID}
self.ha.cluster = get_cluster_initialized_with_leader()
self.assertEqual(self.ha.run_cycle(), 'starting primary after failure')
@patch.object(Rewind, 'ensure_clean_shutdown', Mock())
def test_crash_recovery(self):
self.ha.has_lock = true
self.p.is_running = false
self.p.controldata = lambda: {'Database cluster state': 'in production', 'Database system identifier': SYSID}
self.assertEqual(self.ha.run_cycle(), 'doing crash recovery in a single user mode')
with patch('patroni.async_executor.AsyncExecutor.busy', PropertyMock(return_value=True)), \
patch.object(Ha, 'check_timeline', Mock(return_value=False)):
self.ha._async_executor.schedule('doing crash recovery in a single user mode')
self.ha.state_handler.cancellable._process = Mock()
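            # pretend crash recovery started 10 minutes ago so the startup timeout has already expired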
self.ha._crash_recovery_started -= 600
self.ha.cluster.config.data.update({'maximum_lag_on_failover': 10})
self.assertEqual(self.ha.run_cycle(), 'terminated crash recovery because of startup timeout')
@patch.object(Rewind, 'ensure_clean_shutdown', Mock())
@patch.object(Rewind, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=True))
@patch.object(Rewind, 'can_rewind', PropertyMock(return_value=True))
def test_crash_recovery_before_rewind(self):
self.p.is_primary = false
self.p.is_running = false
self.p.controldata = lambda: {'Database cluster state': 'in archive recovery',
'Database system identifier': SYSID}
self.ha._rewind.trigger_check_diverged_lsn()
self.ha.cluster = get_cluster_initialized_with_leader()
self.assertEqual(self.ha.run_cycle(), 'doing crash recovery in a single user mode')
@patch.object(Rewind, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=True))
@patch.object(Rewind, 'can_rewind', PropertyMock(return_value=True))
@patch('os.listdir', Mock(return_value=[]))
@patch('patroni.postgresql.rewind.fsync_dir', Mock())
def test_recover_with_rewind(self):
self.p.is_running = false
self.ha.cluster = get_cluster_initialized_with_leader()
self.ha.cluster.leader.member.data.update(version='2.0.2', role='primary')
self.ha._rewind.pg_rewind = true
self.ha._rewind.check_leader_is_not_in_recovery = true
with patch.object(Rewind, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=True)):
self.assertEqual(self.ha.run_cycle(), 'running pg_rewind from leader')
with patch.object(Rewind, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=False)), \
patch.object(Ha, 'is_synchronous_mode', Mock(return_value=True)):
self.p.follow = true
self.assertEqual(self.ha.run_cycle(), 'starting as a secondary')
self.p.is_running = true
self.ha.follow = Mock(return_value='fake')
self.assertEqual(self.ha.run_cycle(), 'fake')
@patch.object(Rewind, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=True))
@patch.object(Rewind, 'should_remove_data_directory_on_diverged_timelines', PropertyMock(return_value=True))
@patch.object(Bootstrap, 'create_replica', Mock(return_value=1))
def test_recover_with_reinitialize(self):
self.p.is_running = false
self.ha.cluster = get_cluster_initialized_with_leader()
self.assertEqual(self.ha.run_cycle(), 'reinitializing due to diverged timelines')
@patch('sys.exit', return_value=1)
@patch('patroni.ha.Ha.sysid_valid', MagicMock(return_value=True))
def test_sysid_no_match(self, exit_mock):
self.p.controldata = lambda: {'Database cluster state': 'in recovery', 'Database system identifier': '123'}
self.ha.run_cycle()
exit_mock.assert_called_once_with(1)
@patch.object(Cluster, 'is_unlocked', Mock(return_value=False))
def test_start_as_readonly(self):
self.p.is_primary = false
self.p.is_healthy = true
self.ha.has_lock = true
self.p.controldata = lambda: {'Database cluster state': 'in production', 'Database system identifier': SYSID}
self.assertEqual(self.ha.run_cycle(), 'promoted self to leader because I had the session lock')
@patch('patroni.psycopg.connect', psycopg_connect)
def test_acquire_lock_as_primary(self):
self.assertEqual(self.ha.run_cycle(), 'acquired session lock as a leader')
def test_leader_race_stale_primary(self):
with patch.object(Postgresql, 'get_primary_timeline', Mock(return_value=1)), \
patch('patroni.ha.logger.warning') as mock_logger:
self.assertEqual(self.ha.run_cycle(), 'demoting self because i am not the healthiest node')
self.assertEqual(mock_logger.call_args[0][0], 'My timeline %s is behind last known cluster timeline %s')
def test_promoted_by_acquiring_lock(self):
self.ha.is_healthiest_node = true
self.p.is_primary = false
self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')
def test_promotion_cancelled_after_pre_promote_failed(self):
self.p.is_primary = false
self.p._pre_promote = false
self.ha._is_healthiest_node = true
self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')
self.assertEqual(self.ha.run_cycle(), 'Promotion cancelled because the pre-promote script failed')
self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node')
def test_lost_leader_lock_during_promote(self):
with patch('patroni.async_executor.AsyncExecutor.busy', PropertyMock(return_value=True)):
self.ha._async_executor.schedule('promote')
self.assertEqual(self.ha.run_cycle(), 'lost leader before promote')
@patch.object(Cluster, 'is_unlocked', Mock(return_value=False))
def test_long_promote(self):
self.ha.has_lock = true
self.p.is_primary = false
self.p.set_role('primary')
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
def test_demote_after_failing_to_obtain_lock(self):
self.ha.acquire_lock = false
self.assertEqual(self.ha.run_cycle(), 'demoted self after trying and failing to obtain lock')
def test_follow_new_leader_after_failing_to_obtain_lock(self):
self.ha.is_healthiest_node = true
self.ha.acquire_lock = false
self.p.is_primary = false
self.assertEqual(self.ha.run_cycle(), 'following new leader after trying and failing to obtain lock')
def test_demote_because_not_healthiest(self):
self.ha.is_healthiest_node = false
self.assertEqual(self.ha.run_cycle(), 'demoting self because i am not the healthiest node')
def test_follow_new_leader_because_not_healthiest(self):
self.ha.is_healthiest_node = false
self.p.is_primary = false
self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node')
@patch.object(Cluster, 'is_unlocked', Mock(return_value=False))
def test_promote_because_have_lock(self):
self.ha.has_lock = true
self.p.is_primary = false
self.assertEqual(self.ha.run_cycle(), 'promoted self to leader because I had the session lock')
def test_promote_without_watchdog(self):
self.ha.has_lock = true
self.p.is_primary = true
with patch.object(Watchdog, 'activate', Mock(return_value=False)):
self.assertEqual(self.ha.run_cycle(), 'Demoting self because watchdog could not be activated')
self.p.is_primary = false
self.assertEqual(self.ha.run_cycle(), 'Not promoting self because watchdog could not be activated')
def test_leader_with_lock(self):
self.ha.cluster = get_cluster_initialized_with_leader()
self.ha.has_lock = true
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
def test_coordinator_leader_with_lock(self):
self.ha.cluster = get_cluster_initialized_with_leader()
self.ha.has_lock = true
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
@patch.object(Postgresql, '_wait_for_connection_close', Mock())
@patch.object(Cluster, 'is_unlocked', Mock(return_value=False))
def test_demote_because_not_having_lock(self):
with patch.object(Watchdog, 'is_running', PropertyMock(return_value=True)):
self.assertEqual(self.ha.run_cycle(), 'demoting self because I do not have the lock and I was a leader')
@patch.object(Cluster, 'is_unlocked', Mock(return_value=False))
def test_demote_because_update_lock_failed(self):
self.ha.has_lock = true
self.ha.update_lock = false
self.assertEqual(self.ha.run_cycle(), 'demoted self because failed to update leader lock in DCS')
with patch.object(Ha, '_get_node_to_follow', Mock(side_effect=DCSError('foo'))):
self.assertEqual(self.ha.run_cycle(), 'demoted self because failed to update leader lock in DCS')
self.p.is_primary = false
self.assertEqual(self.ha.run_cycle(), 'not promoting because failed to update leader lock in DCS')
def test_get_node_to_follow_nostream(self):
self.ha.patroni.nostream = True
self.ha.cluster = get_cluster_initialized_with_leader()
self.assertEqual(self.ha._get_node_to_follow(self.ha.cluster), None)
@patch.object(Cluster, 'is_unlocked', Mock(return_value=False))
def test_follow(self):
self.p.is_primary = false
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), a secondary, and following a leader ()')
self.ha.patroni.replicatefrom = "foo"
self.p.config.check_recovery_conf = Mock(return_value=(True, False))
self.ha.cluster.config.data.update({'slots': {'l': {'database': 'a', 'plugin': 'b'}}})
self.ha.cluster.members[1].data['tags']['replicatefrom'] = 'postgresql0'
self.ha.patroni.nofailover = True
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), a secondary, and following a leader ()')
del self.ha.cluster.config.data['slots']
self.ha.cluster.config.data.update({'postgresql': {'use_slots': False}})
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), a secondary, and following a leader ()')
del self.ha.cluster.config.data['postgresql']['use_slots']
@patch.object(Cluster, 'is_unlocked', Mock(return_value=False))
def test_follow_in_pause(self):
self.ha.is_paused = true
self.assertEqual(self.ha.run_cycle(), 'PAUSE: continue to run as primary without lock')
self.p.is_primary = false
self.assertEqual(self.ha.run_cycle(), 'PAUSE: no action. I am (postgresql0)')
@patch.object(Rewind, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=True))
@patch.object(Rewind, 'can_rewind', PropertyMock(return_value=True))
def test_follow_triggers_rewind(self):
self.p.is_primary = false
self.ha._rewind.trigger_check_diverged_lsn()
self.ha.cluster = get_cluster_initialized_with_leader()
self.assertEqual(self.ha.run_cycle(), 'running pg_rewind from leader')
def test_no_dcs_connection_primary_demote(self):
self.ha.load_cluster_from_dcs = Mock(side_effect=DCSError('Etcd is not responding properly'))
self.assertEqual(self.ha.run_cycle(), 'demoting self because DCS is not accessible and I was a leader')
self.ha._async_executor.schedule('dummy')
self.assertEqual(self.ha.run_cycle(), 'demoted self because DCS is not accessible and I was a leader')
def test_check_failsafe_topology(self):
self.ha.load_cluster_from_dcs = Mock(side_effect=DCSError('Etcd is not responding properly'))
self.ha.cluster = get_cluster_initialized_with_leader_and_failsafe()
global_config.update(self.ha.cluster)
self.ha.dcs._last_failsafe = self.ha.cluster.failsafe
self.assertEqual(self.ha.run_cycle(), 'demoting self because DCS is not accessible and I was a leader')
self.ha.state_handler.name = self.ha.cluster.leader.name
self.assertFalse(self.ha.failsafe_is_active())
self.assertEqual(self.ha.run_cycle(),
'continue to run as a leader because failsafe mode is enabled and all members are accessible')
self.assertTrue(self.ha.failsafe_is_active())
with patch.object(Postgresql, 'slots', Mock(side_effect=Exception)):
self.ha.patroni.request = Mock(side_effect=Exception)
self.assertEqual(self.ha.run_cycle(), 'demoting self because DCS is not accessible and I was a leader')
self.assertFalse(self.ha.failsafe_is_active())
self.ha.dcs._last_failsafe.clear()
self.ha.dcs._last_failsafe[self.ha.cluster.leader.name] = self.ha.cluster.leader.member.api_url
self.assertEqual(self.ha.run_cycle(),
'continue to run as a leader because failsafe mode is enabled and all members are accessible')
def test_no_dcs_connection_primary_failsafe(self):
self.ha.load_cluster_from_dcs = Mock(side_effect=DCSError('Etcd is not responding properly'))
self.ha.cluster = get_cluster_initialized_with_leader_and_failsafe()
for m in self.ha.cluster.members:
if m.name != self.ha.cluster.leader.name:
m.data['tags']['replicatefrom'] = 'test'
global_config.update(self.ha.cluster)
self.ha.dcs._last_failsafe = self.ha.cluster.failsafe
self.ha.state_handler.name = self.ha.cluster.leader.name
self.assertEqual(self.ha.run_cycle(),
'continue to run as a leader because failsafe mode is enabled and all members are accessible')
def test_readonly_dcs_primary_failsafe(self):
self.ha.cluster = get_cluster_initialized_with_leader_and_failsafe()
self.ha.dcs.update_leader = Mock(side_effect=DCSError('Etcd is not responding properly'))
self.ha.dcs._last_failsafe = self.ha.cluster.failsafe
self.ha.state_handler.name = self.ha.cluster.leader.name
self.assertEqual(self.ha.run_cycle(),
'continue to run as a leader because failsafe mode is enabled and all members are accessible')
def test_no_dcs_connection_replica_failsafe(self):
self.p.last_operation = Mock(side_effect=PostgresConnectionException(''))
self.ha.load_cluster_from_dcs = Mock(side_effect=DCSError('Etcd is not responding properly'))
self.ha.cluster = get_cluster_initialized_with_leader_and_failsafe()
global_config.update(self.ha.cluster)
self.ha.update_failsafe({'name': 'leader', 'api_url': 'http://127.0.0.1:8008/patroni',
'conn_url': 'postgres://127.0.0.1:5432/postgres', 'slots': {'foo': 1000}})
self.p.is_primary = false
with patch('patroni.ha.logger.debug') as mock_logger:
self.assertEqual(self.ha.run_cycle(), 'DCS is not accessible')
self.assertEqual(mock_logger.call_args_list[0][0][0], 'Failed to fetch current wal lsn: %r')
def test_no_dcs_connection_replica_failsafe_not_enabled_but_active(self):
self.ha.load_cluster_from_dcs = Mock(side_effect=DCSError('Etcd is not responding properly'))
self.ha.cluster = get_cluster_initialized_with_leader()
self.ha.update_failsafe({'name': 'leader', 'api_url': 'http://127.0.0.1:8008/patroni',
'conn_url': 'postgres://127.0.0.1:5432/postgres', 'slots': {'foo': 1000}})
self.p.is_primary = false
self.assertEqual(self.ha.run_cycle(), 'DCS is not accessible')
def test_update_failsafe(self):
self.assertRaises(Exception, self.ha.update_failsafe, {})
self.p.set_role('primary')
self.assertEqual(self.ha.update_failsafe({}), 'Running as a leader')
def test_call_failsafe_member(self):
member = Member(0, 'test', 1, {'api_url': 'http://localhost:8011/patroni'})
self.ha.patroni.request = Mock()
self.ha.patroni.request.return_value.data = b'Accepted'
self.ha.patroni.request.return_value.status = 200
with patch('patroni.ha.logger.info') as mock_logger:
ret = self.ha.call_failsafe_member({}, member)
self.assertEqual(mock_logger.call_args_list[0][0],
('Got response from %s %s: %s', 'test', 'http://localhost:8011/failsafe', 'Accepted'))
self.assertTrue(ret.accepted)
e = Exception('request failed')
self.ha.patroni.request.side_effect = e
with patch('patroni.ha.logger.warning') as mock_logger:
ret = self.ha.call_failsafe_member({}, member)
self.assertEqual(mock_logger.call_args_list[0][0],
('Request failed to %s: POST %s (%s)', 'test', 'http://localhost:8011/failsafe', e))
self.assertFalse(ret.accepted)
@patch('time.sleep', Mock())
def test_bootstrap_from_another_member(self):
self.ha.cluster = get_cluster_initialized_with_leader()
self.assertEqual(self.ha.bootstrap(), 'trying to bootstrap from replica \'other\'')
def test_bootstrap_waiting_for_leader(self):
self.ha.cluster = get_cluster_initialized_without_leader()
self.assertEqual(self.ha.bootstrap(), 'waiting for leader to bootstrap')
def test_bootstrap_without_leader(self):
self.ha.cluster = get_cluster_initialized_without_leader()
self.p.can_create_replica_without_replication_connection = MagicMock(return_value=True)
self.assertEqual(self.ha.bootstrap(), 'trying to bootstrap (without leader)')
def test_bootstrap_not_running_concurrently(self):
self.ha.cluster = get_cluster_bootstrapping_without_leader()
self.p.can_create_replica_without_replication_connection = MagicMock(return_value=True)
self.assertEqual(self.ha.bootstrap(), 'waiting for leader to bootstrap')
def test_bootstrap_initialize_lock_failed(self):
self.ha.cluster = get_cluster_not_initialized_without_leader()
self.assertEqual(self.ha.bootstrap(), 'failed to acquire initialize lock')
@patch('patroni.psycopg.connect', psycopg_connect)
@patch('patroni.postgresql.mpp.citus.connect', psycopg_connect)
@patch('patroni.postgresql.mpp.citus.quote_ident', Mock())
@patch.object(Postgresql, 'connection', Mock(return_value=None))
def test_bootstrap_initialized_new_cluster(self):
self.ha.cluster = get_cluster_not_initialized_without_leader()
self.e.initialize = true
self.assertEqual(self.ha.bootstrap(), 'trying to bootstrap a new cluster')
self.p.is_primary = false
self.assertEqual(self.ha.run_cycle(), 'waiting for end of recovery after bootstrap')
self.p.is_primary = true
self.ha.is_synchronous_mode = true
self.assertEqual(self.ha.run_cycle(), 'running post_bootstrap')
self.assertEqual(self.ha.run_cycle(), 'initialized a new cluster')
def test_bootstrap_release_initialize_key_on_failure(self):
self.ha.cluster = get_cluster_not_initialized_without_leader()
self.e.initialize = true
self.ha.bootstrap()
self.p.is_running = false
self.assertRaises(PatroniFatalException, self.ha.post_bootstrap)
@patch('patroni.psycopg.connect', psycopg_connect)
@patch('patroni.postgresql.mpp.citus.connect', psycopg_connect)
@patch('patroni.postgresql.mpp.citus.quote_ident', Mock())
@patch.object(Postgresql, 'connection', Mock(return_value=None))
def test_bootstrap_release_initialize_key_on_watchdog_failure(self):
self.ha.cluster = get_cluster_not_initialized_without_leader()
self.e.initialize = true
self.ha.bootstrap()
self.p.is_primary = true
with patch.object(Watchdog, 'activate', Mock(return_value=False)), \
patch('patroni.ha.logger.error') as mock_logger:
self.assertEqual(self.ha.post_bootstrap(), 'running post_bootstrap')
self.assertRaises(PatroniFatalException, self.ha.post_bootstrap)
self.assertTrue(mock_logger.call_args[0][0].startswith('Cancelling bootstrap because'
' watchdog activation failed'))
@patch('patroni.psycopg.connect', psycopg_connect)
def test_reinitialize(self):
self.assertIsNotNone(self.ha.reinitialize())
self.ha.cluster = get_cluster_initialized_with_leader()
self.assertIsNone(self.ha.reinitialize(True))
self.ha._async_executor.schedule('reinitialize')
self.assertIsNotNone(self.ha.reinitialize())
self.ha.state_handler.name = self.ha.cluster.leader.name
self.assertIsNotNone(self.ha.reinitialize())
@patch('time.sleep', Mock())
def test_restart(self):
self.assertEqual(self.ha.restart({}), (True, 'restarted successfully'))
self.p.restart = Mock(return_value=None)
self.assertEqual(self.ha.restart({}), (False, 'postgres is still starting'))
self.p.restart = false
self.assertEqual(self.ha.restart({}), (False, 'restart failed'))
self.ha.cluster = get_cluster_initialized_with_leader()
self.ha._async_executor.schedule('reinitialize')
self.assertEqual(self.ha.restart({}), (False, 'reinitialize already in progress'))
with patch.object(self.ha, "restart_matches", return_value=False):
self.assertEqual(self.ha.restart({'foo': 'bar'}), (False, "restart conditions are not satisfied"))
@patch('time.sleep', Mock())
@patch.object(ConfigHandler, 'replace_pg_hba', Mock())
@patch.object(ConfigHandler, 'replace_pg_ident', Mock())
@patch.object(PostmasterProcess, 'start', Mock(return_value=MockPostmaster()))
@patch('patroni.postgresql.mpp.AbstractMPPHandler.is_coordinator', Mock(return_value=False))
def test_worker_restart(self):
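        """A restart of a Citus worker should notify the coordinator: before_demote first, then after_promote."""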
self.ha.has_lock = true
self.ha.patroni.request = Mock()
self.p.is_running = Mock(side_effect=[Mock(), False])
self.assertEqual(self.ha.restart({}), (True, 'restarted successfully'))
self.ha.patroni.request.assert_called()
self.assertEqual(self.ha.patroni.request.call_args_list[0][0][3]['type'], 'before_demote')
self.assertEqual(self.ha.patroni.request.call_args_list[1][0][3]['type'], 'after_promote')
@patch('os.kill', Mock())
def test_restart_in_progress(self):
with patch('patroni.async_executor.AsyncExecutor.busy', PropertyMock(return_value=True)):
self.ha._async_executor.schedule('restart')
self.assertTrue(self.ha.restart_scheduled())
self.assertEqual(self.ha.run_cycle(), 'restart in progress')
self.ha.cluster = get_cluster_initialized_with_leader()
self.assertEqual(self.ha.run_cycle(), 'restart in progress')
self.ha.has_lock = true
self.assertEqual(self.ha.run_cycle(), 'updated leader lock during restart')
self.ha.update_lock = false
self.p.set_role('primary')
with patch('patroni.async_executor.CriticalTask.cancel', Mock(return_value=False)), \
patch('patroni.async_executor.CriticalTask.result',
PropertyMock(return_value=PostmasterProcess(os.getpid())), create=True), \
patch('patroni.postgresql.Postgresql.terminate_starting_postmaster') as mock_terminate:
self.assertEqual(self.ha.run_cycle(), 'lost leader lock during restart')
mock_terminate.assert_called()
self.ha.is_paused = true
self.assertEqual(self.ha.run_cycle(), 'PAUSE: restart in progress')
@patch('patroni.postgresql.mpp.AbstractMPPHandler.is_coordinator', Mock(return_value=False))
def test_manual_failover_from_leader(self):
self.ha.has_lock = true # I am the leader
# to me
with patch('patroni.ha.logger.warning') as mock_warning:
self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, '', self.p.name, None))
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
mock_warning.assert_called_with('%s: I am already the leader, no need to %s', 'manual failover', 'failover')
# to a non-existent candidate
with patch('patroni.ha.logger.warning') as mock_warning:
self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, '', 'blabla', None))
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
mock_warning.assert_called_with(
'%s: no healthy members found, %s is not possible', 'manual failover', 'failover')
        # to an existing candidate
self.ha.fetch_node_status = get_node_status()
self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, '', 'b', None))
self.ha.cluster.members.append(Member(0, 'b', 28, {'api_url': 'http://127.0.0.1:8011/patroni'}))
self.assertEqual(self.ha.run_cycle(), 'manual failover: demoting myself')
# to a candidate on an older timeline
with patch('patroni.ha.logger.info') as mock_info:
self.ha.fetch_node_status = get_node_status(timeline=1)
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
self.assertEqual(mock_info.call_args_list[0][0],
('Timeline %s of member %s is behind the cluster timeline %s', 1, 'b', 2))
# to a lagging candidate
with patch('patroni.ha.logger.info') as mock_info:
self.ha.fetch_node_status = get_node_status(wal_position=1)
self.ha.cluster.config.data.update({'maximum_lag_on_failover': 5})
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
self.assertEqual(mock_info.call_args_list[0][0],
('Member %s exceeds maximum replication lag', 'b'))
self.ha.cluster.members.pop()
@patch('patroni.postgresql.mpp.AbstractMPPHandler.is_coordinator', Mock(return_value=False))
def test_manual_switchover_from_leader(self):
self.ha.has_lock = true # I am the leader
self.ha.fetch_node_status = get_node_status()
# different leader specified in failover key, no candidate
with patch('patroni.ha.logger.warning') as mock_warning:
self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, 'blabla', '', None))
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
mock_warning.assert_called_with(
'%s: leader name does not match: %s != %s', 'switchover', 'blabla', 'postgresql0')
# no candidate
self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, self.p.name, '', None))
self.assertEqual(self.ha.run_cycle(), 'switchover: demoting myself')
self.ha._rewind.rewind_or_reinitialize_needed_and_possible = true
self.assertEqual(self.ha.run_cycle(), 'switchover: demoting myself')
        # other members have failover limitations (nofailover, failed watchdog, older timeline, replication lag)
with patch('patroni.ha.logger.info') as mock_info:
self.ha.fetch_node_status = get_node_status(nofailover=True)
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
self.assertEqual(mock_info.call_args_list[0][0], ('Member %s is %s', 'leader', 'not allowed to promote'))
with patch('patroni.ha.logger.info') as mock_info:
self.ha.fetch_node_status = get_node_status(watchdog_failed=True)
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
self.assertEqual(mock_info.call_args_list[0][0], ('Member %s is %s', 'leader', 'not watchdog capable'))
with patch('patroni.ha.logger.info') as mock_info:
self.ha.fetch_node_status = get_node_status(timeline=1)
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
self.assertEqual(mock_info.call_args_list[0][0],
('Timeline %s of member %s is behind the cluster timeline %s', 1, 'leader', 2))
with patch('patroni.ha.logger.info') as mock_info:
self.ha.fetch_node_status = get_node_status(wal_position=1)
self.ha.cluster.config.data.update({'maximum_lag_on_failover': 5})
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
self.assertEqual(mock_info.call_args_list[0][0], ('Member %s exceeds maximum replication lag', 'leader'))
@patch('patroni.postgresql.mpp.AbstractMPPHandler.is_coordinator', Mock(return_value=False))
def test_scheduled_switchover_from_leader(self):
self.ha.has_lock = true # I am the leader
self.ha.fetch_node_status = get_node_status()
# switchover scheduled time must include timezone
with patch('patroni.ha.logger.warning') as mock_warning:
scheduled = datetime.datetime.now()
self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, self.p.name, 'blabla', scheduled))
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
self.assertIn('Incorrect value of scheduled_at: %s', mock_warning.call_args_list[0][0])
# scheduled now
scheduled = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=tzutc)
self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, self.p.name, 'b', scheduled))
self.ha.cluster.members.append(Member(0, 'b', 28, {'api_url': 'http://127.0.0.1:8011/patroni'}))
self.assertEqual('switchover: demoting myself', self.ha.run_cycle())
# scheduled in the future
with patch('patroni.ha.logger.info') as mock_info:
scheduled = scheduled + datetime.timedelta(seconds=30)
self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, self.p.name, 'blabla', scheduled))
self.assertEqual('no action. I am (postgresql0), the leader with the lock', self.ha.run_cycle())
self.assertIn('Awaiting %s at %s (in %.0f seconds)', mock_info.call_args_list[0][0])
# stale value
with patch('patroni.ha.logger.warning') as mock_warning:
scheduled = scheduled + datetime.timedelta(seconds=-600)
self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, self.p.name, 'b', scheduled))
self.ha.cluster.members.append(Member(0, 'b', 28, {'api_url': 'http://127.0.0.1:8011/patroni'}))
self.assertEqual('no action. I am (postgresql0), the leader with the lock', self.ha.run_cycle())
self.assertIn('Found a stale %s value, cleaning up: %s', mock_warning.call_args_list[0][0])
def test_manual_switchover_from_leader_in_pause(self):
self.ha.has_lock = true # I am the leader
self.ha.is_paused = true
# no candidate
self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, self.p.name, '', None))
with patch('patroni.ha.logger.warning') as mock_warning:
self.assertEqual('PAUSE: no action. I am (postgresql0), the leader with the lock', self.ha.run_cycle())
mock_warning.assert_called_with(
'%s is possible only to a specific candidate in a paused state', 'Switchover')
def test_manual_failover_from_leader_in_pause(self):
self.ha.has_lock = true
self.ha.fetch_node_status = get_node_status()
self.ha.is_paused = true
# failover from me, candidate is healthy
self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, None, 'b', None))
self.ha.cluster.members.append(Member(0, 'b', 28, {'api_url': 'http://127.0.0.1:8011/patroni'}))
self.assertEqual('PAUSE: manual failover: demoting myself', self.ha.run_cycle())
self.ha.cluster.members.pop()
def test_manual_failover_from_leader_in_synchronous_mode(self):
self.ha.is_synchronous_mode = true
self.ha.process_sync_replication = Mock()
self.ha.fetch_node_status = get_node_status()
# I am the leader
self.p.is_primary = true
self.ha.has_lock = true
# the candidate is not in sync members but we allow failover to an async candidate
self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, None, 'b', None), sync=(self.p.name, 'a'))
self.ha.cluster.members.append(Member(0, 'b', 28, {'api_url': 'http://127.0.0.1:8011/patroni'}))
self.assertEqual('manual failover: demoting myself', self.ha.run_cycle())
self.ha.cluster.members.pop()
def test_manual_switchover_from_leader_in_synchronous_mode(self):
self.ha.is_synchronous_mode = true
self.ha.process_sync_replication = Mock()
# I am the leader
self.p.is_primary = true
self.ha.has_lock = true
# candidate specified is not in sync members
with patch('patroni.ha.logger.warning') as mock_warning:
self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, self.p.name, 'a', None),
sync=(self.p.name, 'blabla'))
self.assertEqual('no action. I am (postgresql0), the leader with the lock', self.ha.run_cycle())
self.assertEqual(mock_warning.call_args_list[0][0],
('%s candidate=%s does not match with sync_standbys=%s', 'Switchover', 'a', 'blabla'))
# the candidate is in sync members and is healthy
self.ha.fetch_node_status = get_node_status(wal_position=305419896)
self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, self.p.name, 'a', None),
sync=(self.p.name, 'a'))
self.ha.cluster.members.append(Member(0, 'a', 28, {'api_url': 'http://127.0.0.1:8011/patroni'}))
self.assertEqual('switchover: demoting myself', self.ha.run_cycle())
# the candidate is in sync members but is not healthy
with patch('patroni.ha.logger.info') as mock_info:
self.ha.fetch_node_status = get_node_status(nofailover=true)
self.assertEqual('no action. I am (postgresql0), the leader with the lock', self.ha.run_cycle())
self.assertEqual(mock_info.call_args_list[0][0], ('Member %s is %s', 'a', 'not allowed to promote'))
def test_manual_failover_process_no_leader(self):
self.p.is_primary = false
self.p.set_role('replica')
# failover to another member, fetch_node_status for candidate fails
with patch('patroni.ha.logger.warning') as mock_warning:
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'leader', None))
self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')
self.assertEqual(mock_warning.call_args_list[1][0],
('%s: member %s is %s', 'manual failover', 'leader', 'not reachable'))
# failover to another member, candidate is accessible, in_recovery
self.p.set_role('replica')
self.ha.fetch_node_status = get_node_status()
self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node')
# set nofailover flag to True for all members of the cluster
# this should elect the current member, as we are not going to call the API for it.
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'other', None))
self.ha.fetch_node_status = get_node_status(nofailover=True)
self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')
        # failover to me, but I have the nofailover tag set; in no case should I be elected leader
self.p.set_role('replica')
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'postgresql0', None))
self.ha.patroni.nofailover = True
self.assertEqual(self.ha.run_cycle(), 'following a different leader because I am not allowed to promote')
self.ha.patroni.nofailover = False
# failover to another member that is on an older timeline (only failover_limitation() is checked)
with patch('patroni.ha.logger.info') as mock_info:
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'b', None))
self.ha.cluster.members.append(Member(0, 'b', 28, {'api_url': 'http://127.0.0.1:8011/patroni'}))
self.ha.fetch_node_status = get_node_status(timeline=1)
self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node')
mock_info.assert_called_with('%s: to %s, i am %s', 'manual failover', 'b', 'postgresql0')
# failover to another member lagging behind the cluster_lsn (only failover_limitation() is checked)
with patch('patroni.ha.logger.info') as mock_info:
self.ha.cluster.config.data.update({'maximum_lag_on_failover': 5})
self.ha.fetch_node_status = get_node_status(wal_position=1)
self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node')
mock_info.assert_called_with('%s: to %s, i am %s', 'manual failover', 'b', 'postgresql0')
def test_manual_switchover_process_no_leader(self):
self.p.is_primary = false
self.p.set_role('replica')
# I was the leader, other members are healthy
self.ha.fetch_node_status = get_node_status()
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, self.p.name, '', None))
self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node')
# I was the leader, I am the only healthy member
with patch('patroni.ha.logger.info') as mock_info:
self.ha.fetch_node_status = get_node_status(reachable=False) # inaccessible, in_recovery
self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')
self.assertEqual(mock_info.call_args_list[0][0], ('Member %s is %s', 'leader', 'not reachable'))
self.assertEqual(mock_info.call_args_list[1][0], ('Member %s is %s', 'other', 'not reachable'))
def test_manual_failover_process_no_leader_in_synchronous_mode(self):
self.ha.is_synchronous_mode = true
self.p.is_primary = false
self.ha.fetch_node_status = get_node_status(nofailover=True) # other nodes are not healthy
# manual failover when our name (postgresql0) isn't in the /sync key and the candidate node is not available
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'other', None),
sync=('leader1', 'blabla'))
self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node')
# manual failover when the candidate node isn't available but our name is in the /sync key
# while other sync node is nofailover
with patch('patroni.ha.logger.warning') as mock_warning:
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'other', None),
sync=('leader1', 'postgresql0'))
self.p.sync_handler.current_state = Mock(return_value=(CaseInsensitiveSet(), CaseInsensitiveSet()))
self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')
self.assertEqual(mock_warning.call_args_list[0][0],
('%s: member %s is %s', 'manual failover', 'other', 'not allowed to promote'))
        # manual failover to our node (postgresql0),
        # whose name is not in the sync nodes list (some sync nodes are available)
self.p.set_role('replica')
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'postgresql0', None),
sync=('leader1', 'other'))
self.p.sync_handler.current_state = Mock(return_value=(CaseInsensitiveSet(['leader1']),
CaseInsensitiveSet(['leader1'])))
self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')
def test_manual_switchover_process_no_leader_in_synchronous_mode(self):
self.ha.is_synchronous_mode = true
self.p.is_primary = false
        # to a specific node whose name doesn't match our name (postgresql0)
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, 'leader', 'other', None))
self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node')
        # to our node (postgresql0), whose name is not in the sync nodes list
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, 'leader', 'postgresql0', None),
sync=('leader1', 'blabla'))
self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node')
        # without a candidate; our name (postgresql0) is not in the sync nodes list
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, 'leader', '', None),
sync=('leader', 'blabla'))
self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node')
        # switchover from a specific leader, but the only sync node (us, postgresql0) has the nofailover tag
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, 'leader', '', None),
sync=('postgresql0'))
self.ha.patroni.nofailover = True
self.assertEqual(self.ha.run_cycle(), 'following a different leader because I am not allowed to promote')
def test_manual_failover_process_no_leader_in_pause(self):
self.ha.is_paused = true
# I am running as primary, cluster is unlocked, the candidate is allowed to promote
# but we are in pause
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'other', None))
self.assertEqual(self.ha.run_cycle(), 'PAUSE: continue to run as primary without lock')
def test_manual_switchover_process_no_leader_in_pause(self):
self.ha.is_paused = true
# I am running as primary, cluster is unlocked, no candidate specified
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, 'leader', '', None))
self.assertEqual(self.ha.run_cycle(), 'PAUSE: continue to run as primary without lock')
# the candidate is not running
with patch('patroni.ha.logger.warning') as mock_warning:
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, 'leader', 'blabla', None))
self.assertEqual('PAUSE: acquired session lock as a leader', self.ha.run_cycle())
self.assertEqual(
mock_warning.call_args_list[0][0],
('%s: removing failover key because failover candidate is not running', 'switchover'))
# switchover to me, I am not leader
self.p.is_primary = false
self.p.set_role('replica')
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, 'leader', self.p.name, None))
self.assertEqual(self.ha.run_cycle(), 'PAUSE: promoted self to leader by acquiring session lock')
def test_is_healthiest_node(self):
self.ha.is_failsafe_mode = true
self.ha.state_handler.is_primary = false
self.ha.patroni.nofailover = False
self.ha.fetch_node_status = get_node_status()
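        # with failsafe mode active, only members present in the last-seen failsafe topology may race for the leader lock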
self.ha.dcs._last_failsafe = {'foo': ''}
self.assertFalse(self.ha.is_healthiest_node())
self.ha.dcs._last_failsafe = {'postgresql0': ''}
self.assertTrue(self.ha.is_healthiest_node())
self.ha.dcs._last_failsafe = None
with patch.object(Watchdog, 'is_healthy', PropertyMock(return_value=False)):
self.assertFalse(self.ha.is_healthiest_node())
self.ha.is_paused = true
self.assertFalse(self.ha.is_healthiest_node())
def test__is_healthiest_node(self):
self.p.is_primary = false
self.ha.cluster = get_cluster_initialized_without_leader(sync=('postgresql1', self.p.name))
global_config.update(self.ha.cluster)
self.assertTrue(self.ha._is_healthiest_node(self.ha.old_cluster.members))
self.ha.fetch_node_status = get_node_status() # accessible, in_recovery
self.assertTrue(self.ha._is_healthiest_node(self.ha.old_cluster.members))
self.ha.fetch_node_status = get_node_status(in_recovery=False) # accessible, not in_recovery
self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members))
self.ha.fetch_node_status = get_node_status(failover_priority=2) # accessible, in_recovery, higher priority
self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members))
# if there is a higher-priority node but it has a lower WAL position then this node should race
self.ha.fetch_node_status = get_node_status(failover_priority=6, wal_position=9)
self.assertTrue(self.ha._is_healthiest_node(self.ha.old_cluster.members))
        # if the old leader is a higher-priority node at the same WAL position, this node should still race (#3297)
self.ha.fetch_node_status = get_node_status(failover_priority=6)
self.assertTrue(self.ha._is_healthiest_node(self.ha.old_cluster.members, leader=self.ha.old_cluster.leader))
self.ha.fetch_node_status = get_node_status(wal_position=11) # accessible, in_recovery, wal position ahead
self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members))
        # in synchronous_mode, consider ourselves healthy even if the former leader is accessible read-only and ahead of us
with patch.object(Ha, 'is_synchronous_mode', Mock(return_value=True)):
self.assertTrue(self.ha._is_healthiest_node(self.ha.old_cluster.members))
self.ha.cluster.config.data.update({'maximum_lag_on_failover': 5})
global_config.update(self.ha.cluster)
with patch('patroni.postgresql.Postgresql.last_operation', return_value=1):
self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members))
with patch('patroni.postgresql.Postgresql.replica_cached_timeline', return_value=None):
self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members))
with patch('patroni.postgresql.Postgresql.replica_cached_timeline', return_value=1):
self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members))
self.ha.patroni.nofailover = True
self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members))
self.ha.patroni.nofailover = None
self.ha.patroni.failover_priority = 0
self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members))
def test_fetch_node_status(self):
member = Member(0, 'test', 1, {'api_url': 'http://127.0.0.1:8011/patroni'})
self.ha.fetch_node_status(member)
member = Member(0, 'test', 1, {'api_url': 'http://localhost:8011/patroni'})
self.ha.patroni.request = Mock()
self.ha.patroni.request.return_value.data = b'{"wal":{"location":1},"role":"primary"}'
ret = self.ha.fetch_node_status(member)
self.assertFalse(ret.in_recovery)
@patch.object(Rewind, 'pg_rewind', true)
@patch.object(Rewind, 'check_leader_is_not_in_recovery', true)
@patch('os.listdir', Mock(return_value=[]))
@patch('patroni.postgresql.rewind.fsync_dir', Mock())
@patch.object(Postgresql, 'call_nowait')
def test_post_recover(self, mock_call_nowait):
self.p.is_running = false
self.ha.has_lock = true
self.p.set_role('primary')
self.assertEqual(self.ha.post_recover(), 'removed leader key after trying and failing to start postgres')
self.assertEqual(self.p.role, 'demoted')
mock_call_nowait.assert_called_once_with(CallbackAction.ON_ROLE_CHANGE)
self.ha.has_lock = false
self.assertEqual(self.ha.post_recover(), 'failed to start postgres')
leader = Leader(0, 0, Member(0, 'l', 2, {"version": "1.6", "conn_url": "postgres://a", "role": "primary"}))
self.ha._rewind.execute(leader)
self.p.is_running = true
self.assertIsNone(self.ha.post_recover())
def test_schedule_future_restart(self):
self.ha.patroni.scheduled_restart = {}
        # schedule the restart twice: the first call should succeed, the second should fail
self.assertTrue(self.ha.schedule_future_restart({'schedule': future_restart_time}))
self.assertFalse(self.ha.schedule_future_restart({'schedule': future_restart_time}))
def test_delete_future_restarts(self):
self.ha.delete_future_restart()
def test_evaluate_scheduled_restart(self):
self.p.postmaster_start_time = Mock(return_value=str(postmaster_start_time))
# restart already in progress
with patch('patroni.async_executor.AsyncExecutor.busy', PropertyMock(return_value=True)):
self.assertIsNone(self.ha.evaluate_scheduled_restart())
        # the postmaster has been restarted since the restart was scheduled, so the schedule is discarded
with patch.object(self.ha,
'future_restart_scheduled',
Mock(return_value={'postmaster_start_time':
str(postmaster_start_time - datetime.timedelta(days=1)),
'schedule': str(future_restart_time)})):
self.assertIsNone(self.ha.evaluate_scheduled_restart())
with patch.object(self.ha,
'future_restart_scheduled',
Mock(return_value={'postmaster_start_time': str(postmaster_start_time),
'schedule': str(future_restart_time)})):
with patch.object(self.ha,
'should_run_scheduled_action', Mock(return_value=True)):
# restart in the future, ok
self.assertIsNotNone(self.ha.evaluate_scheduled_restart())
with patch.object(self.ha, 'restart', Mock(return_value=(False, "Test"))):
                    # restart scheduled in the future, but the actual restart failed
self.assertIsNone(self.ha.evaluate_scheduled_restart())
def test_scheduled_restart(self):
self.ha.cluster = get_cluster_initialized_with_leader()
with patch.object(self.ha, "evaluate_scheduled_restart", Mock(return_value="restart scheduled")):
self.assertEqual(self.ha.run_cycle(), "restart scheduled")
def test_restart_matches(self):
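        # the requested role, version, and pending_restart flag must all match the node's current state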
self.p._role = 'replica'
self.p._connection.server_version = 90500
self.p._pending_restart = True
self.assertFalse(self.ha.restart_matches("primary", "9.5.0", True))
self.assertFalse(self.ha.restart_matches("replica", "9.4.3", True))
self.p._pending_restart = False
self.assertFalse(self.ha.restart_matches("replica", "9.5.2", True))
self.assertTrue(self.ha.restart_matches("replica", "9.5.2", False))
def test_process_healthy_cluster_in_pause(self):
self.p.is_primary = false
self.ha.is_paused = true
self.p.name = 'leader'
self.ha.cluster = get_cluster_initialized_with_leader()
self.assertEqual(self.ha.run_cycle(), 'PAUSE: removed leader lock because postgres is not running as primary')
self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, '', self.p.name, None))
self.assertEqual(self.ha.run_cycle(), 'PAUSE: waiting to become primary after promote...')
@patch('patroni.postgresql.mtime', Mock(return_value=1588316884))
@patch('builtins.open', mock_open(read_data='1\t0/40159C0\tno recovery target specified\n'))
def test_process_healthy_standby_cluster_as_standby_leader(self):
self.p.is_primary = false
self.p.name = 'leader'
self.ha.cluster = get_standby_cluster_initialized_with_only_leader()
self.p.config.check_recovery_conf = Mock(return_value=(False, False))
self.ha._leader_timeline = 1
self.assertEqual(self.ha.run_cycle(), 'promoted self to a standby leader because i had the session lock')
self.assertEqual(self.ha.run_cycle(), 'no action. I am (leader), the standby leader with the lock')
self.p.set_role('replica')
self.p.config.check_recovery_conf = Mock(return_value=(True, False))
self.assertEqual(self.ha.run_cycle(), 'promoted self to a standby leader because i had the session lock')
def test_process_healthy_standby_cluster_as_cascade_replica(self):
self.p.is_primary = false
self.p.name = 'replica'
self.ha.cluster = get_standby_cluster_initialized_with_only_leader()
self.assertEqual(self.ha.run_cycle(),
'no action. I am (replica), a secondary, and following a standby leader (leader)')
with patch.object(Leader, 'conn_url', PropertyMock(return_value='')):
self.assertEqual(self.ha.run_cycle(), 'continue following the old known standby leader')
@patch.object(Cluster, 'is_unlocked', Mock(return_value=True))
def test_process_unhealthy_standby_cluster_as_standby_leader(self):
self.p.is_primary = false
self.p.name = 'leader'
self.ha.cluster = get_standby_cluster_initialized_with_only_leader()
self.ha.sysid_valid = true
self.p._sysid = True
self.assertEqual(self.ha.run_cycle(), 'promoted self to a standby leader by acquiring session lock')
@patch.object(Rewind, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=True))
@patch.object(Rewind, 'can_rewind', PropertyMock(return_value=True))
def test_process_unhealthy_standby_cluster_as_cascade_replica(self):
self.p.is_primary = false
self.p.name = 'replica'
self.ha.cluster = get_standby_cluster_initialized_with_only_leader()
self.assertTrue(self.ha.run_cycle().startswith('running pg_rewind from remote_member:'))
def test_recover_unhealthy_leader_in_standby_cluster(self):
self.p.is_primary = false
self.p.name = 'leader'
self.p.is_running = false
self.p.follow = false
self.ha.cluster = get_standby_cluster_initialized_with_only_leader()
self.assertEqual(self.ha.run_cycle(), 'starting as a standby leader because i had the session lock')
@patch.object(Cluster, 'is_unlocked', Mock(return_value=True))
def test_recover_unhealthy_unlocked_standby_cluster(self):
self.p.is_primary = false
self.p.name = 'leader'
self.p.is_running = false
self.p.follow = false
self.ha.cluster = get_standby_cluster_initialized_with_only_leader()
self.ha.has_lock = false
self.assertEqual(self.ha.run_cycle(), 'trying to follow a remote member because standby cluster is unhealthy')
def test_failed_to_update_lock_in_pause(self):
self.ha.update_lock = false
self.ha.is_paused = true
self.p.name = 'leader'
self.ha.cluster = get_cluster_initialized_with_leader()
self.assertEqual(self.ha.run_cycle(),
'PAUSE: continue to run as primary after failing to update leader lock in DCS')
def test_postgres_unhealthy_in_pause(self):
self.ha.is_paused = true
self.p.is_healthy = false
self.assertEqual(self.ha.run_cycle(), 'PAUSE: postgres is not running')
self.ha.has_lock = true
self.assertEqual(self.ha.run_cycle(), 'PAUSE: removed leader lock because postgres is not running')
def test_no_etcd_connection_in_pause(self):
self.ha.is_paused = true
self.ha.load_cluster_from_dcs = Mock(side_effect=DCSError('Etcd is not responding properly'))
self.assertEqual(self.ha.run_cycle(), 'PAUSE: DCS is not accessible')
@patch('patroni.ha.Ha.update_lock', return_value=True)
@patch('patroni.ha.Ha.demote')
def test_starting_timeout(self, demote, update_lock):
def check_calls(seq):
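            # verify which of the update_lock/demote mocks were called during the last cycle, then reset them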
for mock, called in seq:
if called:
mock.assert_called_once()
else:
mock.assert_not_called()
mock.reset_mock()
self.ha.has_lock = true
self.ha.cluster = get_cluster_initialized_with_leader()
self.p.check_for_startup = true
self.p.time_in_state = lambda: 30
self.assertEqual(self.ha.run_cycle(), 'PostgreSQL is still starting up, 270 seconds until timeout')
check_calls([(update_lock, True), (demote, False)])
self.p.time_in_state = lambda: 350
self.ha.fetch_node_status = get_node_status(reachable=False) # inaccessible, in_recovery
self.assertEqual(self.ha.run_cycle(),
'primary start has timed out, but continuing to wait because failover is not possible')
check_calls([(update_lock, True), (demote, False)])
self.ha.fetch_node_status = get_node_status() # accessible, in_recovery
self.assertEqual(self.ha.run_cycle(), 'stopped PostgreSQL because of startup timeout')
check_calls([(update_lock, True), (demote, True)])
update_lock.return_value = False
self.assertEqual(self.ha.run_cycle(), 'stopped PostgreSQL while starting up because leader key was lost')
check_calls([(update_lock, True), (demote, True)])
self.ha.has_lock = false
self.p.is_primary = false
self.assertEqual(self.ha.run_cycle(),
'no action. I am (postgresql0), a secondary, and following a leader (leader)')
check_calls([(update_lock, False), (demote, False)])
def test_manual_failover_while_starting(self):
self.ha.has_lock = true
self.p.check_for_startup = true
f = Failover(0, self.p.name, '', None)
self.ha.cluster = get_cluster_initialized_with_leader(f)
self.ha.fetch_node_status = get_node_status() # accessible, in_recovery
self.assertEqual(self.ha.run_cycle(), 'switchover: demoting myself')
@patch('patroni.ha.Ha.demote')
def test_failover_immediately_on_zero_primary_start_timeout(self, demote):
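        # with primary_start_timeout=0 in synchronous mode a crashed primary is stopped immediately so a sync standby can take over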
self.p.is_running = false
self.ha.cluster = get_cluster_initialized_with_leader(sync=(self.p.name, 'other'))
self.ha.cluster.config.data.update({'synchronous_mode': True, 'primary_start_timeout': 0})
self.ha.has_lock = true
self.ha.update_lock = true
self.ha.fetch_node_status = get_node_status() # accessible, in_recovery
self.assertEqual(self.ha.run_cycle(), 'stopped PostgreSQL to fail over after a crash')
demote.assert_called_once()
def test_primary_stop_timeout(self):
self.assertEqual(self.ha.primary_stop_timeout(), None)
self.ha.cluster.config.data.update({'primary_stop_timeout': 30})
global_config.update(self.ha.cluster)
with patch.object(Ha, 'is_synchronous_mode', Mock(return_value=True)):
self.assertEqual(self.ha.primary_stop_timeout(), 30)
with patch.object(Ha, 'is_synchronous_mode', Mock(return_value=False)):
self.assertEqual(self.ha.primary_stop_timeout(), None)
self.ha.cluster.config.data['primary_stop_timeout'] = None
global_config.update(self.ha.cluster)
self.assertEqual(self.ha.primary_stop_timeout(), None)
@patch('patroni.postgresql.Postgresql.follow')
def test_demote_immediate(self, follow):
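        # 'immediate' demotion must call follow(None) because the cluster has no leader to follow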
self.ha.has_lock = true
self.e.get_cluster = Mock(return_value=get_cluster_initialized_without_leader())
self.ha.demote('immediate')
follow.assert_called_once_with(None)
def test__process_multisync_replication(self):
self.ha.has_lock = true
mock_set_sync = self.p.sync_handler.set_synchronous_standby_names = Mock()
mock_cfg_set_sync = self.p.config.set_synchronous_standby_names = Mock()
self.p.name = 'leader'
# Test sync key removed when sync mode disabled
self.ha.cluster = get_cluster_initialized_with_leader(sync=('leader', 'other'))
with patch.object(self.ha.dcs, 'delete_sync_state') as mock_delete_sync:
self.ha.run_cycle()
mock_delete_sync.assert_called_once()
mock_set_sync.assert_called_once_with(CaseInsensitiveSet())
mock_cfg_set_sync.assert_called_once()
mock_set_sync.reset_mock()
mock_cfg_set_sync.reset_mock()
# Test sync key not touched when not there
self.ha.cluster = get_cluster_initialized_with_leader()
with patch.object(self.ha.dcs, 'delete_sync_state') as mock_delete_sync:
self.ha.run_cycle()
mock_delete_sync.assert_not_called()
mock_set_sync.assert_not_called()
mock_cfg_set_sync.assert_called_once()
mock_set_sync.reset_mock()
mock_cfg_set_sync.reset_mock()
self.ha.is_synchronous_mode = true
# Test sync standby not touched when picking the same node
self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 1, 1,
CaseInsensitiveSet(['other']),
CaseInsensitiveSet(['other'])))
self.ha.cluster = get_cluster_initialized_with_leader(sync=('leader', 'other'))
self.ha.run_cycle()
mock_set_sync.assert_not_called()
mock_set_sync.reset_mock()
mock_cfg_set_sync.reset_mock()
# Test sync standby is replaced when switching standbys
self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 0, 0, CaseInsensitiveSet(),
CaseInsensitiveSet(['other2'])))
self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
self.ha.run_cycle()
mock_set_sync.assert_called_once_with(CaseInsensitiveSet(['other2']))
mock_cfg_set_sync.assert_not_called()
        # Test the sync standby list is extended when a new standby joins
self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 1, 1,
CaseInsensitiveSet(['other2']),
CaseInsensitiveSet(['other2', 'other3'])))
self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
self.ha.run_cycle()
self.assertEqual(mock_set_sync.call_args_list[0][0], (CaseInsensitiveSet(['other2']),))
self.assertEqual(mock_set_sync.call_args_list[1][0], (CaseInsensitiveSet(['other2', 'other3']),))
mock_cfg_set_sync.assert_not_called()
mock_set_sync.reset_mock()
mock_cfg_set_sync.reset_mock()
        # Test sync standby is not disabled when updating DCS fails
self.ha.dcs.write_sync_state = Mock(return_value=None)
self.ha.run_cycle()
mock_set_sync.assert_not_called()
mock_cfg_set_sync.assert_not_called()
mock_set_sync.reset_mock()
mock_cfg_set_sync.reset_mock()
# Test changing sync standby
self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
self.ha.dcs.get_cluster = Mock(return_value=get_cluster_initialized_with_leader(sync=('leader', 'other')))
self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 1, 1,
CaseInsensitiveSet(['other2']),
CaseInsensitiveSet(['other2'])))
self.ha.run_cycle()
self.assertEqual(self.ha.dcs.write_sync_state.call_count, 2)
# Test updating sync standby key failed due to race
self.ha.dcs.write_sync_state = Mock(side_effect=[SyncState.empty(), None])
self.ha.run_cycle()
self.assertEqual(self.ha.dcs.write_sync_state.call_count, 2)
        # Test updating sync standby key failed because DCS is not accessible
self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
self.ha.dcs.get_cluster = Mock(side_effect=DCSError('foo'))
self.ha.run_cycle()
# Test changing sync standby failed due to race
self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
self.ha.dcs.get_cluster = Mock(return_value=get_cluster_initialized_with_leader(sync=('somebodyelse', None)))
self.ha.run_cycle()
self.assertEqual(self.ha.dcs.write_sync_state.call_count, 2)
# Test sync set to '*' when synchronous_mode_strict is enabled
mock_set_sync.reset_mock()
mock_cfg_set_sync.reset_mock()
self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 0, 0, CaseInsensitiveSet(),
CaseInsensitiveSet()))
with patch.object(global_config.__class__, 'is_synchronous_mode_strict', PropertyMock(return_value=True)):
self.ha.run_cycle()
mock_set_sync.assert_called_once_with(CaseInsensitiveSet('*'))
mock_cfg_set_sync.assert_not_called()
# Test the value configured by the user for synchronous_standby_names is used when synchronous mode is disabled
self.ha.is_synchronous_mode = false
mock_set_sync.reset_mock()
mock_cfg_set_sync.reset_mock()
ssn_mock = PropertyMock(return_value="SOME_SSN")
with patch('patroni.postgresql.config.ConfigHandler.synchronous_standby_names', ssn_mock):
self.ha.run_cycle()
mock_set_sync.assert_not_called()
mock_cfg_set_sync.assert_called_once_with("SOME_SSN")
def test_sync_replication_become_primary(self):
self.ha.is_synchronous_mode = true
mock_set_sync = self.p.sync_handler.set_synchronous_standby_names = Mock()
self.p.is_primary = false
self.p.set_role('replica')
self.ha.has_lock = true
mock_write_sync = self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
self.p.name = 'leader'
self.ha.cluster = get_cluster_initialized_with_leader(sync=('other', None))
# When we just became primary nobody is sync
self.assertEqual(self.ha.enforce_primary_role('msg', 'promote msg'), 'promote msg')
mock_set_sync.assert_called_once_with(CaseInsensitiveSet(), 0)
mock_write_sync.assert_called_once_with('leader', None, 0, version=0)
mock_set_sync.reset_mock()
        # if writing the sync state to DCS fails we must not promote
self.p.set_role('replica')
mock_write_sync.return_value = False
self.assertTrue(self.ha.enforce_primary_role('msg', 'promote msg') != 'promote msg')
mock_set_sync.assert_not_called()
def test_unhealthy_sync_mode(self):
self.ha.is_synchronous_mode = true
self.p.is_primary = false
self.p.set_role('replica')
self.p.name = 'other'
self.ha.cluster = get_cluster_initialized_without_leader(sync=('leader', 'other2'))
mock_write_sync = self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
mock_acquire = self.ha.acquire_lock = Mock(return_value=True)
mock_follow = self.p.follow = Mock()
mock_promote = self.p.promote = Mock()
        # If we don't match the sync replica we are not allowed to acquire the lock
self.ha.run_cycle()
mock_acquire.assert_not_called()
mock_follow.assert_called_once()
self.assertEqual(mock_follow.call_args[0][0], None)
mock_write_sync.assert_not_called()
mock_follow.reset_mock()
# If we do match we will try to promote
self.ha._is_healthiest_node = true
self.ha.cluster = get_cluster_initialized_without_leader(sync=('leader', 'other'))
self.ha.run_cycle()
mock_acquire.assert_called_once()
mock_follow.assert_not_called()
mock_promote.assert_called_once()
mock_write_sync.assert_called_once_with('other', None, 0, version=0)
def test_disable_sync_when_restarting(self):
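        # a sync standby reports itself as nosync and waits until the leader picks another sync candidate before restarting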
self.ha.is_synchronous_mode = true
self.p.name = 'other'
self.p.is_primary = false
self.p.set_role('replica')
mock_restart = self.p.restart = Mock(return_value=True)
self.ha.cluster = get_cluster_initialized_with_leader(sync=('leader', 'other'))
self.ha.touch_member = Mock(return_value=True)
self.ha.dcs.get_cluster = Mock(side_effect=[
get_cluster_initialized_with_leader(sync=('leader', syncstandby))
for syncstandby in ['other', None]])
with patch('time.sleep') as mock_sleep:
self.ha.restart({})
mock_restart.assert_called_once()
mock_sleep.assert_called()
# Restart is still called when DCS connection fails
mock_restart.reset_mock()
self.ha.dcs.get_cluster = Mock(side_effect=DCSError("foo"))
self.ha.restart({})
mock_restart.assert_called_once()
# We don't try to fetch the cluster state when touch_member fails
mock_restart.reset_mock()
self.ha.dcs.get_cluster.reset_mock()
self.ha.touch_member = Mock(return_value=False)
self.ha.restart({})
mock_restart.assert_called_once()
self.ha.dcs.get_cluster.assert_not_called()
@patch.object(Cluster, 'is_unlocked', Mock(return_value=False))
def test_enable_synchronous_mode(self):
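        # enabling synchronous_mode writes the initial /sync state and logs it; a failed DCS write only produces a warning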
self.ha.is_synchronous_mode = true
self.ha.has_lock = true
self.p.name = 'leader'
self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 0, 0,
CaseInsensitiveSet(), CaseInsensitiveSet()))
self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
with patch('patroni.ha.logger.info') as mock_logger:
self.ha.run_cycle()
self.assertEqual(mock_logger.call_args_list[0][0][0], 'Enabled synchronous replication')
self.ha.dcs.write_sync_state = Mock(return_value=None)
with patch('patroni.ha.logger.warning') as mock_logger:
self.ha.run_cycle()
self.assertEqual(mock_logger.call_args[0][0], 'Updating sync state failed')
@patch.object(Cluster, 'is_unlocked', Mock(return_value=False))
def test_inconsistent_synchronous_state(self):
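        # the /sync key and the actual synchronous_standby_names disagree; Patroni must repair ssn and log a warning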
self.ha.is_synchronous_mode = true
self.ha.has_lock = true
self.p.name = 'leader'
self.ha.cluster = get_cluster_initialized_without_leader(sync=('leader', 'a'))
self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 0, 0,
CaseInsensitiveSet(), CaseInsensitiveSet('a')))
self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
mock_set_sync = self.p.sync_handler.set_synchronous_standby_names = Mock()
with patch('patroni.ha.logger.warning') as mock_logger:
self.ha.run_cycle()
mock_set_sync.assert_called_once()
self.assertTrue(mock_logger.call_args_list[0][0][0].startswith('Inconsistent state between '))
self.ha.dcs.write_sync_state = Mock(return_value=None)
with patch('patroni.ha.logger.warning') as mock_logger:
self.ha.run_cycle()
self.assertEqual(mock_logger.call_args[0][0], 'Updating sync state failed')
def test_effective_tags(self):
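        # with sync replication temporarily disabled the nosync/sync_priority tags are added to the configured ones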
self.ha._disable_sync = True
self.assertEqual(self.ha.get_effective_tags(), {'foo': 'bar', 'nosync': True, 'sync_priority': 0})
self.ha._disable_sync = False
self.assertEqual(self.ha.get_effective_tags(), {'foo': 'bar'})
@patch('patroni.postgresql.mtime', Mock(return_value=1588316884))
@patch('builtins.open', Mock(side_effect=Exception))
def test_restore_cluster_config(self):
self.ha.cluster.config.data.clear()
self.ha.has_lock = true
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
def test_watch(self):
self.ha.cluster = get_cluster_initialized_with_leader()
self.ha.watch(0)
def test_wakeup(self):
self.ha.wakeup()
def test_shutdown(self):
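        # stop() must invoke the on_shutdown callback; exercise shutdown both with and without a failover candidate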
self.p.is_running = false
self.ha.is_leader = true
def stop(*args, **kwargs):
kwargs['on_shutdown'](123, 120)
self.p.stop = stop
self.ha.shutdown()
self.ha.is_failover_possible = true
self.ha.shutdown()
@patch('patroni.postgresql.mpp.AbstractMPPHandler.is_coordinator', Mock(return_value=False))
def test_shutdown_citus_worker(self):
self.ha.is_leader = true
self.p.is_running = Mock(side_effect=[Mock(), False])
self.ha.patroni.request = Mock()
self.ha.shutdown()
self.ha.patroni.request.assert_called()
self.assertEqual(self.ha.patroni.request.call_args[0][2], 'citus')
self.assertEqual(self.ha.patroni.request.call_args[0][3]['type'], 'before_demote')
@patch('time.sleep', Mock())
def test_leader_with_not_accessible_data_directory(self):
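        # an I/O error while checking the data directory must release the leader key and reset the role to uninitialized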
self.ha.cluster = get_cluster_initialized_with_leader()
self.ha.has_lock = true
self.p.data_directory_empty = Mock(side_effect=OSError(5, "Input/output error: '{}'".format(self.p.data_dir)))
self.assertEqual(self.ha.run_cycle(),
'released leader key voluntarily as data dir not accessible and currently leader')
self.assertEqual(self.p.role, 'uninitialized')
# as has_lock is mocked out, we need to fake the leader key release
self.ha.has_lock = false
        # must not attempt a bootstrap because the data directory is not accessible
self.assertEqual(self.ha.run_cycle(),
"data directory is not accessible: [Errno 5] Input/output error: '{}'".format(self.p.data_dir))
@patch('patroni.postgresql.mtime', Mock(return_value=1588316884))
@patch('builtins.open', mock_open(read_data=('1\t0/40159C0\tno recovery target specified\n\n'
'2\t1/40159C0\tno recovery target specified\n')))
@patch.object(Cluster, 'is_unlocked', Mock(return_value=False))
def test_update_cluster_history(self):
self.ha.has_lock = true
for tl in (1, 3):
self.p.get_primary_timeline = Mock(return_value=tl)
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
@patch('sys.exit', return_value=1)
def test_abort_join(self, exit_mock):
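        # a node with a non-empty data directory must not join an uninitialized cluster as a replica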
self.ha.cluster = get_cluster_not_initialized_without_leader()
self.p.is_primary = false
self.ha.run_cycle()
exit_mock.assert_called_once_with(1)
self.p.set_role('replica')
self.ha.dcs.initialize = Mock()
with patch.object(Postgresql, 'cb_called', PropertyMock(return_value=True)):
self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')
self.ha.dcs.initialize.assert_not_called()
@patch.object(Cluster, 'is_unlocked', Mock(return_value=False))
def test_after_pause(self):
self.ha.has_lock = true
self.ha.is_paused = true
self.assertEqual(self.ha.run_cycle(), 'PAUSE: no action. I am (postgresql0), the leader with the lock')
self.ha.is_paused = false
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
@patch('patroni.psycopg.connect', psycopg_connect)
def test_permanent_logical_slots_after_promote(self):
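        # exercise promotion and a follow-up cycle with a permanent logical slot configured on PostgreSQL 11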
self.p._major_version = 110000
config = ClusterConfig(1, {'slots': {'l': {'database': 'postgres', 'plugin': 'test_decoding'}}}, 1)
self.p.name = 'other'
self.ha.cluster = get_cluster_initialized_without_leader(cluster_config=config)
self.assertEqual(self.ha.run_cycle(), 'acquired session lock as a leader')
self.ha.cluster = get_cluster_initialized_without_leader(leader=True, cluster_config=config)
self.ha.has_lock = true
self.assertEqual(self.ha.run_cycle(), 'no action. I am (other), the leader with the lock')
@patch.object(Cluster, 'has_member', true)
def test_run_cycle(self):
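        # unexpected exceptions from a cycle are reported as a BUG, while PatroniFatalException propagates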
self.ha.dcs.touch_member = Mock(side_effect=DCSError('foo'))
self.assertEqual(self.ha.run_cycle(), 'Unexpected exception raised, please report it as a BUG')
self.ha.dcs.touch_member = Mock(side_effect=PatroniFatalException('foo'))
self.assertRaises(PatroniFatalException, self.ha.run_cycle)
def test_empty_directory_in_pause(self):
self.ha.is_paused = true
self.p.data_directory_empty = true
self.assertEqual(self.ha.run_cycle(), 'PAUSE: running with empty data directory')
self.assertEqual(self.p.role, 'uninitialized')
@patch('patroni.ha.Ha.sysid_valid', MagicMock(return_value=True))
def test_sysid_no_match_in_pause(self):
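        # in pause mode a system ID mismatch is tolerated until we hold the leader lock, then the key is released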
self.ha.is_paused = true
self.p.controldata = lambda: {'Database cluster state': 'in recovery', 'Database system identifier': '123'}
self.assertEqual(self.ha.run_cycle(), 'PAUSE: continue to run as primary without lock')
self.ha.has_lock = true
self.assertEqual(self.ha.run_cycle(), 'PAUSE: released leader key voluntarily due to the system ID mismatch')
@patch('patroni.psycopg.connect', psycopg_connect)
@patch('os.path.exists', Mock(return_value=True))
@patch('shutil.rmtree', Mock())
@patch('os.makedirs', Mock())
@patch('os.open', Mock())
@patch('os.fsync', Mock())
@patch('os.close', Mock())
@patch('os.chmod', Mock())
@patch('os.rename', Mock())
@patch('patroni.postgresql.Postgresql.is_starting', Mock(return_value=False))
@patch('builtins.open', mock_open())
@patch.object(ConfigHandler, 'check_recovery_conf', Mock(return_value=(False, False)))
@patch.object(Postgresql, 'major_version', PropertyMock(return_value=130000))
@patch.object(SlotsHandler, 'sync_replication_slots', Mock(return_value=['ls']))
def test_follow_copy(self):
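        # a replica with a configured permanent logical slot reports that it is copying logical slots before following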
self.ha.cluster.config.data['slots'] = {'ls': {'database': 'a', 'plugin': 'b'}}
self.p.is_primary = false
self.assertTrue(self.ha.run_cycle().startswith('Copying logical slots'))
def test_acquire_lock(self):
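        # a DCSError from the DCS propagates, while any other exception makes acquire_lock return False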
self.ha.dcs.attempt_to_acquire_leader = Mock(side_effect=[DCSError('foo'), Exception])
self.assertRaises(DCSError, self.ha.acquire_lock)
self.assertFalse(self.ha.acquire_lock())
@patch('patroni.postgresql.mpp.AbstractMPPHandler.is_coordinator', Mock(return_value=False))
def test_notify_citus_coordinator(self):
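        # before_demote notifications use a 30s timeout and before_promote only 2s; request failures are logged as a warning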
self.ha.patroni.request = Mock()
self.ha.notify_mpp_coordinator('before_demote')
self.ha.patroni.request.assert_called_once()
self.assertEqual(self.ha.patroni.request.call_args[1]['timeout'], 30)
self.ha.patroni.request = Mock(side_effect=Exception)
with patch('patroni.ha.logger.warning') as mock_logger:
self.ha.notify_mpp_coordinator('before_promote')
self.assertEqual(self.ha.patroni.request.call_args[1]['timeout'], 2)
mock_logger.assert_called()
self.assertTrue(mock_logger.call_args[0][0].startswith('Request to %s coordinator leader'))
self.assertEqual(mock_logger.call_args[0][1], 'Citus')
@patch.object(global_config.__class__, 'is_synchronous_mode', PropertyMock(return_value=True))
@patch.object(global_config.__class__, 'is_quorum_commit_mode', PropertyMock(return_value=True))
def test_process_sync_replication_prepromote(self):
self.p._major_version = 90500
self.ha.cluster = get_cluster_initialized_without_leader(sync=('other', self.p.name + ',foo'))
self.p.is_primary = false
self.p.set_role('replica')
mock_write_sync = self.ha.dcs.write_sync_state = Mock(return_value=None)
# Postgres 9.5, write_sync_state to DCS failed
self.assertEqual(self.ha.run_cycle(),
'Postponing promotion because synchronous replication state was updated by somebody else')
self.assertEqual(self.ha.dcs.write_sync_state.call_count, 1)
self.assertEqual(mock_write_sync.call_args_list[0][0], (self.p.name, None, 0))
self.assertEqual(mock_write_sync.call_args_list[0][1], {'version': 0})
mock_set_sync = self.p.config.set_synchronous_standby_names = Mock()
mock_write_sync = self.ha.dcs.write_sync_state = Mock(return_value=True)
        # Postgres 9.5: our name is written as the leader in the /sync key, while the voters list and ssn remain empty
self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')
self.assertEqual(self.ha.dcs.write_sync_state.call_count, 1)
self.assertEqual(mock_write_sync.call_args_list[0][0], (self.p.name, None, 0))
self.assertEqual(mock_write_sync.call_args_list[0][1], {'version': 0})
self.assertEqual(mock_set_sync.call_count, 1)
self.assertEqual(mock_set_sync.call_args_list[0][0], (None,))
self.p._major_version = 90600
mock_set_sync.reset_mock()
mock_write_sync.reset_mock()
self.p.set_role('replica')
        # Postgres 9.6: with quorum commit we avoid updating the /sync key and put some nodes into ssn
self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')
self.assertEqual(mock_write_sync.call_count, 0)
self.assertEqual(mock_set_sync.call_count, 1)
self.assertEqual(mock_set_sync.call_args_list[0][0], ('2 (foo,other)',))
self.p._major_version = 150000
mock_set_sync.reset_mock()
self.p.set_role('replica')
self.p.name = 'nonsync'
self.ha.fetch_node_status = get_node_status()
        # Postgres 15 with quorum commit: a non-sync node is promoted, we avoid updating the /sync key and put some nodes into ssn
self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')
self.assertEqual(mock_write_sync.call_count, 0)
self.assertEqual(mock_set_sync.call_count, 1)
self.assertEqual(mock_set_sync.call_args_list[0][0], ('ANY 3 (foo,other,postgresql0)',))
@patch.object(global_config.__class__, 'is_synchronous_mode', PropertyMock(return_value=True))
@patch.object(global_config.__class__, 'is_quorum_commit_mode', PropertyMock(return_value=True))
def test__process_quorum_replication(self):
self.p._major_version = 150000
self.ha.has_lock = true
mock_set_sync = self.p.config.set_synchronous_standby_names = Mock()
self.p.name = 'leader'
mock_write_sync = self.ha.dcs.write_sync_state = Mock(return_value=None)
        # Test that we attempt to write the /sync key when it is missing or invalid, but the write fails
self.p.sync_handler.current_state = Mock(return_value=_SyncState('quorum', 1, 1, CaseInsensitiveSet(['other']),
CaseInsensitiveSet(['other'])))
self.ha.run_cycle()
self.assertEqual(mock_write_sync.call_count, 1)
self.assertEqual(mock_write_sync.call_args_list[0][0], (self.p.name, None, 0))
self.assertEqual(mock_write_sync.call_args_list[0][1], {'version': None})
self.assertEqual(mock_set_sync.call_count, 0)
self.ha._promote_timestamp = 1
mock_write_sync = self.ha.dcs.write_sync_state = Mock(side_effect=[SyncState(None, self.p.name, None, 0), None])
        # Test that the /sync key is successfully written when it was missing or invalid
with patch.object(SyncState, 'is_empty', Mock(side_effect=[True, False])):
self.ha.run_cycle()
self.assertEqual(mock_write_sync.call_count, 2)
self.assertEqual(mock_write_sync.call_args_list[0][0], (self.p.name, None, 0))
self.assertEqual(mock_write_sync.call_args_list[0][1], {'version': None})
self.assertEqual(mock_write_sync.call_args_list[1][0], (self.p.name, CaseInsensitiveSet(['other']), 0))
self.assertEqual(mock_write_sync.call_args_list[1][1], {'version': None})
self.assertEqual(mock_set_sync.call_count, 0)
self.p.sync_handler.current_state = Mock(side_effect=[_SyncState('quorum', 1, 0, CaseInsensitiveSet(['foo']),
CaseInsensitiveSet(['other'])),
_SyncState('quorum', 1, 1, CaseInsensitiveSet(['foo']),
CaseInsensitiveSet(['foo']))])
mock_write_sync = self.ha.dcs.write_sync_state = Mock(return_value=SyncState(1, 'leader', 'foo', 0))
self.ha.cluster = get_cluster_initialized_with_leader(sync=('leader', 'foo'))
# Test the sync node is removed from voters, added to ssn
with patch.object(Postgresql, 'synchronous_standby_names', Mock(return_value='other')), \
patch('time.sleep', Mock()):
self.ha.run_cycle()
self.assertEqual(mock_write_sync.call_count, 1)
self.assertEqual(mock_write_sync.call_args_list[0][0], (self.p.name, CaseInsensitiveSet(), 0))
self.assertEqual(mock_write_sync.call_args_list[0][1], {'version': 0})
self.assertEqual(mock_set_sync.call_count, 1)
self.assertEqual(mock_set_sync.call_args_list[0][0], ('ANY 1 (other)',))
# Test ANY 1 (*) when synchronous_mode_strict and no nodes available
self.p.sync_handler.current_state = Mock(return_value=_SyncState('quorum', 1, 0,
CaseInsensitiveSet(['other', 'foo']),
CaseInsensitiveSet()))
mock_write_sync.reset_mock()
mock_set_sync.reset_mock()
with patch.object(global_config.__class__, 'is_synchronous_mode_strict', PropertyMock(return_value=True)):
self.ha.run_cycle()
self.assertEqual(mock_write_sync.call_count, 1)
self.assertEqual(mock_write_sync.call_args_list[0][0], (self.p.name, CaseInsensitiveSet(), 0))
self.assertEqual(mock_write_sync.call_args_list[0][1], {'version': 0})
self.assertEqual(mock_set_sync.call_count, 1)
self.assertEqual(mock_set_sync.call_args_list[0][0], ('ANY 1 (*)',))
# Test that _process_quorum_replication doesn't take longer than loop_wait
with patch('time.time', Mock(side_effect=[30, 60, 90, 120])):
self.ha.process_sync_replication()