import datetime
import etcd
import os
import sys
from mock import Mock, MagicMock, PropertyMock, patch, mock_open
from patroni.config import Config
from patroni.dcs import Cluster, ClusterConfig, Failover, Leader, Member, get_dcs, SyncState, TimelineHistory
from patroni.dcs.etcd import AbstractEtcdClientWithFailover
from patroni.exceptions import DCSError, PostgresConnectionException, PatroniFatalException
from patroni.ha import Ha, _MemberStatus
from patroni.postgresql import Postgresql
from patroni.postgresql.bootstrap import Bootstrap
from patroni.postgresql.cancellable import CancellableSubprocess
from patroni.postgresql.config import ConfigHandler
from patroni.postgresql.rewind import Rewind
from patroni.postgresql.slots import SlotsHandler
from patroni.utils import tzutc
from patroni.watchdog import Watchdog
from six.moves import builtins
from . import PostgresInit, MockPostmaster, psycopg_connect, requests_get
from .test_etcd import socket_getaddrinfo, etcd_read, etcd_write
SYSID = '12345678901'
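# Trivial stand-ins for methods whose result the tests pin to a fixed boolean.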
def true(*args, **kwargs):
return True
def false(*args, **kwargs):
return False
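# Factory helpers that build Cluster objects in the various states the tests need.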
def get_cluster(initialize, leader, members, failover, sync, cluster_config=None):
t = datetime.datetime.now().isoformat()
history = TimelineHistory(1, '[[1,67197376,"no recovery target specified","' + t + '","foo"]]',
[(1, 67197376, 'no recovery target specified', t, 'foo')])
cluster_config = cluster_config or ClusterConfig(1, {'check_timeline': True}, 1)
return Cluster(initialize, cluster_config, leader, 10, members, failover, sync, history, None)
def get_cluster_not_initialized_without_leader(cluster_config=None):
return get_cluster(None, None, [], None, SyncState(None, None, None), cluster_config)
def get_cluster_initialized_without_leader(leader=False, failover=None, sync=None, cluster_config=None):
m1 = Member(0, 'leader', 28, {'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5435/postgres',
'api_url': 'http://127.0.0.1:8008/patroni', 'xlog_location': 4})
leader = Leader(0, 0, m1 if leader else Member(0, '', 28, {}))
m2 = Member(0, 'other', 28, {'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres',
'api_url': 'http://127.0.0.1:8011/patroni',
'state': 'running',
'pause': True,
'tags': {'clonefrom': True},
'scheduled_restart': {'schedule': "2100-01-01 10:53:07.560445+00:00",
'postgres_version': '99.0.0'}})
syncstate = SyncState(0 if sync else None, sync and sync[0], sync and sync[1])
return get_cluster(SYSID, leader, [m1, m2], failover, syncstate, cluster_config)
def get_cluster_initialized_with_leader(failover=None, sync=None):
return get_cluster_initialized_without_leader(leader=True, failover=failover, sync=sync)
def get_cluster_initialized_with_only_leader(failover=None, cluster_config=None):
leader = get_cluster_initialized_without_leader(leader=True, failover=failover).leader
return get_cluster(True, leader, [leader.member], failover, None, cluster_config)
def get_standby_cluster_initialized_with_only_leader(failover=None, sync=None):
return get_cluster_initialized_with_only_leader(
cluster_config=ClusterConfig(1, {
"standby_cluster": {
"host": "localhost",
"port": 5432,
"primary_slot_name": "",
}}, 1)
)
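# Builds a fetch_node_status replacement that reports the given health attributes
# for whichever member it is asked about.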
def get_node_status(reachable=True, in_recovery=True, dcs_last_seen=0,
timeline=2, wal_position=10, nofailover=False,
watchdog_failed=False):
def fetch_node_status(e):
tags = {}
if nofailover:
tags['nofailover'] = True
return _MemberStatus(e, reachable, in_recovery, dcs_last_seen, timeline, wal_position, tags, watchdog_failed)
return fetch_node_status
future_restart_time = datetime.datetime.now(tzutc) + datetime.timedelta(days=5)
postmaster_start_time = datetime.datetime.now(tzutc)
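# Minimal Patroni stand-in: wires the Postgresql and DCS mocks into a real Config
# and Watchdog so that Ha can be instantiated against it.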
class MockPatroni(object):
def __init__(self, p, d):
os.environ[Config.PATRONI_CONFIG_VARIABLE] = """
restapi:
listen: 0.0.0.0:8008
bootstrap:
users:
replicator:
password: rep-pass
options:
- replication
postgresql:
name: foo
data_dir: data/postgresql0
pg_rewind:
username: postgres
password: postgres
watchdog:
mode: off
zookeeper:
exhibitor:
hosts: [localhost]
port: 8181
"""
        # Config reads sys.argv, so we have to strip the extra
        # arguments that py.test injects
sys.argv = sys.argv[:1]
self.config = Config(None)
self.config.set_dynamic_configuration({'maximum_lag_on_failover': 5})
self.version = '1.5.7'
self.postgresql = p
self.dcs = d
self.api = Mock()
self.tags = {'foo': 'bar'}
self.nofailover = None
self.replicatefrom = None
self.api.connection_string = 'http://127.0.0.1:8008'
self.clonefrom = None
self.nosync = False
self.scheduled_restart = {'schedule': future_restart_time,
'postmaster_start_time': str(postmaster_start_time)}
self.watchdog = Watchdog(self.config)
self.request = lambda member, **kwargs: requests_get(member.api_url, **kwargs)
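# Replacement for AsyncExecutor.run_async: executes the scheduled action synchronously.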
def run_async(self, func, args=()):
self.reset_scheduled_action()
if args:
func(*args)
else:
func()
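# The decorator stack below stubs out all Postgresql, etcd, subprocess and filesystem
# interaction, so every TestHa test exercises only the HA decision logic.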
@patch.object(Postgresql, 'is_running', Mock(return_value=MockPostmaster()))
@patch.object(Postgresql, 'is_leader', Mock(return_value=True))
@patch.object(Postgresql, 'timeline_wal_position', Mock(return_value=(1, 10, 1)))
@patch.object(Postgresql, '_cluster_info_state_get', Mock(return_value=10))
@patch.object(Postgresql, 'data_directory_empty', Mock(return_value=False))
@patch.object(Postgresql, 'controldata', Mock(return_value={
'Database system identifier': SYSID,
'Database cluster state': 'shut down',
'Latest checkpoint location': '0/12345678',
"Latest checkpoint's TimeLineID": '2'}))
@patch.object(SlotsHandler, 'load_replication_slots', Mock(side_effect=Exception))
@patch.object(ConfigHandler, 'append_pg_hba', Mock())
@patch.object(ConfigHandler, 'write_pgpass', Mock(return_value={}))
@patch.object(ConfigHandler, 'write_recovery_conf', Mock())
@patch.object(ConfigHandler, 'write_postgresql_conf', Mock())
@patch.object(Postgresql, 'query', Mock())
@patch.object(Postgresql, 'checkpoint', Mock())
@patch.object(CancellableSubprocess, 'call', Mock(return_value=0))
@patch.object(Postgresql, 'get_replica_timeline', Mock(return_value=2))
@patch.object(Postgresql, 'get_master_timeline', Mock(return_value=2))
@patch.object(ConfigHandler, 'restore_configuration_files', Mock())
@patch.object(etcd.Client, 'write', etcd_write)
@patch.object(etcd.Client, 'read', etcd_read)
@patch.object(etcd.Client, 'delete', Mock(side_effect=etcd.EtcdException))
@patch('patroni.postgresql.polling_loop', Mock(return_value=range(1)))
@patch('patroni.async_executor.AsyncExecutor.busy', PropertyMock(return_value=False))
@patch('patroni.async_executor.AsyncExecutor.run_async', run_async)
@patch('patroni.postgresql.rewind.Thread', Mock())
@patch('subprocess.call', Mock(return_value=0))
@patch('time.sleep', Mock())
class TestHa(PostgresInit):
@patch('socket.getaddrinfo', socket_getaddrinfo)
@patch('patroni.dcs.dcs_modules', Mock(return_value=['patroni.dcs.etcd']))
@patch.object(etcd.Client, 'read', etcd_read)
def setUp(self):
super(TestHa, self).setUp()
with patch.object(AbstractEtcdClientWithFailover, 'machines') as mock_machines:
mock_machines.__get__ = Mock(return_value=['http://remotehost:2379'])
self.p.set_state('running')
self.p.set_role('replica')
self.p.postmaster_start_time = MagicMock(return_value=str(postmaster_start_time))
self.p.can_create_replica_without_replication_connection = MagicMock(return_value=False)
self.e = get_dcs({'etcd': {'ttl': 30, 'host': 'ok:2379', 'scope': 'test',
'name': 'foo', 'retry_timeout': 10}})
self.ha = Ha(MockPatroni(self.p, self.e))
self.ha.old_cluster = self.e.get_cluster()
self.ha.cluster = get_cluster_initialized_without_leader()
self.ha.load_cluster_from_dcs = Mock()
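    # update_lock must return False when updating the leader key in DCS raises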
def test_update_lock(self):
self.p.last_operation = Mock(side_effect=PostgresConnectionException(''))
self.ha.dcs.update_leader = Mock(side_effect=Exception)
self.assertFalse(self.ha.update_lock(True))
@patch.object(Postgresql, 'received_timeline', Mock(return_value=None))
def test_touch_member(self):
self.p.timeline_wal_position = Mock(return_value=(0, 1, 0))
self.p.replica_cached_timeline = Mock(side_effect=Exception)
self.ha.touch_member()
self.p.timeline_wal_position = Mock(return_value=(0, 1, 1))
self.p.set_role('standby_leader')
self.ha.touch_member()
def test_is_leader(self):
self.assertFalse(self.ha.is_leader())
def test_start_as_replica(self):
self.p.is_healthy = false
self.assertEqual(self.ha.run_cycle(), 'starting as a secondary')
@patch('patroni.dcs.etcd.Etcd.initialize', return_value=True)
def test_bootstrap_as_standby_leader(self, initialize):
self.p.data_directory_empty = true
self.ha.cluster = get_cluster_not_initialized_without_leader(cluster_config=ClusterConfig(0, {}, 0))
self.ha.cluster.is_unlocked = true
self.ha.patroni.config._dynamic_configuration = {"standby_cluster": {"port": 5432}}
self.assertEqual(self.ha.run_cycle(), 'trying to bootstrap a new standby leader')
def test_bootstrap_waiting_for_standby_leader(self):
self.p.data_directory_empty = true
self.ha.cluster = get_cluster_initialized_without_leader()
self.ha.cluster.config.data.update({'standby_cluster': {'port': 5432}})
self.assertEqual(self.ha.run_cycle(), 'waiting for standby_leader to bootstrap')
@patch.object(Cluster, 'get_clone_member',
Mock(return_value=Member(0, 'test', 1, {'api_url': 'http://127.0.0.1:8011/patroni',
'conn_url': 'postgres://127.0.0.1:5432/postgres'})))
@patch.object(Bootstrap, 'create_replica', Mock(return_value=0))
def test_start_as_cascade_replica_in_standby_cluster(self):
self.p.data_directory_empty = true
self.ha.cluster = get_standby_cluster_initialized_with_only_leader()
self.ha.cluster.is_unlocked = false
self.assertEqual(self.ha.run_cycle(), "trying to bootstrap from replica 'test'")
def test_recover_replica_failed(self):
self.p.controldata = lambda: {'Database cluster state': 'in recovery', 'Database system identifier': SYSID}
self.p.is_running = false
self.p.follow = false
self.assertEqual(self.ha.run_cycle(), 'starting as a secondary')
self.assertEqual(self.ha.run_cycle(), 'failed to start postgres')
def test_recover_raft(self):
self.p.controldata = lambda: {'Database cluster state': 'in recovery', 'Database system identifier': SYSID}
self.p.is_running = false
self.p.follow = true
self.assertEqual(self.ha.run_cycle(), 'starting as a secondary')
self.p.is_running = true
self.ha.dcs.__class__.__name__ = 'Raft'
self.assertEqual(self.ha.run_cycle(), 'started as a secondary')
def test_recover_former_master(self):
self.p.follow = false
self.p.is_running = false
self.p.name = 'leader'
self.p.set_role('master')
self.p.controldata = lambda: {'Database cluster state': 'shut down', 'Database system identifier': SYSID}
self.ha.cluster = get_cluster_initialized_with_leader()
self.assertEqual(self.ha.run_cycle(), 'starting as readonly because i had the session lock')
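    # A crashed master (controldata reports 'in production') is recovered in
    # single-user mode; a recovery that exceeds the startup timeout is terminated.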
@patch.object(Rewind, 'ensure_clean_shutdown', Mock())
def test_crash_recovery(self):
self.ha.has_lock = true
self.p.is_running = false
self.p.controldata = lambda: {'Database cluster state': 'in production', 'Database system identifier': SYSID}
self.assertEqual(self.ha.run_cycle(), 'doing crash recovery in a single user mode')
with patch('patroni.async_executor.AsyncExecutor.busy', PropertyMock(return_value=True)),\
patch.object(Ha, 'check_timeline', Mock(return_value=False)):
self.ha._async_executor.schedule('doing crash recovery in a single user mode')
self.ha.state_handler.cancellable._process = Mock()
self.ha._crash_recovery_started -= 600
self.ha.patroni.config.set_dynamic_configuration({'maximum_lag_on_failover': 10})
self.assertEqual(self.ha.run_cycle(), 'terminated crash recovery because of startup timeout')
@patch.object(Rewind, 'ensure_clean_shutdown', Mock())
@patch.object(Rewind, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=True))
@patch.object(Rewind, 'can_rewind', PropertyMock(return_value=True))
def test_crash_recovery_before_rewind(self):
self.p.is_leader = false
self.p.is_running = false
self.p.controldata = lambda: {'Database cluster state': 'in archive recovery',
'Database system identifier': SYSID}
self.ha._rewind.trigger_check_diverged_lsn()
self.ha.cluster = get_cluster_initialized_with_leader()
self.assertEqual(self.ha.run_cycle(), 'doing crash recovery in a single user mode')
@patch.object(Rewind, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=True))
@patch.object(Rewind, 'can_rewind', PropertyMock(return_value=True))
def test_recover_with_rewind(self):
self.p.is_running = false
self.ha.cluster = get_cluster_initialized_with_leader()
self.ha.cluster.leader.member.data.update(version='2.0.2', role='master')
self.ha._rewind.pg_rewind = true
self.ha._rewind.check_leader_is_not_in_recovery = true
with patch.object(Rewind, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=True)):
self.assertEqual(self.ha.run_cycle(), 'running pg_rewind from leader')
with patch.object(Rewind, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=False)):
self.p.follow = true
self.assertEqual(self.ha.run_cycle(), 'starting as a secondary')
self.p.is_running = true
self.ha.follow = Mock(return_value='fake')
self.assertEqual(self.ha.run_cycle(), 'fake')
@patch.object(Rewind, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=True))
@patch.object(Rewind, 'should_remove_data_directory_on_diverged_timelines', PropertyMock(return_value=True))
@patch.object(Bootstrap, 'create_replica', Mock(return_value=1))
def test_recover_with_reinitialize(self):
self.p.is_running = false
self.ha.cluster = get_cluster_initialized_with_leader()
self.assertEqual(self.ha.run_cycle(), 'reinitializing due to diverged timelines')
@patch('sys.exit', return_value=1)
@patch('patroni.ha.Ha.sysid_valid', MagicMock(return_value=True))
def test_sysid_no_match(self, exit_mock):
self.p.controldata = lambda: {'Database cluster state': 'in recovery', 'Database system identifier': '123'}
self.ha.run_cycle()
exit_mock.assert_called_once_with(1)
@patch.object(Cluster, 'is_unlocked', Mock(return_value=False))
def test_start_as_readonly(self):
self.p.is_leader = false
self.p.is_healthy = true
self.ha.has_lock = true
self.p.controldata = lambda: {'Database cluster state': 'in production', 'Database system identifier': SYSID}
self.assertEqual(self.ha.run_cycle(), 'promoted self to leader because I had the session lock')
@patch('patroni.psycopg.connect', psycopg_connect)
def test_acquire_lock_as_master(self):
self.assertEqual(self.ha.run_cycle(), 'acquired session lock as a leader')
def test_promoted_by_acquiring_lock(self):
self.ha.is_healthiest_node = true
self.p.is_leader = false
self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')
def test_promotion_cancelled_after_pre_promote_failed(self):
self.p.is_leader = false
self.p._pre_promote = false
self.ha._is_healthiest_node = true
self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')
self.assertEqual(self.ha.run_cycle(), 'Promotion cancelled because the pre-promote script failed')
self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node')
def test_lost_leader_lock_during_promote(self):
with patch('patroni.async_executor.AsyncExecutor.busy', PropertyMock(return_value=True)):
self.ha._async_executor.schedule('promote')
self.assertEqual(self.ha.run_cycle(), 'lost leader before promote')
def test_long_promote(self):
self.ha.cluster.is_unlocked = false
self.ha.has_lock = true
self.p.is_leader = false
self.p.set_role('master')
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
def test_demote_after_failing_to_obtain_lock(self):
self.ha.acquire_lock = false
self.assertEqual(self.ha.run_cycle(), 'demoted self after trying and failing to obtain lock')
def test_follow_new_leader_after_failing_to_obtain_lock(self):
self.ha.is_healthiest_node = true
self.ha.acquire_lock = false
self.p.is_leader = false
self.assertEqual(self.ha.run_cycle(), 'following new leader after trying and failing to obtain lock')
def test_demote_because_not_healthiest(self):
self.ha.is_healthiest_node = false
self.assertEqual(self.ha.run_cycle(), 'demoting self because i am not the healthiest node')
def test_follow_new_leader_because_not_healthiest(self):
self.ha.is_healthiest_node = false
self.p.is_leader = false
self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node')
def test_promote_because_have_lock(self):
self.ha.cluster.is_unlocked = false
self.ha.has_lock = true
self.p.is_leader = false
self.assertEqual(self.ha.run_cycle(), 'promoted self to leader because I had the session lock')
def test_promote_without_watchdog(self):
self.ha.cluster.is_unlocked = false
self.ha.has_lock = true
self.p.is_leader = true
with patch.object(Watchdog, 'activate', Mock(return_value=False)):
self.assertEqual(self.ha.run_cycle(), 'Demoting self because watchdog could not be activated')
self.p.is_leader = false
self.assertEqual(self.ha.run_cycle(), 'Not promoting self because watchdog could not be activated')
def test_leader_with_lock(self):
self.ha.cluster = get_cluster_initialized_with_leader()
self.ha.cluster.is_unlocked = false
self.ha.has_lock = true
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
def test_demote_because_not_having_lock(self):
self.ha.cluster.is_unlocked = false
with patch.object(Watchdog, 'is_running', PropertyMock(return_value=True)):
self.assertEqual(self.ha.run_cycle(), 'demoting self because I do not have the lock and I was a leader')
def test_demote_because_update_lock_failed(self):
self.ha.cluster.is_unlocked = false
self.ha.has_lock = true
self.ha.update_lock = false
self.assertEqual(self.ha.run_cycle(), 'demoted self because failed to update leader lock in DCS')
with patch.object(Ha, '_get_node_to_follow', Mock(side_effect=DCSError('foo'))):
self.assertEqual(self.ha.run_cycle(), 'demoted self because failed to update leader lock in DCS')
self.p.is_leader = false
self.assertEqual(self.ha.run_cycle(), 'not promoting because failed to update leader lock in DCS')
@patch.object(Postgresql, 'major_version', PropertyMock(return_value=130000))
def test_follow(self):
self.ha.cluster.is_unlocked = false
self.p.is_leader = false
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), a secondary, and following a leader ()')
self.ha.patroni.replicatefrom = "foo"
self.p.config.check_recovery_conf = Mock(return_value=(True, False))
self.ha.cluster.config.data.update({'slots': {'l': {'database': 'a', 'plugin': 'b'}}})
self.ha.cluster.members[1].data['tags']['replicatefrom'] = 'postgresql0'
self.ha.patroni.nofailover = True
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), a secondary, and following a leader ()')
del self.ha.cluster.config.data['slots']
self.ha.cluster.config.data.update({'postgresql': {'use_slots': False}})
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), a secondary, and following a leader ()')
del self.ha.cluster.config.data['postgresql']['use_slots']
def test_follow_in_pause(self):
self.ha.cluster.is_unlocked = false
self.ha.is_paused = true
self.assertEqual(self.ha.run_cycle(), 'PAUSE: continue to run as master without lock')
self.p.is_leader = false
self.assertEqual(self.ha.run_cycle(), 'PAUSE: no action. I am (postgresql0)')
@patch.object(Rewind, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=True))
@patch.object(Rewind, 'can_rewind', PropertyMock(return_value=True))
def test_follow_triggers_rewind(self):
self.p.is_leader = false
self.ha._rewind.trigger_check_diverged_lsn()
self.ha.cluster = get_cluster_initialized_with_leader()
self.assertEqual(self.ha.run_cycle(), 'running pg_rewind from leader')
def test_no_etcd_connection_master_demote(self):
self.ha.load_cluster_from_dcs = Mock(side_effect=DCSError('Etcd is not responding properly'))
self.assertEqual(self.ha.run_cycle(), 'demoted self because DCS is not accessible and i was a leader')
@patch('time.sleep', Mock())
def test_bootstrap_from_another_member(self):
self.ha.cluster = get_cluster_initialized_with_leader()
self.assertEqual(self.ha.bootstrap(), 'trying to bootstrap from replica \'other\'')
def test_bootstrap_waiting_for_leader(self):
self.ha.cluster = get_cluster_initialized_without_leader()
self.assertEqual(self.ha.bootstrap(), 'waiting for leader to bootstrap')
def test_bootstrap_without_leader(self):
self.ha.cluster = get_cluster_initialized_without_leader()
self.p.can_create_replica_without_replication_connection = MagicMock(return_value=True)
self.assertEqual(self.ha.bootstrap(), 'trying to bootstrap (without leader)')
def test_bootstrap_initialize_lock_failed(self):
self.ha.cluster = get_cluster_not_initialized_without_leader()
self.assertEqual(self.ha.bootstrap(), 'failed to acquire initialize lock')
@patch('patroni.psycopg.connect', psycopg_connect)
@patch.object(Postgresql, 'connection', Mock(return_value=None))
def test_bootstrap_initialized_new_cluster(self):
self.ha.cluster = get_cluster_not_initialized_without_leader()
self.e.initialize = true
self.assertEqual(self.ha.bootstrap(), 'trying to bootstrap a new cluster')
self.p.is_leader = false
self.assertEqual(self.ha.run_cycle(), 'waiting for end of recovery after bootstrap')
self.p.is_leader = true
self.assertEqual(self.ha.run_cycle(), 'running post_bootstrap')
self.assertEqual(self.ha.run_cycle(), 'initialized a new cluster')
def test_bootstrap_release_initialize_key_on_failure(self):
self.ha.cluster = get_cluster_not_initialized_without_leader()
self.e.initialize = true
self.ha.bootstrap()
self.p.is_running = false
self.assertRaises(PatroniFatalException, self.ha.post_bootstrap)
@patch('patroni.psycopg.connect', psycopg_connect)
@patch.object(Postgresql, 'connection', Mock(return_value=None))
def test_bootstrap_release_initialize_key_on_watchdog_failure(self):
self.ha.cluster = get_cluster_not_initialized_without_leader()
self.e.initialize = true
self.ha.bootstrap()
self.p.is_running.return_value = MockPostmaster()
self.p.is_leader = true
with patch.object(Watchdog, 'activate', Mock(return_value=False)):
self.assertEqual(self.ha.post_bootstrap(), 'running post_bootstrap')
self.assertRaises(PatroniFatalException, self.ha.post_bootstrap)
@patch('patroni.psycopg.connect', psycopg_connect)
def test_reinitialize(self):
self.assertIsNotNone(self.ha.reinitialize())
self.ha.cluster = get_cluster_initialized_with_leader()
self.assertIsNone(self.ha.reinitialize(True))
self.ha._async_executor.schedule('reinitialize')
self.assertIsNotNone(self.ha.reinitialize())
self.ha.state_handler.name = self.ha.cluster.leader.name
self.assertIsNotNone(self.ha.reinitialize())
@patch('time.sleep', Mock())
def test_restart(self):
self.assertEqual(self.ha.restart({}), (True, 'restarted successfully'))
self.p.restart = Mock(return_value=None)
self.assertEqual(self.ha.restart({}), (False, 'postgres is still starting'))
self.p.restart = false
self.assertEqual(self.ha.restart({}), (False, 'restart failed'))
self.ha.cluster = get_cluster_initialized_with_leader()
self.ha._async_executor.schedule('reinitialize')
self.assertEqual(self.ha.restart({}), (False, 'reinitialize already in progress'))
with patch.object(self.ha, "restart_matches", return_value=False):
self.assertEqual(self.ha.restart({'foo': 'bar'}), (False, "restart conditions are not satisfied"))
@patch('os.kill', Mock())
def test_restart_in_progress(self):
with patch('patroni.async_executor.AsyncExecutor.busy', PropertyMock(return_value=True)):
self.ha._async_executor.schedule('restart')
self.assertTrue(self.ha.restart_scheduled())
self.assertEqual(self.ha.run_cycle(), 'restart in progress')
self.ha.cluster = get_cluster_initialized_with_leader()
self.assertEqual(self.ha.run_cycle(), 'restart in progress')
self.ha.has_lock = true
self.assertEqual(self.ha.run_cycle(), 'updated leader lock during restart')
self.ha.update_lock = false
self.p.set_role('master')
with patch('patroni.async_executor.CriticalTask.cancel', Mock(return_value=False)):
with patch('patroni.postgresql.Postgresql.terminate_starting_postmaster') as mock_terminate:
self.assertEqual(self.ha.run_cycle(), 'lost leader lock during restart')
mock_terminate.assert_called()
self.ha.is_paused = true
self.assertEqual(self.ha.run_cycle(), 'PAUSE: restart in progress')
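    # A manual failover away from the leader only proceeds when another member is a
    # viable candidate: reachable, without the nofailover tag, with a healthy
    # watchdog, on the same timeline and not too far behind in wal position.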
def test_manual_failover_from_leader(self):
self.ha.fetch_node_status = get_node_status()
self.ha.has_lock = true
self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, 'blabla', '', None))
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, '', self.p.name, None))
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, '', 'blabla', None))
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
f = Failover(0, self.p.name, '', None)
self.ha.cluster = get_cluster_initialized_with_leader(f)
self.assertEqual(self.ha.run_cycle(), 'manual failover: demoting myself')
self.ha._rewind.rewind_or_reinitialize_needed_and_possible = true
self.assertEqual(self.ha.run_cycle(), 'manual failover: demoting myself')
self.ha.fetch_node_status = get_node_status(nofailover=True)
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
self.ha.fetch_node_status = get_node_status(watchdog_failed=True)
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
self.ha.fetch_node_status = get_node_status(timeline=1)
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
self.ha.fetch_node_status = get_node_status(wal_position=1)
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
# manual failover from the previous leader to us won't happen if we hold the nofailover flag
self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, 'blabla', self.p.name, None))
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
# Failover scheduled time must include timezone
scheduled = datetime.datetime.now()
self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, 'blabla', self.p.name, scheduled))
self.ha.run_cycle()
scheduled = datetime.datetime.utcnow().replace(tzinfo=tzutc)
self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, 'blabla', self.p.name, scheduled))
self.assertEqual('no action. I am (postgresql0), the leader with the lock', self.ha.run_cycle())
scheduled = scheduled + datetime.timedelta(seconds=30)
self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, 'blabla', self.p.name, scheduled))
self.assertEqual('no action. I am (postgresql0), the leader with the lock', self.ha.run_cycle())
scheduled = scheduled + datetime.timedelta(seconds=-600)
self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, 'blabla', self.p.name, scheduled))
self.assertEqual('no action. I am (postgresql0), the leader with the lock', self.ha.run_cycle())
scheduled = None
self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, 'blabla', self.p.name, scheduled))
self.assertEqual('no action. I am (postgresql0), the leader with the lock', self.ha.run_cycle())
def test_manual_failover_from_leader_in_pause(self):
self.ha.has_lock = true
self.ha.is_paused = true
scheduled = datetime.datetime.now()
self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, 'blabla', self.p.name, scheduled))
self.assertEqual('PAUSE: no action. I am (postgresql0), the leader with the lock', self.ha.run_cycle())
self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, self.p.name, '', None))
self.assertEqual('PAUSE: no action. I am (postgresql0), the leader with the lock', self.ha.run_cycle())
def test_manual_failover_from_leader_in_synchronous_mode(self):
self.p.is_leader = true
self.ha.has_lock = true
self.ha.is_synchronous_mode = true
self.ha.is_failover_possible = false
self.ha.process_sync_replication = Mock()
self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, self.p.name, 'a', None), (self.p.name, None))
self.assertEqual('no action. I am (postgresql0), the leader with the lock', self.ha.run_cycle())
self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, self.p.name, 'a', None), (self.p.name, 'a'))
self.ha.is_failover_possible = true
self.assertEqual('manual failover: demoting myself', self.ha.run_cycle())
def test_manual_failover_process_no_leader(self):
self.p.is_leader = false
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', self.p.name, None))
self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'leader', None))
self.p.set_role('replica')
self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')
self.ha.fetch_node_status = get_node_status() # accessible, in_recovery
self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node')
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, self.p.name, '', None))
self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node')
self.ha.fetch_node_status = get_node_status(reachable=False) # inaccessible, in_recovery
self.p.set_role('replica')
self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')
        # report the nofailover tag as True for every other member of the cluster.
        # This should still elect the current member, as we never call the API for ourselves.
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'other', None))
self.ha.fetch_node_status = get_node_status(nofailover=True) # accessible, in_recovery
self.p.set_role('replica')
self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')
        # same as previous, but the current member itself carries nofailover; it must never be elected leader
self.ha.patroni.nofailover = True
self.assertEqual(self.ha.run_cycle(), 'following a different leader because I am not allowed to promote')
# in sync mode only the sync node is allowed to take over
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, 'leader', 'other', None))
self.ha.patroni.nofailover = False
self.ha.is_synchronous_mode = true
self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node')
def test_manual_failover_process_no_leader_in_pause(self):
self.ha.is_paused = true
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'other', None))
self.assertEqual(self.ha.run_cycle(), 'PAUSE: continue to run as master without lock')
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, 'leader', '', None))
self.assertEqual(self.ha.run_cycle(), 'PAUSE: continue to run as master without lock')
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, 'leader', 'blabla', None))
self.assertEqual('PAUSE: acquired session lock as a leader', self.ha.run_cycle())
self.p.is_leader = false
self.p.set_role('replica')
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, 'leader', self.p.name, None))
self.assertEqual(self.ha.run_cycle(), 'PAUSE: promoted self to leader by acquiring session lock')
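    # The node must not consider itself the healthiest while the watchdog is
    # unhealthy, postgres is still starting, or the cluster is paused.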
def test_is_healthiest_node(self):
self.ha.state_handler.is_leader = false
self.ha.patroni.nofailover = False
self.ha.fetch_node_status = get_node_status()
self.assertTrue(self.ha.is_healthiest_node())
with patch.object(Watchdog, 'is_healthy', PropertyMock(return_value=False)):
self.assertFalse(self.ha.is_healthiest_node())
with patch('patroni.postgresql.Postgresql.is_starting', return_value=True):
self.assertFalse(self.ha.is_healthiest_node())
self.ha.is_paused = true
self.assertFalse(self.ha.is_healthiest_node())
def test__is_healthiest_node(self):
self.ha.cluster = get_cluster_initialized_without_leader(sync=('postgresql1', self.p.name))
self.assertTrue(self.ha._is_healthiest_node(self.ha.old_cluster.members))
self.p.is_leader = false
self.ha.fetch_node_status = get_node_status() # accessible, in_recovery
self.assertTrue(self.ha._is_healthiest_node(self.ha.old_cluster.members))
self.ha.fetch_node_status = get_node_status(in_recovery=False) # accessible, not in_recovery
self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members))
self.ha.fetch_node_status = get_node_status(wal_position=11) # accessible, in_recovery, wal position ahead
self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members))
# in synchronous_mode consider itself healthy if the former leader is accessible in read-only and ahead of us
with patch.object(Ha, 'is_synchronous_mode', Mock(return_value=True)):
self.assertTrue(self.ha._is_healthiest_node(self.ha.old_cluster.members))
with patch('patroni.postgresql.Postgresql.last_operation', return_value=1):
self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members))
with patch('patroni.postgresql.Postgresql.replica_cached_timeline', return_value=1):
self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members))
self.ha.patroni.nofailover = True
self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members))
self.ha.patroni.nofailover = False
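    # smoke-test fetch_node_status with both an IP-based and a hostname-based api_url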
def test_fetch_node_status(self):
member = Member(0, 'test', 1, {'api_url': 'http://127.0.0.1:8011/patroni'})
self.ha.fetch_node_status(member)
member = Member(0, 'test', 1, {'api_url': 'http://localhost:8011/patroni'})
self.ha.fetch_node_status(member)
@patch.object(Rewind, 'pg_rewind', true)
@patch.object(Rewind, 'check_leader_is_not_in_recovery', true)
def test_post_recover(self):
self.p.is_running = false
self.ha.has_lock = true
self.p.set_role('master')
self.assertEqual(self.ha.post_recover(), 'removed leader key after trying and failing to start postgres')
self.ha.has_lock = false
self.assertEqual(self.ha.post_recover(), 'failed to start postgres')
leader = Leader(0, 0, Member(0, 'l', 2, {"version": "1.6", "conn_url": "postgres://a", "role": "master"}))
self.ha._rewind.execute(leader)
self.p.is_running = true
self.assertIsNone(self.ha.post_recover())
def test_schedule_future_restart(self):
self.ha.patroni.scheduled_restart = {}
        # schedule the same restart twice: the first call should succeed, the duplicate should fail
self.assertTrue(self.ha.schedule_future_restart({'schedule': future_restart_time}))
self.assertFalse(self.ha.schedule_future_restart({'schedule': future_restart_time}))
def test_delete_future_restarts(self):
self.ha.delete_future_restart()
def test_evaluate_scheduled_restart(self):
self.p.postmaster_start_time = Mock(return_value=str(postmaster_start_time))
# restart already in progress
with patch('patroni.async_executor.AsyncExecutor.busy', PropertyMock(return_value=True)):
self.assertIsNone(self.ha.evaluate_scheduled_restart())
        # the postmaster was restarted after the schedule was created, so the scheduled restart is abandoned
with patch.object(self.ha,
'future_restart_scheduled',
Mock(return_value={'postmaster_start_time':
str(postmaster_start_time - datetime.timedelta(days=1)),
'schedule': str(future_restart_time)})):
self.assertIsNone(self.ha.evaluate_scheduled_restart())
with patch.object(self.ha,
'future_restart_scheduled',
Mock(return_value={'postmaster_start_time': str(postmaster_start_time),
'schedule': str(future_restart_time)})):
with patch.object(self.ha,
'should_run_scheduled_action', Mock(return_value=True)):
# restart in the future, ok
self.assertIsNotNone(self.ha.evaluate_scheduled_restart())
with patch.object(self.ha, 'restart', Mock(return_value=(False, "Test"))):
                    # restart in the future, but the actual restart failed
self.assertIsNone(self.ha.evaluate_scheduled_restart())
def test_scheduled_restart(self):
self.ha.cluster = get_cluster_initialized_with_leader()
with patch.object(self.ha, "evaluate_scheduled_restart", Mock(return_value="restart scheduled")):
self.assertEqual(self.ha.run_cycle(), "restart scheduled")
def test_restart_matches(self):
self.p._role = 'replica'
self.p._connection.server_version = 90500
self.p._pending_restart = True
self.assertFalse(self.ha.restart_matches("master", "9.5.0", True))
self.assertFalse(self.ha.restart_matches("replica", "9.4.3", True))
self.p._pending_restart = False
self.assertFalse(self.ha.restart_matches("replica", "9.5.2", True))
self.assertTrue(self.ha.restart_matches("replica", "9.5.2", False))
def test_process_healthy_cluster_in_pause(self):
self.p.is_leader = false
self.ha.is_paused = true
self.p.name = 'leader'
self.ha.cluster = get_cluster_initialized_with_leader()
self.assertEqual(self.ha.run_cycle(), 'PAUSE: removed leader lock because postgres is not running as master')
self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, '', self.p.name, None))
self.assertEqual(self.ha.run_cycle(), 'PAUSE: waiting to become master after promote...')
@patch('patroni.postgresql.mtime', Mock(return_value=1588316884))
@patch.object(builtins, 'open', mock_open(read_data='1\t0/40159C0\tno recovery target specified\n'))
def test_process_healthy_standby_cluster_as_standby_leader(self):
self.p.is_leader = false
self.p.name = 'leader'
self.ha.cluster = get_standby_cluster_initialized_with_only_leader()
self.p.config.check_recovery_conf = Mock(return_value=(False, False))
self.ha._leader_timeline = 1
self.assertEqual(self.ha.run_cycle(), 'promoted self to a standby leader because i had the session lock')
self.assertEqual(self.ha.run_cycle(), 'no action. I am (leader), the standby leader with the lock')
self.p.set_role('replica')
self.p.config.check_recovery_conf = Mock(return_value=(True, False))
self.assertEqual(self.ha.run_cycle(), 'promoted self to a standby leader because i had the session lock')
def test_process_healthy_standby_cluster_as_cascade_replica(self):
self.p.is_leader = false
self.p.name = 'replica'
self.ha.cluster = get_standby_cluster_initialized_with_only_leader()
self.assertEqual(self.ha.run_cycle(),
'no action. I am (replica), a secondary, and following a standby leader (leader)')
with patch.object(Leader, 'conn_url', PropertyMock(return_value='')):
self.assertEqual(self.ha.run_cycle(), 'continue following the old known standby leader')
def test_process_unhealthy_standby_cluster_as_standby_leader(self):
self.p.is_leader = false
self.p.name = 'leader'
self.ha.cluster = get_standby_cluster_initialized_with_only_leader()
self.ha.cluster.is_unlocked = true
self.ha.sysid_valid = true
self.p._sysid = True
self.assertEqual(self.ha.run_cycle(), 'promoted self to a standby leader by acquiring session lock')
@patch.object(Rewind, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=True))
@patch.object(Rewind, 'can_rewind', PropertyMock(return_value=True))
def test_process_unhealthy_standby_cluster_as_cascade_replica(self):
self.p.is_leader = false
self.p.name = 'replica'
self.ha.cluster = get_standby_cluster_initialized_with_only_leader()
self.ha.is_unlocked = true
self.assertTrue(self.ha.run_cycle().startswith('running pg_rewind from remote_master:'))
def test_recover_unhealthy_leader_in_standby_cluster(self):
self.p.is_leader = false
self.p.name = 'leader'
self.p.is_running = false
self.p.follow = false
self.ha.cluster = get_standby_cluster_initialized_with_only_leader()
self.assertEqual(self.ha.run_cycle(), 'starting as a standby leader because i had the session lock')
def test_recover_unhealthy_unlocked_standby_cluster(self):
self.p.is_leader = false
self.p.name = 'leader'
self.p.is_running = false
self.p.follow = false
self.ha.cluster = get_standby_cluster_initialized_with_only_leader()
self.ha.cluster.is_unlocked = true
self.ha.has_lock = false
self.assertEqual(self.ha.run_cycle(), 'trying to follow a remote master because standby cluster is unhealthy')
def test_failed_to_update_lock_in_pause(self):
self.ha.update_lock = false
self.ha.is_paused = true
self.p.name = 'leader'
self.ha.cluster = get_cluster_initialized_with_leader()
self.assertEqual(self.ha.run_cycle(),
'PAUSE: continue to run as master after failing to update leader lock in DCS')
def test_postgres_unhealthy_in_pause(self):
self.ha.is_paused = true
self.p.is_healthy = false
self.assertEqual(self.ha.run_cycle(), 'PAUSE: postgres is not running')
self.ha.has_lock = true
self.assertEqual(self.ha.run_cycle(), 'PAUSE: removed leader lock because postgres is not running')
def test_no_etcd_connection_in_pause(self):
self.ha.is_paused = true
self.ha.load_cluster_from_dcs = Mock(side_effect=DCSError('Etcd is not responding properly'))
self.assertEqual(self.ha.run_cycle(), 'PAUSE: DCS is not accessible')
@patch('patroni.ha.Ha.update_lock', return_value=True)
@patch('patroni.ha.Ha.demote')
def test_starting_timeout(self, demote, update_lock):
def check_calls(seq):
for mock, called in seq:
if called:
mock.assert_called_once()
else:
mock.assert_not_called()
mock.reset_mock()
self.ha.has_lock = true
self.ha.cluster = get_cluster_initialized_with_leader()
self.p.check_for_startup = true
self.p.time_in_state = lambda: 30
self.assertEqual(self.ha.run_cycle(), 'PostgreSQL is still starting up, 270 seconds until timeout')
check_calls([(update_lock, True), (demote, False)])
self.p.time_in_state = lambda: 350
self.ha.fetch_node_status = get_node_status(reachable=False) # inaccessible, in_recovery
self.assertEqual(self.ha.run_cycle(),
'master start has timed out, but continuing to wait because failover is not possible')
check_calls([(update_lock, True), (demote, False)])
self.ha.fetch_node_status = get_node_status() # accessible, in_recovery
self.assertEqual(self.ha.run_cycle(), 'stopped PostgreSQL because of startup timeout')
check_calls([(update_lock, True), (demote, True)])
update_lock.return_value = False
self.assertEqual(self.ha.run_cycle(), 'stopped PostgreSQL while starting up because leader key was lost')
check_calls([(update_lock, True), (demote, True)])
self.ha.has_lock = false
self.p.is_leader = false
self.assertEqual(self.ha.run_cycle(),
'no action. I am (postgresql0), a secondary, and following a leader (leader)')
check_calls([(update_lock, False), (demote, False)])
def test_manual_failover_while_starting(self):
self.ha.has_lock = true
self.p.check_for_startup = true
f = Failover(0, self.p.name, '', None)
self.ha.cluster = get_cluster_initialized_with_leader(f)
self.ha.fetch_node_status = get_node_status() # accessible, in_recovery
self.assertEqual(self.ha.run_cycle(), 'manual failover: demoting myself')
@patch('patroni.ha.Ha.demote')
def test_failover_immediately_on_zero_master_start_timeout(self, demote):
self.p.is_running = false
self.ha.cluster = get_cluster_initialized_with_leader(sync=(self.p.name, 'other'))
self.ha.cluster.config.data['synchronous_mode'] = True
self.ha.patroni.config.set_dynamic_configuration({'master_start_timeout': 0})
self.ha.has_lock = true
self.ha.update_lock = true
self.ha.fetch_node_status = get_node_status() # accessible, in_recovery
self.assertEqual(self.ha.run_cycle(), 'stopped PostgreSQL to fail over after a crash')
demote.assert_called_once()
def test_master_stop_timeout(self):
self.assertEqual(self.ha.master_stop_timeout(), None)
self.ha.patroni.config.set_dynamic_configuration({'master_stop_timeout': 30})
with patch.object(Ha, 'is_synchronous_mode', Mock(return_value=True)):
self.assertEqual(self.ha.master_stop_timeout(), 30)
self.ha.patroni.config.set_dynamic_configuration({'master_stop_timeout': 30})
with patch.object(Ha, 'is_synchronous_mode', Mock(return_value=False)):
self.assertEqual(self.ha.master_stop_timeout(), None)
self.ha.patroni.config.set_dynamic_configuration({'master_stop_timeout': None})
self.assertEqual(self.ha.master_stop_timeout(), None)
@patch('patroni.postgresql.Postgresql.follow')
def test_demote_immediate(self, follow):
self.ha.has_lock = true
self.e.get_cluster = Mock(return_value=get_cluster_initialized_without_leader())
self.ha.demote('immediate')
follow.assert_called_once_with(None)
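    # Covers how the leader maintains synchronous_standby_names and the sync state
    # in DCS as synchronous mode is toggled and sync candidates come and go.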
def test_process_sync_replication(self):
self.ha.has_lock = true
mock_set_sync = self.p.config.set_synchronous_standby = Mock()
self.p.name = 'leader'
# Test sync key removed when sync mode disabled
self.ha.cluster = get_cluster_initialized_with_leader(sync=('leader', 'other'))
with patch.object(self.ha.dcs, 'delete_sync_state') as mock_delete_sync:
self.ha.run_cycle()
mock_delete_sync.assert_called_once()
mock_set_sync.assert_called_once_with([])
mock_set_sync.reset_mock()
# Test sync key not touched when not there
self.ha.cluster = get_cluster_initialized_with_leader()
with patch.object(self.ha.dcs, 'delete_sync_state') as mock_delete_sync:
self.ha.run_cycle()
mock_delete_sync.assert_not_called()
mock_set_sync.assert_called_once_with([])
mock_set_sync.reset_mock()
self.ha.is_synchronous_mode = true
# Test sync standby not touched when picking the same node
self.p.pick_synchronous_standby = Mock(return_value=(['other'], ['other']))
self.ha.cluster = get_cluster_initialized_with_leader(sync=('leader', 'other'))
self.ha.run_cycle()
mock_set_sync.assert_not_called()
mock_set_sync.reset_mock()
# Test sync standby is replaced when switching standbys
self.p.pick_synchronous_standby = Mock(return_value=(['other2'], []))
self.ha.dcs.write_sync_state = Mock(return_value=True)
self.ha.run_cycle()
mock_set_sync.assert_called_once_with(['other2'])
        # Test sync standby is replaced when a new standby joins
self.p.pick_synchronous_standby = Mock(return_value=(['other2', 'other3'], ['other2']))
self.ha.dcs.write_sync_state = Mock(return_value=True)
self.ha.run_cycle()
self.assertEqual(mock_set_sync.call_args_list[0][0], (['other2'],))
self.assertEqual(mock_set_sync.call_args_list[1][0], (['other2', 'other3'],))
mock_set_sync.reset_mock()
        # Test sync standby is not disabled when updating DCS fails
self.ha.dcs.write_sync_state = Mock(return_value=False)
self.ha.run_cycle()
mock_set_sync.assert_not_called()
mock_set_sync.reset_mock()
# Test changing sync standby
self.ha.dcs.write_sync_state = Mock(return_value=True)
self.ha.dcs.get_cluster = Mock(return_value=get_cluster_initialized_with_leader(sync=('leader', 'other')))
self.p.pick_synchronous_standby = Mock(return_value=(['other2'], ['other2']))
self.ha.run_cycle()
self.ha.dcs.get_cluster.assert_called_once()
self.assertEqual(self.ha.dcs.write_sync_state.call_count, 2)
# Test updating sync standby key failed due to race
self.ha.dcs.write_sync_state = Mock(side_effect=[True, False])
self.ha.run_cycle()
self.assertEqual(self.ha.dcs.write_sync_state.call_count, 2)
        # Test updating sync standby key failed because DCS is not accessible
self.ha.dcs.write_sync_state = Mock(return_value=True)
self.ha.dcs.get_cluster = Mock(side_effect=DCSError('foo'))
self.ha.run_cycle()
# Test changing sync standby failed due to race
self.ha.dcs.get_cluster = Mock(return_value=get_cluster_initialized_with_leader(sync=('somebodyelse', None)))
self.ha.run_cycle()
self.assertEqual(self.ha.dcs.write_sync_state.call_count, 2)
# Test sync set to '*' when synchronous_mode_strict is enabled
mock_set_sync.reset_mock()
self.ha.is_synchronous_mode_strict = true
self.p.pick_synchronous_standby = Mock(return_value=([], []))
self.ha.run_cycle()
mock_set_sync.assert_called_once_with(['*'])
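    # On promotion in synchronous mode the sync state is reset first; if writing it
    # to DCS fails, the promotion must not happen.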
def test_sync_replication_become_master(self):
self.ha.is_synchronous_mode = true
mock_set_sync = self.p.config.set_synchronous_standby = Mock()
self.p.is_leader = false
self.p.set_role('replica')
self.ha.has_lock = true
mock_write_sync = self.ha.dcs.write_sync_state = Mock(return_value=True)
self.p.name = 'leader'
self.ha.cluster = get_cluster_initialized_with_leader(sync=('other', None))
        # When we have just become master, nobody is sync yet
self.assertEqual(self.ha.enforce_master_role('msg', 'promote msg'), 'promote msg')
mock_set_sync.assert_called_once_with([])
mock_write_sync.assert_called_once_with('leader', None, index=0)
mock_set_sync.reset_mock()
        # If writing the sync state to DCS fails, promotion must not happen
        self.p.set_role('replica')
        mock_write_sync.return_value = False
        self.assertNotEqual(self.ha.enforce_master_role('msg', 'promote msg'), 'promote msg')
mock_set_sync.assert_not_called()
def test_unhealthy_sync_mode(self):
self.ha.is_synchronous_mode = true
self.p.is_leader = false
self.p.set_role('replica')
self.p.name = 'other'
self.ha.cluster = get_cluster_initialized_without_leader(sync=('leader', 'other2'))
mock_write_sync = self.ha.dcs.write_sync_state = Mock(return_value=True)
mock_acquire = self.ha.acquire_lock = Mock(return_value=True)
mock_follow = self.p.follow = Mock()
mock_promote = self.p.promote = Mock()
        # If we don't match the sync replica we are not allowed to acquire the lock
self.ha.run_cycle()
mock_acquire.assert_not_called()
mock_follow.assert_called_once()
self.assertEqual(mock_follow.call_args[0][0], None)
mock_write_sync.assert_not_called()
mock_follow.reset_mock()
# If we do match we will try to promote
self.ha._is_healthiest_node = true
self.ha.cluster = get_cluster_initialized_without_leader(sync=('leader', 'other'))
self.ha.run_cycle()
mock_acquire.assert_called_once()
mock_follow.assert_not_called()
mock_promote.assert_called_once()
mock_write_sync.assert_called_once_with('other', None, index=0)
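    # A synchronous standby temporarily advertises nosync around a restart and waits
    # until the leader stops listing it as the sync standby.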
def test_disable_sync_when_restarting(self):
self.ha.is_synchronous_mode = true
self.p.name = 'other'
self.p.is_leader = false
self.p.set_role('replica')
mock_restart = self.p.restart = Mock(return_value=True)
self.ha.cluster = get_cluster_initialized_with_leader(sync=('leader', 'other'))
self.ha.touch_member = Mock(return_value=True)
self.ha.dcs.get_cluster = Mock(side_effect=[
get_cluster_initialized_with_leader(sync=('leader', syncstandby))
for syncstandby in ['other', None]])
with patch('time.sleep') as mock_sleep:
self.ha.restart({})
mock_restart.assert_called_once()
mock_sleep.assert_called()
# Restart is still called when DCS connection fails
mock_restart.reset_mock()
self.ha.dcs.get_cluster = Mock(side_effect=DCSError("foo"))
self.ha.restart({})
mock_restart.assert_called_once()
# We don't try to fetch the cluster state when touch_member fails
mock_restart.reset_mock()
self.ha.dcs.get_cluster.reset_mock()
self.ha.touch_member = Mock(return_value=False)
self.ha.restart({})
mock_restart.assert_called_once()
self.ha.dcs.get_cluster.assert_not_called()
def test_effective_tags(self):
self.ha._disable_sync = True
self.assertEqual(self.ha.get_effective_tags(), {'foo': 'bar', 'nosync': True})
self.ha._disable_sync = False
self.assertEqual(self.ha.get_effective_tags(), {'foo': 'bar'})
@patch('patroni.postgresql.mtime', Mock(return_value=1588316884))
@patch.object(builtins, 'open', Mock(side_effect=Exception))
def test_restore_cluster_config(self):
self.ha.cluster.config.data.clear()
self.ha.has_lock = true
self.ha.cluster.is_unlocked = false
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
def test_watch(self):
self.ha.cluster = get_cluster_initialized_with_leader()
self.ha.watch(0)
    def test_wakeup(self):
self.ha.wakeup()
def test_shutdown(self):
self.p.is_running = false
self.ha.is_leader = true
def stop(*args, **kwargs):
kwargs['on_shutdown'](123)
self.p.stop = stop
self.ha.shutdown()
self.ha.is_failover_possible = true
self.ha.shutdown()
@patch('time.sleep', Mock())
def test_leader_with_empty_directory(self):
self.ha.cluster = get_cluster_initialized_with_leader()
self.ha.has_lock = true
self.p.data_directory_empty = true
self.assertEqual(self.ha.run_cycle(), 'released leader key voluntarily as data dir empty and currently leader')
self.assertEqual(self.p.role, 'uninitialized')
# as has_lock is mocked out, we need to fake the leader key release
self.ha.has_lock = false
        # must not report bootstrapping from the leader, as a replica cannot elect itself
self.assertEqual(self.ha.run_cycle(), "trying to bootstrap from replica 'other'")
@patch('patroni.postgresql.mtime', Mock(return_value=1588316884))
@patch.object(builtins, 'open', mock_open(read_data=('1\t0/40159C0\tno recovery target specified\n\n'
'2\t1/40159C0\tno recovery target specified\n')))
def test_update_cluster_history(self):
self.ha.has_lock = true
self.ha.cluster.is_unlocked = false
for tl in (1, 3):
self.p.get_master_timeline = Mock(return_value=tl)
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
@patch('sys.exit', return_value=1)
def test_abort_join(self, exit_mock):
self.ha.cluster = get_cluster_not_initialized_without_leader()
self.p.is_leader = false
self.ha.run_cycle()
exit_mock.assert_called_once_with(1)
def test_after_pause(self):
self.ha.has_lock = true
self.ha.cluster.is_unlocked = false
self.ha.is_paused = true
self.assertEqual(self.ha.run_cycle(), 'PAUSE: no action. I am (postgresql0), the leader with the lock')
self.ha.is_paused = false
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
@patch('patroni.psycopg.connect', psycopg_connect)
def test_permanent_logical_slots_after_promote(self):
config = ClusterConfig(1, {'slots': {'l': {'database': 'postgres', 'plugin': 'test_decoding'}}}, 1)
self.p.name = 'other'
self.ha.cluster = get_cluster_initialized_without_leader(cluster_config=config)
self.assertEqual(self.ha.run_cycle(), 'acquired session lock as a leader')
self.ha.cluster = get_cluster_initialized_without_leader(leader=True, cluster_config=config)
self.ha.has_lock = true
self.assertEqual(self.ha.run_cycle(), 'no action. I am (other), the leader with the lock')
@patch.object(Cluster, 'has_member', true)
def test_run_cycle(self):
self.ha.dcs.touch_member = Mock(side_effect=DCSError('foo'))
self.assertEqual(self.ha.run_cycle(), 'Unexpected exception raised, please report it as a BUG')
self.ha.dcs.touch_member = Mock(side_effect=PatroniFatalException('foo'))
self.assertRaises(PatroniFatalException, self.ha.run_cycle)
def test_empty_directory_in_pause(self):
self.ha.is_paused = true
self.p.data_directory_empty = true
self.assertEqual(self.ha.run_cycle(), 'PAUSE: running with empty data directory')
self.assertEqual(self.p.role, 'uninitialized')
@patch('patroni.ha.Ha.sysid_valid', MagicMock(return_value=True))
def test_sysid_no_match_in_pause(self):
self.ha.is_paused = true
self.p.controldata = lambda: {'Database cluster state': 'in recovery', 'Database system identifier': '123'}
self.assertEqual(self.ha.run_cycle(), 'PAUSE: continue to run as master without lock')
self.ha.has_lock = true
self.assertEqual(self.ha.run_cycle(), 'PAUSE: released leader key voluntarily due to the system ID mismatch')
@patch('patroni.psycopg.connect', psycopg_connect)
@patch('os.path.exists', Mock(return_value=True))
@patch('shutil.rmtree', Mock())
@patch('os.makedirs', Mock())
@patch('os.open', Mock())
@patch('os.fsync', Mock())
@patch('os.close', Mock())
@patch('os.rename', Mock())
@patch('patroni.postgresql.Postgresql.is_starting', Mock(return_value=False))
@patch.object(builtins, 'open', mock_open())
@patch.object(ConfigHandler, 'check_recovery_conf', Mock(return_value=(False, False)))
@patch.object(Postgresql, 'major_version', PropertyMock(return_value=130000))
@patch.object(SlotsHandler, 'sync_replication_slots', Mock(return_value=['ls']))
def test_follow_copy(self):
self.ha.cluster.is_unlocked = false
self.ha.cluster.config.data['slots'] = {'ls': {'database': 'a', 'plugin': 'b'}}
self.p.is_leader = false
self.assertTrue(self.ha.run_cycle().startswith('Copying logical slots'))
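    # failover is not possible when the only candidate does not report a wal position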
def test_is_failover_possible(self):
self.ha.fetch_node_status = Mock(return_value=_MemberStatus(self.ha.cluster.members[0],
True, True, 0, 2, None, {}, False))
self.assertFalse(self.ha.is_failover_possible(self.ha.cluster.members))