mirror of
https://github.com/optim-enterprises-bv/patroni.git
synced 2026-01-08 16:41:30 +00:00
We should ignore the former leader with higher priority when it reports the same LSN as the current node. This bug could be a contributing factor to the issues described in #3295. In addition to that, mock the socket.getaddrinfo() call in test_api.py to avoid hitting DNS servers.
1846 lines
100 KiB
Python
import datetime
import os
import sys

from unittest.mock import MagicMock, Mock, mock_open, patch, PropertyMock

import etcd

from patroni import global_config
from patroni.collections import CaseInsensitiveSet
from patroni.config import Config
from patroni.dcs import Cluster, ClusterConfig, Failover, get_dcs, Leader, Member, Status, SyncState, TimelineHistory
from patroni.dcs.etcd import AbstractEtcdClientWithFailover
from patroni.exceptions import DCSError, PatroniFatalException, PostgresConnectionException
from patroni.ha import _MemberStatus, Ha
from patroni.postgresql import Postgresql
from patroni.postgresql.bootstrap import Bootstrap
from patroni.postgresql.callback_executor import CallbackAction
from patroni.postgresql.cancellable import CancellableSubprocess
from patroni.postgresql.config import ConfigHandler
from patroni.postgresql.postmaster import PostmasterProcess
from patroni.postgresql.rewind import Rewind
from patroni.postgresql.slots import SlotsHandler
from patroni.postgresql.sync import _SyncState
from patroni.utils import tzutc
from patroni.watchdog import Watchdog

from . import MockPostmaster, PostgresInit, psycopg_connect, requests_get
from .test_etcd import etcd_read, etcd_write, socket_getaddrinfo

SYSID = '12345678901'


def true(*args, **kwargs):
    return True


def false(*args, **kwargs):
    return False
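

# The helpers below build Cluster objects with varying amounts of DCS state
# (initialize key, leader, members, failover and sync keys, failsafe topology)
# so that every test can start from a well-defined snapshot.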
def get_cluster(initialize, leader, members, failover, sync, cluster_config=None, failsafe=None):
    t = datetime.datetime.now().isoformat()
    history = TimelineHistory(1, '[[1,67197376,"no recovery target specified","' + t + '","foo"]]',
                              [(1, 67197376, 'no recovery target specified', t, 'foo')])
    cluster_config = cluster_config or ClusterConfig(1, {'check_timeline': True, 'member_slots_ttl': 0}, 1)
    return Cluster(initialize, cluster_config, leader, Status(10, None, []), members, failover, sync, history, failsafe)


def get_cluster_not_initialized_without_leader(cluster_config=None):
    return get_cluster(None, None, [], None, SyncState.empty(), cluster_config)


def get_cluster_bootstrapping_without_leader(cluster_config=None):
    return get_cluster("", None, [], None, SyncState.empty(), cluster_config)


def get_cluster_initialized_without_leader(leader=False, failover=None, sync=None, cluster_config=None, failsafe=False):
    m1 = Member(0, 'leader', 28, {'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5435/postgres',
                                  'api_url': 'http://127.0.0.1:8008/patroni', 'xlog_location': 4,
                                  'role': 'primary', 'state': 'running'})
    leader = Leader(0, 0, m1 if leader else Member(0, '', 28, {}))
    m2 = Member(0, 'other', 28, {'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres',
                                 'api_url': 'http://127.0.0.1:8011/patroni',
                                 'state': 'running',
                                 'pause': True,
                                 'tags': {'clonefrom': True},
                                 'scheduled_restart': {'schedule': "2100-01-01 10:53:07.560445+00:00",
                                                       'postgres_version': '99.0.0'}})
    syncstate = SyncState(0 if sync else None, sync and sync[0], sync and sync[1], 0)
    failsafe = {m.name: m.api_url for m in (m1, m2)} if failsafe else None
    return get_cluster(SYSID, leader, [m1, m2], failover, syncstate, cluster_config, failsafe)


def get_cluster_initialized_with_leader(failover=None, sync=None):
    return get_cluster_initialized_without_leader(leader=True, failover=failover, sync=sync)


def get_cluster_initialized_with_only_leader(failover=None, cluster_config=None):
    leader = get_cluster_initialized_without_leader(leader=True, failover=failover).leader
    return get_cluster(True, leader, [leader.member], failover, SyncState.empty(), cluster_config)


def get_standby_cluster_initialized_with_only_leader(failover=None, sync=None):
    return get_cluster_initialized_with_only_leader(
        cluster_config=ClusterConfig(1, {
            "standby_cluster": {
                "host": "localhost",
                "port": 5432,
                "primary_slot_name": "",
            }}, 1)
    )


def get_cluster_initialized_with_leader_and_failsafe():
    return get_cluster_initialized_without_leader(leader=True, failsafe=True,
                                                  cluster_config=ClusterConfig(1, {'failsafe_mode': True}, 1))
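

# get_node_status() returns a closure that tests assign to ha.fetch_node_status;
# it emulates the _MemberStatus a remote member would report over the REST API
# (reachability, recovery state, WAL position, timeline and tags).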
def get_node_status(reachable=True, in_recovery=True, dcs_last_seen=0,
                    timeline=2, wal_position=10, nofailover=False,
                    watchdog_failed=False, failover_priority=1, sync_priority=1):
    def fetch_node_status(e):
        tags = {}
        if nofailover:
            tags['nofailover'] = True
        tags['failover_priority'] = failover_priority
        tags['sync_priority'] = sync_priority
        return _MemberStatus(e, reachable, in_recovery, wal_position,
                             {'tags': tags, 'watchdog_failed': watchdog_failed,
                              'dcs_last_seen': dcs_last_seen, 'timeline': timeline})
    return fetch_node_status


future_restart_time = datetime.datetime.now(tzutc) + datetime.timedelta(days=5)
postmaster_start_time = datetime.datetime.now(tzutc)


class MockPatroni(object):

    def __init__(self, p, d):
        os.environ[Config.PATRONI_CONFIG_VARIABLE] = """
restapi:
  listen: 0.0.0.0:8008
bootstrap:
postgresql:
  name: foo
  data_dir: data/postgresql0
  pg_rewind:
    username: postgres
    password: postgres
watchdog:
  mode: off
zookeeper:
  exhibitor:
    hosts: [localhost]
    port: 8181
"""
        # We rely on sys.argv in Config, so it's necessary to reset
        # all the extra values that are coming from py.test
        sys.argv = sys.argv[:1]

        self.config = Config(None)
        self.version = '1.5.7'
        self.postgresql = p
        self.dcs = d
        self.api = Mock()
        self.tags = {'foo': 'bar'}
        self.nofailover = None
        self.replicatefrom = None
        self.api.connection_string = 'http://127.0.0.1:8008'
        self.clonefrom = None
        self.nosync = False
        self.nostream = False
        self.scheduled_restart = {'schedule': future_restart_time,
                                  'postmaster_start_time': str(postmaster_start_time)}
        self.watchdog = Watchdog(self.config)
        self.request = lambda *args, **kwargs: requests_get(args[0].api_url, *args[1:], **kwargs)
        self.failover_priority = 1
        self.sync_priority = 1
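

# Stand-in for AsyncExecutor.run_async (see the class-level @patch below): it
# executes the scheduled function synchronously, keeping the tests deterministic.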
def run_async(self, func, args=()):
    self.reset_scheduled_action()
    if args:
        func(*args)
    else:
        func()
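

# The class-level patches below stub out all Postgresql, etcd and subprocess
# interactions, so Ha.run_cycle() can be exercised purely through in-memory state.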
@patch.object(Postgresql, 'is_running', Mock(return_value=MockPostmaster()))
@patch.object(Postgresql, 'is_primary', Mock(return_value=True))
@patch.object(Postgresql, 'timeline_wal_position', Mock(return_value=(1, 10, 1)))
@patch.object(Postgresql, '_cluster_info_state_get', Mock(return_value=10))
@patch.object(Postgresql, 'slots', Mock(return_value={'l': 100}))
@patch.object(Postgresql, 'data_directory_empty', Mock(return_value=False))
@patch.object(Postgresql, 'controldata', Mock(return_value={
    'Database system identifier': SYSID,
    'Database cluster state': 'shut down',
    'Latest checkpoint location': '0/12345678',
    "Latest checkpoint's TimeLineID": '2'}))
@patch.object(SlotsHandler, 'load_replication_slots', Mock(side_effect=Exception))
@patch.object(ConfigHandler, 'append_pg_hba', Mock())
@patch.object(ConfigHandler, 'write_pgpass', Mock(return_value={}))
@patch.object(ConfigHandler, 'write_recovery_conf', Mock())
@patch.object(ConfigHandler, 'write_postgresql_conf', Mock())
@patch.object(Postgresql, 'query', Mock())
@patch.object(Postgresql, 'checkpoint', Mock())
@patch.object(CancellableSubprocess, 'call', Mock(return_value=0))
@patch.object(Postgresql, 'get_replica_timeline', Mock(return_value=2))
@patch.object(Postgresql, 'get_primary_timeline', Mock(return_value=2))
@patch.object(Postgresql, 'get_major_version', Mock(return_value=140000))
@patch.object(Postgresql, 'resume_wal_replay', Mock())
@patch.object(ConfigHandler, 'restore_configuration_files', Mock())
@patch.object(etcd.Client, 'write', etcd_write)
@patch.object(etcd.Client, 'read', etcd_read)
@patch.object(etcd.Client, 'delete', Mock(side_effect=etcd.EtcdException))
@patch('patroni.postgresql.polling_loop', Mock(return_value=range(1)))
@patch('patroni.async_executor.AsyncExecutor.busy', PropertyMock(return_value=False))
@patch('patroni.async_executor.AsyncExecutor.run_async', run_async)
@patch('patroni.postgresql.rewind.Thread', Mock())
@patch('patroni.postgresql.mpp.citus.CitusHandler.start', Mock())
@patch('subprocess.call', Mock(return_value=0))
@patch('time.sleep', Mock())
class TestHa(PostgresInit):

    @patch('socket.getaddrinfo', socket_getaddrinfo)
    @patch('patroni.dcs.dcs_modules', Mock(return_value=['patroni.dcs.etcd']))
    @patch.object(etcd.Client, 'read', etcd_read)
    @patch.object(AbstractEtcdClientWithFailover, '_get_machines_list', Mock(return_value=['http://remotehost:2379']))
    @patch.object(Config, '_load_cache', Mock())
    def setUp(self):
        super(TestHa, self).setUp()
        self.p.set_state('running')
        self.p.set_role('replica')
        self.p.postmaster_start_time = MagicMock(return_value=str(postmaster_start_time))
        self.p.can_create_replica_without_replication_connection = MagicMock(return_value=False)
        self.e = get_dcs({'etcd': {'ttl': 30, 'host': 'ok:2379', 'scope': 'test',
                                   'name': 'foo', 'retry_timeout': 10},
                          'citus': {'database': 'citus', 'group': None}})
        self.ha = Ha(MockPatroni(self.p, self.e))
        self.ha.old_cluster = self.e.get_cluster()
        self.ha.cluster = get_cluster_initialized_without_leader()
        global_config.update(self.ha.cluster)
        self.ha.load_cluster_from_dcs = Mock()
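
    # Each test below drives Ha.run_cycle() (or a specific Ha method) directly and
    # asserts on the human-readable action message that it returns.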
    def test_update_lock(self):
        self.ha.is_failsafe_mode = true
        self.p.last_operation = Mock(side_effect=PostgresConnectionException(''))
        self.ha.dcs.update_leader = Mock(side_effect=[DCSError(''), Exception])
        self.assertRaises(DCSError, self.ha.update_lock)
        self.assertFalse(self.ha.update_lock(True))

    @patch.object(Postgresql, 'received_timeline', Mock(return_value=None))
    def test_touch_member(self):
        self.p._major_version = 110000
        self.p.is_primary = false
        self.p.timeline_wal_position = Mock(return_value=(0, 1, 0))
        self.p.replica_cached_timeline = Mock(side_effect=Exception)
        with patch.object(Postgresql, '_cluster_info_state_get', Mock(return_value='streaming')):
            self.ha.touch_member()
        self.p.timeline_wal_position = Mock(return_value=(0, 1, 1))
        self.p.set_role('standby_leader')
        self.ha.touch_member()
        self.p.set_role('primary')
        self.ha.dcs.touch_member = true
        self.ha.touch_member()

    def test_is_leader(self):
        self.assertFalse(self.ha.is_leader())

    def test_start_as_replica(self):
        self.p.is_healthy = false
        self.assertEqual(self.ha.run_cycle(), 'starting as a secondary')

    @patch('patroni.dcs.etcd.Etcd.initialize', return_value=True)
    def test_bootstrap_as_standby_leader(self, initialize):
        self.p.data_directory_empty = true
        self.ha.cluster = get_cluster_not_initialized_without_leader(
            cluster_config=ClusterConfig(1, {"standby_cluster": {"port": 5432}}, 1))
        self.assertEqual(self.ha.run_cycle(), 'trying to bootstrap a new standby leader')

    def test_bootstrap_waiting_for_standby_leader(self):
        self.p.data_directory_empty = true
        self.ha.cluster = get_cluster_initialized_without_leader()
        self.ha.cluster.config.data.update({'standby_cluster': {'port': 5432}})
        self.assertEqual(self.ha.run_cycle(), 'waiting for standby_leader to bootstrap')

    @patch.object(Cluster, 'get_clone_member',
                  Mock(return_value=Member(0, 'test', 1, {'api_url': 'http://127.0.0.1:8011/patroni',
                                                          'conn_url': 'postgres://127.0.0.1:5432/postgres'})))
    @patch.object(Bootstrap, 'create_replica', Mock(return_value=0))
    def test_start_as_cascade_replica_in_standby_cluster(self):
        self.p.data_directory_empty = true
        self.ha.cluster = get_standby_cluster_initialized_with_only_leader()
        self.assertEqual(self.ha.run_cycle(), "trying to bootstrap from replica 'test'")

    def test_recover_replica_failed(self):
        self.p.controldata = lambda: {'Database cluster state': 'in recovery', 'Database system identifier': SYSID}
        self.p.is_running = false
        self.p.follow = false
        self.assertEqual(self.ha.run_cycle(), 'starting as a secondary')
        self.assertEqual(self.ha.run_cycle(), 'failed to start postgres')

    def test_recover_raft(self):
        self.p.controldata = lambda: {'Database cluster state': 'in recovery', 'Database system identifier': SYSID}
        self.p.is_running = false
        self.p.follow = true
        self.assertEqual(self.ha.run_cycle(), 'starting as a secondary')
        self.p.is_running = true
        ha_dcs_orig_name = self.ha.dcs.__class__.__name__
        self.ha.dcs.__class__.__name__ = 'Raft'
        self.assertEqual(self.ha.run_cycle(), 'started as a secondary')
        self.ha.dcs.__class__.__name__ = ha_dcs_orig_name

    def test_recover_former_primary(self):
        self.p.follow = false
        self.p.is_running = false
        self.p.name = 'leader'
        self.p.set_role('demoted')
        self.p.controldata = lambda: {'Database cluster state': 'shut down', 'Database system identifier': SYSID}
        self.ha.cluster = get_cluster_initialized_with_leader()
        self.assertEqual(self.ha.run_cycle(), 'starting as readonly because i had the session lock')

    def test_start_primary_after_failure(self):
        self.p.start = false
        self.p.is_running = false
        self.p.name = 'leader'
        self.p.set_role('primary')
        self.p.controldata = lambda: {'Database cluster state': 'in production', 'Database system identifier': SYSID}
        self.ha.cluster = get_cluster_initialized_with_leader()
        self.assertEqual(self.ha.run_cycle(), 'starting primary after failure')

    @patch.object(Rewind, 'ensure_clean_shutdown', Mock())
    def test_crash_recovery(self):
        self.ha.has_lock = true
        self.p.is_running = false
        self.p.controldata = lambda: {'Database cluster state': 'in production', 'Database system identifier': SYSID}
        self.assertEqual(self.ha.run_cycle(), 'doing crash recovery in a single user mode')
        with patch('patroni.async_executor.AsyncExecutor.busy', PropertyMock(return_value=True)), \
                patch.object(Ha, 'check_timeline', Mock(return_value=False)):
            self.ha._async_executor.schedule('doing crash recovery in a single user mode')
            self.ha.state_handler.cancellable._process = Mock()
            self.ha._crash_recovery_started -= 600
            self.ha.cluster.config.data.update({'maximum_lag_on_failover': 10})
            self.assertEqual(self.ha.run_cycle(), 'terminated crash recovery because of startup timeout')

    @patch.object(Rewind, 'ensure_clean_shutdown', Mock())
    @patch.object(Rewind, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=True))
    @patch.object(Rewind, 'can_rewind', PropertyMock(return_value=True))
    def test_crash_recovery_before_rewind(self):
        self.p.is_primary = false
        self.p.is_running = false
        self.p.controldata = lambda: {'Database cluster state': 'in archive recovery',
                                      'Database system identifier': SYSID}
        self.ha._rewind.trigger_check_diverged_lsn()
        self.ha.cluster = get_cluster_initialized_with_leader()
        self.assertEqual(self.ha.run_cycle(), 'doing crash recovery in a single user mode')

    @patch.object(Rewind, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=True))
    @patch.object(Rewind, 'can_rewind', PropertyMock(return_value=True))
    @patch('os.listdir', Mock(return_value=[]))
    @patch('patroni.postgresql.rewind.fsync_dir', Mock())
    def test_recover_with_rewind(self):
        self.p.is_running = false
        self.ha.cluster = get_cluster_initialized_with_leader()
        self.ha.cluster.leader.member.data.update(version='2.0.2', role='primary')
        self.ha._rewind.pg_rewind = true
        self.ha._rewind.check_leader_is_not_in_recovery = true
        with patch.object(Rewind, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=True)):
            self.assertEqual(self.ha.run_cycle(), 'running pg_rewind from leader')
        with patch.object(Rewind, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=False)), \
                patch.object(Ha, 'is_synchronous_mode', Mock(return_value=True)):
            self.p.follow = true
            self.assertEqual(self.ha.run_cycle(), 'starting as a secondary')
            self.p.is_running = true
            self.ha.follow = Mock(return_value='fake')
            self.assertEqual(self.ha.run_cycle(), 'fake')

    @patch.object(Rewind, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=True))
    @patch.object(Rewind, 'should_remove_data_directory_on_diverged_timelines', PropertyMock(return_value=True))
    @patch.object(Bootstrap, 'create_replica', Mock(return_value=1))
    def test_recover_with_reinitialize(self):
        self.p.is_running = false
        self.ha.cluster = get_cluster_initialized_with_leader()
        self.assertEqual(self.ha.run_cycle(), 'reinitializing due to diverged timelines')

    @patch('sys.exit', return_value=1)
    @patch('patroni.ha.Ha.sysid_valid', MagicMock(return_value=True))
    def test_sysid_no_match(self, exit_mock):
        self.p.controldata = lambda: {'Database cluster state': 'in recovery', 'Database system identifier': '123'}
        self.ha.run_cycle()
        exit_mock.assert_called_once_with(1)

    @patch.object(Cluster, 'is_unlocked', Mock(return_value=False))
    def test_start_as_readonly(self):
        self.p.is_primary = false
        self.p.is_healthy = true
        self.ha.has_lock = true
        self.p.controldata = lambda: {'Database cluster state': 'in production', 'Database system identifier': SYSID}
        self.assertEqual(self.ha.run_cycle(), 'promoted self to leader because I had the session lock')

    @patch('patroni.psycopg.connect', psycopg_connect)
    def test_acquire_lock_as_primary(self):
        self.assertEqual(self.ha.run_cycle(), 'acquired session lock as a leader')

    def test_leader_race_stale_primary(self):
        with patch.object(Postgresql, 'get_primary_timeline', Mock(return_value=1)), \
                patch('patroni.ha.logger.warning') as mock_logger:
            self.assertEqual(self.ha.run_cycle(), 'demoting self because i am not the healthiest node')
            self.assertEqual(mock_logger.call_args[0][0], 'My timeline %s is behind last known cluster timeline %s')

    def test_promoted_by_acquiring_lock(self):
        self.ha.is_healthiest_node = true
        self.p.is_primary = false
        self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')

    def test_promotion_cancelled_after_pre_promote_failed(self):
        self.p.is_primary = false
        self.p._pre_promote = false
        self.ha._is_healthiest_node = true
        self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')
        self.assertEqual(self.ha.run_cycle(), 'Promotion cancelled because the pre-promote script failed')
        self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node')

    def test_lost_leader_lock_during_promote(self):
        with patch('patroni.async_executor.AsyncExecutor.busy', PropertyMock(return_value=True)):
            self.ha._async_executor.schedule('promote')
            self.assertEqual(self.ha.run_cycle(), 'lost leader before promote')

    @patch.object(Cluster, 'is_unlocked', Mock(return_value=False))
    def test_long_promote(self):
        self.ha.has_lock = true
        self.p.is_primary = false
        self.p.set_role('primary')
        self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')

    def test_demote_after_failing_to_obtain_lock(self):
        self.ha.acquire_lock = false
        self.assertEqual(self.ha.run_cycle(), 'demoted self after trying and failing to obtain lock')

    def test_follow_new_leader_after_failing_to_obtain_lock(self):
        self.ha.is_healthiest_node = true
        self.ha.acquire_lock = false
        self.p.is_primary = false
        self.assertEqual(self.ha.run_cycle(), 'following new leader after trying and failing to obtain lock')

    def test_demote_because_not_healthiest(self):
        self.ha.is_healthiest_node = false
        self.assertEqual(self.ha.run_cycle(), 'demoting self because i am not the healthiest node')

    def test_follow_new_leader_because_not_healthiest(self):
        self.ha.is_healthiest_node = false
        self.p.is_primary = false
        self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node')

    @patch.object(Cluster, 'is_unlocked', Mock(return_value=False))
    def test_promote_because_have_lock(self):
        self.ha.has_lock = true
        self.p.is_primary = false
        self.assertEqual(self.ha.run_cycle(), 'promoted self to leader because I had the session lock')

    def test_promote_without_watchdog(self):
        self.ha.has_lock = true
        self.p.is_primary = true
        with patch.object(Watchdog, 'activate', Mock(return_value=False)):
            self.assertEqual(self.ha.run_cycle(), 'Demoting self because watchdog could not be activated')
            self.p.is_primary = false
            self.assertEqual(self.ha.run_cycle(), 'Not promoting self because watchdog could not be activated')

    def test_leader_with_lock(self):
        self.ha.cluster = get_cluster_initialized_with_leader()
        self.ha.has_lock = true
        self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')

    def test_coordinator_leader_with_lock(self):
        self.ha.cluster = get_cluster_initialized_with_leader()
        self.ha.has_lock = true
        self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')

    @patch.object(Postgresql, '_wait_for_connection_close', Mock())
    @patch.object(Cluster, 'is_unlocked', Mock(return_value=False))
    def test_demote_because_not_having_lock(self):
        with patch.object(Watchdog, 'is_running', PropertyMock(return_value=True)):
            self.assertEqual(self.ha.run_cycle(), 'demoting self because I do not have the lock and I was a leader')

    @patch.object(Cluster, 'is_unlocked', Mock(return_value=False))
    def test_demote_because_update_lock_failed(self):
        self.ha.has_lock = true
        self.ha.update_lock = false
        self.assertEqual(self.ha.run_cycle(), 'demoted self because failed to update leader lock in DCS')
        with patch.object(Ha, '_get_node_to_follow', Mock(side_effect=DCSError('foo'))):
            self.assertEqual(self.ha.run_cycle(), 'demoted self because failed to update leader lock in DCS')
        self.p.is_primary = false
        self.assertEqual(self.ha.run_cycle(), 'not promoting because failed to update leader lock in DCS')

    def test_get_node_to_follow_nostream(self):
        self.ha.patroni.nostream = True
        self.ha.cluster = get_cluster_initialized_with_leader()
        self.assertEqual(self.ha._get_node_to_follow(self.ha.cluster), None)

    @patch.object(Cluster, 'is_unlocked', Mock(return_value=False))
    def test_follow(self):
        self.p.is_primary = false
        self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), a secondary, and following a leader ()')
        self.ha.patroni.replicatefrom = "foo"
        self.p.config.check_recovery_conf = Mock(return_value=(True, False))
        self.ha.cluster.config.data.update({'slots': {'l': {'database': 'a', 'plugin': 'b'}}})
        self.ha.cluster.members[1].data['tags']['replicatefrom'] = 'postgresql0'
        self.ha.patroni.nofailover = True
        self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), a secondary, and following a leader ()')
        del self.ha.cluster.config.data['slots']
        self.ha.cluster.config.data.update({'postgresql': {'use_slots': False}})
        self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), a secondary, and following a leader ()')
        del self.ha.cluster.config.data['postgresql']['use_slots']

    @patch.object(Cluster, 'is_unlocked', Mock(return_value=False))
    def test_follow_in_pause(self):
        self.ha.is_paused = true
        self.assertEqual(self.ha.run_cycle(), 'PAUSE: continue to run as primary without lock')
        self.p.is_primary = false
        self.assertEqual(self.ha.run_cycle(), 'PAUSE: no action. I am (postgresql0)')

    @patch.object(Rewind, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=True))
    @patch.object(Rewind, 'can_rewind', PropertyMock(return_value=True))
    def test_follow_triggers_rewind(self):
        self.p.is_primary = false
        self.ha._rewind.trigger_check_diverged_lsn()
        self.ha.cluster = get_cluster_initialized_with_leader()
        self.assertEqual(self.ha.run_cycle(), 'running pg_rewind from leader')

    def test_no_dcs_connection_primary_demote(self):
        self.ha.load_cluster_from_dcs = Mock(side_effect=DCSError('Etcd is not responding properly'))
        self.assertEqual(self.ha.run_cycle(), 'demoting self because DCS is not accessible and I was a leader')
        self.ha._async_executor.schedule('dummy')
        self.assertEqual(self.ha.run_cycle(), 'demoted self because DCS is not accessible and I was a leader')
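
    # Failsafe-mode tests: with the DCS /failsafe key in place a leader may keep
    # running while the DCS is unreachable, as long as it can reach every member
    # listed in that key over the REST API.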
    def test_check_failsafe_topology(self):
        self.ha.load_cluster_from_dcs = Mock(side_effect=DCSError('Etcd is not responding properly'))
        self.ha.cluster = get_cluster_initialized_with_leader_and_failsafe()
        global_config.update(self.ha.cluster)
        self.ha.dcs._last_failsafe = self.ha.cluster.failsafe
        self.assertEqual(self.ha.run_cycle(), 'demoting self because DCS is not accessible and I was a leader')
        self.ha.state_handler.name = self.ha.cluster.leader.name
        self.assertFalse(self.ha.failsafe_is_active())
        self.assertEqual(self.ha.run_cycle(),
                         'continue to run as a leader because failsafe mode is enabled and all members are accessible')
        self.assertTrue(self.ha.failsafe_is_active())
        with patch.object(Postgresql, 'slots', Mock(side_effect=Exception)):
            self.ha.patroni.request = Mock(side_effect=Exception)
            self.assertEqual(self.ha.run_cycle(), 'demoting self because DCS is not accessible and I was a leader')
            self.assertFalse(self.ha.failsafe_is_active())
        self.ha.dcs._last_failsafe.clear()
        self.ha.dcs._last_failsafe[self.ha.cluster.leader.name] = self.ha.cluster.leader.member.api_url
        self.assertEqual(self.ha.run_cycle(),
                         'continue to run as a leader because failsafe mode is enabled and all members are accessible')

    def test_no_dcs_connection_primary_failsafe(self):
        self.ha.load_cluster_from_dcs = Mock(side_effect=DCSError('Etcd is not responding properly'))
        self.ha.cluster = get_cluster_initialized_with_leader_and_failsafe()
        for m in self.ha.cluster.members:
            if m.name != self.ha.cluster.leader.name:
                m.data['tags']['replicatefrom'] = 'test'
        global_config.update(self.ha.cluster)
        self.ha.dcs._last_failsafe = self.ha.cluster.failsafe
        self.ha.state_handler.name = self.ha.cluster.leader.name
        self.assertEqual(self.ha.run_cycle(),
                         'continue to run as a leader because failsafe mode is enabled and all members are accessible')

    def test_readonly_dcs_primary_failsafe(self):
        self.ha.cluster = get_cluster_initialized_with_leader_and_failsafe()
        self.ha.dcs.update_leader = Mock(side_effect=DCSError('Etcd is not responding properly'))
        self.ha.dcs._last_failsafe = self.ha.cluster.failsafe
        self.ha.state_handler.name = self.ha.cluster.leader.name
        self.assertEqual(self.ha.run_cycle(),
                         'continue to run as a leader because failsafe mode is enabled and all members are accessible')

    def test_no_dcs_connection_replica_failsafe(self):
        self.p.last_operation = Mock(side_effect=PostgresConnectionException(''))
        self.ha.load_cluster_from_dcs = Mock(side_effect=DCSError('Etcd is not responding properly'))
        self.ha.cluster = get_cluster_initialized_with_leader_and_failsafe()
        global_config.update(self.ha.cluster)
        self.ha.update_failsafe({'name': 'leader', 'api_url': 'http://127.0.0.1:8008/patroni',
                                 'conn_url': 'postgres://127.0.0.1:5432/postgres', 'slots': {'foo': 1000}})
        self.p.is_primary = false
        with patch('patroni.ha.logger.debug') as mock_logger:
            self.assertEqual(self.ha.run_cycle(), 'DCS is not accessible')
            self.assertEqual(mock_logger.call_args_list[0][0][0], 'Failed to fetch current wal lsn: %r')

    def test_no_dcs_connection_replica_failsafe_not_enabled_but_active(self):
        self.ha.load_cluster_from_dcs = Mock(side_effect=DCSError('Etcd is not responding properly'))
        self.ha.cluster = get_cluster_initialized_with_leader()
        self.ha.update_failsafe({'name': 'leader', 'api_url': 'http://127.0.0.1:8008/patroni',
                                 'conn_url': 'postgres://127.0.0.1:5432/postgres', 'slots': {'foo': 1000}})
        self.p.is_primary = false
        self.assertEqual(self.ha.run_cycle(), 'DCS is not accessible')

    def test_update_failsafe(self):
        self.assertRaises(Exception, self.ha.update_failsafe, {})
        self.p.set_role('primary')
        self.assertEqual(self.ha.update_failsafe({}), 'Running as a leader')

    def test_call_failsafe_member(self):
        member = Member(0, 'test', 1, {'api_url': 'http://localhost:8011/patroni'})
        self.ha.patroni.request = Mock()
        self.ha.patroni.request.return_value.data = b'Accepted'
        self.ha.patroni.request.return_value.status = 200
        with patch('patroni.ha.logger.info') as mock_logger:
            ret = self.ha.call_failsafe_member({}, member)
            self.assertEqual(mock_logger.call_args_list[0][0],
                             ('Got response from %s %s: %s', 'test', 'http://localhost:8011/failsafe', 'Accepted'))
            self.assertTrue(ret.accepted)

        e = Exception('request failed')
        self.ha.patroni.request.side_effect = e
        with patch('patroni.ha.logger.warning') as mock_logger:
            ret = self.ha.call_failsafe_member({}, member)
            self.assertEqual(mock_logger.call_args_list[0][0],
                             ('Request failed to %s: POST %s (%s)', 'test', 'http://localhost:8011/failsafe', e))
            self.assertFalse(ret.accepted)
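
    # Bootstrap tests: depending on the initialize key and the cluster state we
    # either bootstrap a new cluster, wait for a leader, or clone from a member.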
    @patch('time.sleep', Mock())
    def test_bootstrap_from_another_member(self):
        self.ha.cluster = get_cluster_initialized_with_leader()
        self.assertEqual(self.ha.bootstrap(), 'trying to bootstrap from replica \'other\'')

    def test_bootstrap_waiting_for_leader(self):
        self.ha.cluster = get_cluster_initialized_without_leader()
        self.assertEqual(self.ha.bootstrap(), 'waiting for leader to bootstrap')

    def test_bootstrap_without_leader(self):
        self.ha.cluster = get_cluster_initialized_without_leader()
        self.p.can_create_replica_without_replication_connection = MagicMock(return_value=True)
        self.assertEqual(self.ha.bootstrap(), 'trying to bootstrap (without leader)')

    def test_bootstrap_not_running_concurrently(self):
        self.ha.cluster = get_cluster_bootstrapping_without_leader()
        self.p.can_create_replica_without_replication_connection = MagicMock(return_value=True)
        self.assertEqual(self.ha.bootstrap(), 'waiting for leader to bootstrap')

    def test_bootstrap_initialize_lock_failed(self):
        self.ha.cluster = get_cluster_not_initialized_without_leader()
        self.assertEqual(self.ha.bootstrap(), 'failed to acquire initialize lock')

    @patch('patroni.psycopg.connect', psycopg_connect)
    @patch('patroni.postgresql.mpp.citus.connect', psycopg_connect)
    @patch('patroni.postgresql.mpp.citus.quote_ident', Mock())
    @patch.object(Postgresql, 'connection', Mock(return_value=None))
    def test_bootstrap_initialized_new_cluster(self):
        self.ha.cluster = get_cluster_not_initialized_without_leader()
        self.e.initialize = true
        self.assertEqual(self.ha.bootstrap(), 'trying to bootstrap a new cluster')
        self.p.is_primary = false
        self.assertEqual(self.ha.run_cycle(), 'waiting for end of recovery after bootstrap')
        self.p.is_primary = true
        self.ha.is_synchronous_mode = true
        self.assertEqual(self.ha.run_cycle(), 'running post_bootstrap')
        self.assertEqual(self.ha.run_cycle(), 'initialized a new cluster')

    def test_bootstrap_release_initialize_key_on_failure(self):
        self.ha.cluster = get_cluster_not_initialized_without_leader()
        self.e.initialize = true
        self.ha.bootstrap()
        self.p.is_running = false
        self.assertRaises(PatroniFatalException, self.ha.post_bootstrap)

    @patch('patroni.psycopg.connect', psycopg_connect)
    @patch('patroni.postgresql.mpp.citus.connect', psycopg_connect)
    @patch('patroni.postgresql.mpp.citus.quote_ident', Mock())
    @patch.object(Postgresql, 'connection', Mock(return_value=None))
    def test_bootstrap_release_initialize_key_on_watchdog_failure(self):
        self.ha.cluster = get_cluster_not_initialized_without_leader()
        self.e.initialize = true
        self.ha.bootstrap()
        self.p.is_primary = true
        with patch.object(Watchdog, 'activate', Mock(return_value=False)), \
                patch('patroni.ha.logger.error') as mock_logger:
            self.assertEqual(self.ha.post_bootstrap(), 'running post_bootstrap')
            self.assertRaises(PatroniFatalException, self.ha.post_bootstrap)
            self.assertTrue(mock_logger.call_args[0][0].startswith('Cancelling bootstrap because'
                                                                   ' watchdog activation failed'))

    @patch('patroni.psycopg.connect', psycopg_connect)
    def test_reinitialize(self):
        self.assertIsNotNone(self.ha.reinitialize())

        self.ha.cluster = get_cluster_initialized_with_leader()
        self.assertIsNone(self.ha.reinitialize(True))
        self.ha._async_executor.schedule('reinitialize')
        self.assertIsNotNone(self.ha.reinitialize())

        self.ha.state_handler.name = self.ha.cluster.leader.name
        self.assertIsNotNone(self.ha.reinitialize())
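
    # Restart tests: direct restarts, restarts of an MPP worker node, and cycles
    # that run while an asynchronous restart is still in progress.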
    @patch('time.sleep', Mock())
    def test_restart(self):
        self.assertEqual(self.ha.restart({}), (True, 'restarted successfully'))
        self.p.restart = Mock(return_value=None)
        self.assertEqual(self.ha.restart({}), (False, 'postgres is still starting'))
        self.p.restart = false
        self.assertEqual(self.ha.restart({}), (False, 'restart failed'))
        self.ha.cluster = get_cluster_initialized_with_leader()
        self.ha._async_executor.schedule('reinitialize')
        self.assertEqual(self.ha.restart({}), (False, 'reinitialize already in progress'))
        with patch.object(self.ha, "restart_matches", return_value=False):
            self.assertEqual(self.ha.restart({'foo': 'bar'}), (False, "restart conditions are not satisfied"))

    @patch('time.sleep', Mock())
    @patch.object(ConfigHandler, 'replace_pg_hba', Mock())
    @patch.object(ConfigHandler, 'replace_pg_ident', Mock())
    @patch.object(PostmasterProcess, 'start', Mock(return_value=MockPostmaster()))
    @patch('patroni.postgresql.mpp.AbstractMPPHandler.is_coordinator', Mock(return_value=False))
    def test_worker_restart(self):
        self.ha.has_lock = true
        self.ha.patroni.request = Mock()
        self.p.is_running = Mock(side_effect=[Mock(), False])
        self.assertEqual(self.ha.restart({}), (True, 'restarted successfully'))
        self.ha.patroni.request.assert_called()
        self.assertEqual(self.ha.patroni.request.call_args_list[0][0][3]['type'], 'before_demote')
        self.assertEqual(self.ha.patroni.request.call_args_list[1][0][3]['type'], 'after_promote')

    @patch('os.kill', Mock())
    def test_restart_in_progress(self):
        with patch('patroni.async_executor.AsyncExecutor.busy', PropertyMock(return_value=True)):
            self.ha._async_executor.schedule('restart')
            self.assertTrue(self.ha.restart_scheduled())
            self.assertEqual(self.ha.run_cycle(), 'restart in progress')

            self.ha.cluster = get_cluster_initialized_with_leader()
            self.assertEqual(self.ha.run_cycle(), 'restart in progress')

            self.ha.has_lock = true
            self.assertEqual(self.ha.run_cycle(), 'updated leader lock during restart')

            self.ha.update_lock = false
            self.p.set_role('primary')
            with patch('patroni.async_executor.CriticalTask.cancel', Mock(return_value=False)), \
                    patch('patroni.async_executor.CriticalTask.result',
                          PropertyMock(return_value=PostmasterProcess(os.getpid())), create=True), \
                    patch('patroni.postgresql.Postgresql.terminate_starting_postmaster') as mock_terminate:
                self.assertEqual(self.ha.run_cycle(), 'lost leader lock during restart')
                mock_terminate.assert_called()

            self.ha.is_paused = true
            self.assertEqual(self.ha.run_cycle(), 'PAUSE: restart in progress')
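
    # Manual failover/switchover tests: the leader processes the /failover key and
    # verifies candidate health (reachability, timeline, replication lag, tags)
    # before demoting itself.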
    @patch('patroni.postgresql.mpp.AbstractMPPHandler.is_coordinator', Mock(return_value=False))
    def test_manual_failover_from_leader(self):
        self.ha.has_lock = true  # I am the leader

        # to me
        with patch('patroni.ha.logger.warning') as mock_warning:
            self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, '', self.p.name, None))
            self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
            mock_warning.assert_called_with('%s: I am already the leader, no need to %s', 'manual failover', 'failover')

        # to a non-existent candidate
        with patch('patroni.ha.logger.warning') as mock_warning:
            self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, '', 'blabla', None))
            self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
            mock_warning.assert_called_with(
                '%s: no healthy members found, %s is not possible', 'manual failover', 'failover')

        # to an existent candidate
        self.ha.fetch_node_status = get_node_status()
        self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, '', 'b', None))
        self.ha.cluster.members.append(Member(0, 'b', 28, {'api_url': 'http://127.0.0.1:8011/patroni'}))
        self.assertEqual(self.ha.run_cycle(), 'manual failover: demoting myself')

        # to a candidate on an older timeline
        with patch('patroni.ha.logger.info') as mock_info:
            self.ha.fetch_node_status = get_node_status(timeline=1)
            self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
            self.assertEqual(mock_info.call_args_list[0][0],
                             ('Timeline %s of member %s is behind the cluster timeline %s', 1, 'b', 2))

        # to a lagging candidate
        with patch('patroni.ha.logger.info') as mock_info:
            self.ha.fetch_node_status = get_node_status(wal_position=1)
            self.ha.cluster.config.data.update({'maximum_lag_on_failover': 5})
            self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
            self.assertEqual(mock_info.call_args_list[0][0],
                             ('Member %s exceeds maximum replication lag', 'b'))
        self.ha.cluster.members.pop()

    @patch('patroni.postgresql.mpp.AbstractMPPHandler.is_coordinator', Mock(return_value=False))
    def test_manual_switchover_from_leader(self):
        self.ha.has_lock = true  # I am the leader

        self.ha.fetch_node_status = get_node_status()

        # different leader specified in failover key, no candidate
        with patch('patroni.ha.logger.warning') as mock_warning:
            self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, 'blabla', '', None))
            self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
            mock_warning.assert_called_with(
                '%s: leader name does not match: %s != %s', 'switchover', 'blabla', 'postgresql0')

        # no candidate
        self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, self.p.name, '', None))
        self.assertEqual(self.ha.run_cycle(), 'switchover: demoting myself')

        self.ha._rewind.rewind_or_reinitialize_needed_and_possible = true
        self.assertEqual(self.ha.run_cycle(), 'switchover: demoting myself')

        # other members with failover_limitation_s
        with patch('patroni.ha.logger.info') as mock_info:
            self.ha.fetch_node_status = get_node_status(nofailover=True)
            self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
            self.assertEqual(mock_info.call_args_list[0][0], ('Member %s is %s', 'leader', 'not allowed to promote'))
        with patch('patroni.ha.logger.info') as mock_info:
            self.ha.fetch_node_status = get_node_status(watchdog_failed=True)
            self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
            self.assertEqual(mock_info.call_args_list[0][0], ('Member %s is %s', 'leader', 'not watchdog capable'))
        with patch('patroni.ha.logger.info') as mock_info:
            self.ha.fetch_node_status = get_node_status(timeline=1)
            self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
            self.assertEqual(mock_info.call_args_list[0][0],
                             ('Timeline %s of member %s is behind the cluster timeline %s', 1, 'leader', 2))
        with patch('patroni.ha.logger.info') as mock_info:
            self.ha.fetch_node_status = get_node_status(wal_position=1)
            self.ha.cluster.config.data.update({'maximum_lag_on_failover': 5})
            self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
            self.assertEqual(mock_info.call_args_list[0][0], ('Member %s exceeds maximum replication lag', 'leader'))

    @patch('patroni.postgresql.mpp.AbstractMPPHandler.is_coordinator', Mock(return_value=False))
    def test_scheduled_switchover_from_leader(self):
        self.ha.has_lock = true  # I am the leader

        self.ha.fetch_node_status = get_node_status()

        # switchover scheduled time must include timezone
        with patch('patroni.ha.logger.warning') as mock_warning:
            scheduled = datetime.datetime.now()
            self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, self.p.name, 'blabla', scheduled))
            self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
            self.assertIn('Incorrect value of scheduled_at: %s', mock_warning.call_args_list[0][0])

        # scheduled now
        scheduled = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=tzutc)
        self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, self.p.name, 'b', scheduled))
        self.ha.cluster.members.append(Member(0, 'b', 28, {'api_url': 'http://127.0.0.1:8011/patroni'}))
        self.assertEqual('switchover: demoting myself', self.ha.run_cycle())

        # scheduled in the future
        with patch('patroni.ha.logger.info') as mock_info:
            scheduled = scheduled + datetime.timedelta(seconds=30)
            self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, self.p.name, 'blabla', scheduled))
            self.assertEqual('no action. I am (postgresql0), the leader with the lock', self.ha.run_cycle())
            self.assertIn('Awaiting %s at %s (in %.0f seconds)', mock_info.call_args_list[0][0])

        # stale value
        with patch('patroni.ha.logger.warning') as mock_warning:
            scheduled = scheduled + datetime.timedelta(seconds=-600)
            self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, self.p.name, 'b', scheduled))
            self.ha.cluster.members.append(Member(0, 'b', 28, {'api_url': 'http://127.0.0.1:8011/patroni'}))
            self.assertEqual('no action. I am (postgresql0), the leader with the lock', self.ha.run_cycle())
            self.assertIn('Found a stale %s value, cleaning up: %s', mock_warning.call_args_list[0][0])

    def test_manual_switchover_from_leader_in_pause(self):
        self.ha.has_lock = true  # I am the leader
        self.ha.is_paused = true

        # no candidate
        self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, self.p.name, '', None))
        with patch('patroni.ha.logger.warning') as mock_warning:
            self.assertEqual('PAUSE: no action. I am (postgresql0), the leader with the lock', self.ha.run_cycle())
            mock_warning.assert_called_with(
                '%s is possible only to a specific candidate in a paused state', 'Switchover')

    def test_manual_failover_from_leader_in_pause(self):
        self.ha.has_lock = true
        self.ha.fetch_node_status = get_node_status()
        self.ha.is_paused = true

        # failover from me, candidate is healthy
        self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, None, 'b', None))
        self.ha.cluster.members.append(Member(0, 'b', 28, {'api_url': 'http://127.0.0.1:8011/patroni'}))
        self.assertEqual('PAUSE: manual failover: demoting myself', self.ha.run_cycle())
        self.ha.cluster.members.pop()

    def test_manual_failover_from_leader_in_synchronous_mode(self):
        self.ha.is_synchronous_mode = true
        self.ha.process_sync_replication = Mock()
        self.ha.fetch_node_status = get_node_status()

        # I am the leader
        self.p.is_primary = true
        self.ha.has_lock = true

        # the candidate is not in sync members but we allow failover to an async candidate
        self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, None, 'b', None), sync=(self.p.name, 'a'))
        self.ha.cluster.members.append(Member(0, 'b', 28, {'api_url': 'http://127.0.0.1:8011/patroni'}))
        self.assertEqual('manual failover: demoting myself', self.ha.run_cycle())
        self.ha.cluster.members.pop()

    def test_manual_switchover_from_leader_in_synchronous_mode(self):
        self.ha.is_synchronous_mode = true
        self.ha.process_sync_replication = Mock()

        # I am the leader
        self.p.is_primary = true
        self.ha.has_lock = true

        # candidate specified is not in sync members
        with patch('patroni.ha.logger.warning') as mock_warning:
            self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, self.p.name, 'a', None),
                                                                  sync=(self.p.name, 'blabla'))
            self.assertEqual('no action. I am (postgresql0), the leader with the lock', self.ha.run_cycle())
            self.assertEqual(mock_warning.call_args_list[0][0],
                             ('%s candidate=%s does not match with sync_standbys=%s', 'Switchover', 'a', 'blabla'))

        # the candidate is in sync members and is healthy
        self.ha.fetch_node_status = get_node_status(wal_position=305419896)
        self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, self.p.name, 'a', None),
                                                              sync=(self.p.name, 'a'))
        self.ha.cluster.members.append(Member(0, 'a', 28, {'api_url': 'http://127.0.0.1:8011/patroni'}))
        self.assertEqual('switchover: demoting myself', self.ha.run_cycle())

        # the candidate is in sync members but is not healthy
        with patch('patroni.ha.logger.info') as mock_info:
            self.ha.fetch_node_status = get_node_status(nofailover=True)
            self.assertEqual('no action. I am (postgresql0), the leader with the lock', self.ha.run_cycle())
            self.assertEqual(mock_info.call_args_list[0][0], ('Member %s is %s', 'a', 'not allowed to promote'))

    def test_manual_failover_process_no_leader(self):
        self.p.is_primary = false
        self.p.set_role('replica')

        # failover to another member, fetch_node_status for candidate fails
        with patch('patroni.ha.logger.warning') as mock_warning:
            self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'leader', None))
            self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')
            self.assertEqual(mock_warning.call_args_list[1][0],
                             ('%s: member %s is %s', 'manual failover', 'leader', 'not reachable'))

        # failover to another member, candidate is accessible, in_recovery
        self.p.set_role('replica')
        self.ha.fetch_node_status = get_node_status()
        self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node')

        # set nofailover flag to True for all members of the cluster
        # this should elect the current member, as we are not going to call the API for it.
        self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'other', None))
        self.ha.fetch_node_status = get_node_status(nofailover=True)
        self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')

        # failover to me but I am set to nofailover. In no case I should be elected as a leader
        self.p.set_role('replica')
        self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'postgresql0', None))
        self.ha.patroni.nofailover = True
        self.assertEqual(self.ha.run_cycle(), 'following a different leader because I am not allowed to promote')

        self.ha.patroni.nofailover = False

        # failover to another member that is on an older timeline (only failover_limitation() is checked)
        with patch('patroni.ha.logger.info') as mock_info:
            self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'b', None))
            self.ha.cluster.members.append(Member(0, 'b', 28, {'api_url': 'http://127.0.0.1:8011/patroni'}))
            self.ha.fetch_node_status = get_node_status(timeline=1)
            self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node')
            mock_info.assert_called_with('%s: to %s, i am %s', 'manual failover', 'b', 'postgresql0')

        # failover to another member lagging behind the cluster_lsn (only failover_limitation() is checked)
        with patch('patroni.ha.logger.info') as mock_info:
            self.ha.cluster.config.data.update({'maximum_lag_on_failover': 5})
            self.ha.fetch_node_status = get_node_status(wal_position=1)
            self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node')
            mock_info.assert_called_with('%s: to %s, i am %s', 'manual failover', 'b', 'postgresql0')

    def test_manual_switchover_process_no_leader(self):
        self.p.is_primary = false
        self.p.set_role('replica')

        # I was the leader, other members are healthy
        self.ha.fetch_node_status = get_node_status()
        self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, self.p.name, '', None))
        self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node')

        # I was the leader, I am the only healthy member
        with patch('patroni.ha.logger.info') as mock_info:
            self.ha.fetch_node_status = get_node_status(reachable=False)  # inaccessible, in_recovery
            self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')
            self.assertEqual(mock_info.call_args_list[0][0], ('Member %s is %s', 'leader', 'not reachable'))
            self.assertEqual(mock_info.call_args_list[1][0], ('Member %s is %s', 'other', 'not reachable'))

    def test_manual_failover_process_no_leader_in_synchronous_mode(self):
        self.ha.is_synchronous_mode = true
        self.p.is_primary = false
        self.ha.fetch_node_status = get_node_status(nofailover=True)  # other nodes are not healthy

        # manual failover when our name (postgresql0) isn't in the /sync key and the candidate node is not available
        self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'other', None),
                                                                 sync=('leader1', 'blabla'))
        self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node')

        # manual failover when the candidate node isn't available but our name is in the /sync key
        # while other sync node is nofailover
        with patch('patroni.ha.logger.warning') as mock_warning:
            self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'other', None),
                                                                     sync=('leader1', 'postgresql0'))
            self.p.sync_handler.current_state = Mock(return_value=(CaseInsensitiveSet(), CaseInsensitiveSet()))
            self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
            self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')
            self.assertEqual(mock_warning.call_args_list[0][0],
                             ('%s: member %s is %s', 'manual failover', 'other', 'not allowed to promote'))

        # manual failover to our node (postgresql0),
        # which name is not in sync nodes list (some sync nodes are available)
        self.p.set_role('replica')
        self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'postgresql0', None),
                                                                 sync=('leader1', 'other'))
        self.p.sync_handler.current_state = Mock(return_value=(CaseInsensitiveSet(['leader1']),
                                                               CaseInsensitiveSet(['leader1'])))
        self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')

    def test_manual_switchover_process_no_leader_in_synchronous_mode(self):
        self.ha.is_synchronous_mode = true
        self.p.is_primary = false

        # to a specific node, which name doesn't match our name (postgresql0)
        self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, 'leader', 'other', None))
        self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node')

        # to our node (postgresql0), which name is not in sync nodes list
        self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, 'leader', 'postgresql0', None),
                                                                 sync=('leader1', 'blabla'))
        self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node')

        # without candidate, our name (postgresql0) is not in the sync nodes list
        self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, 'leader', '', None),
                                                                 sync=('leader', 'blabla'))
        self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node')

        # switchover from a specific leader, but the only sync node (us, postgresql0) has nofailover tag
        self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, 'leader', '', None),
                                                                 sync=('postgresql0'))
        self.ha.patroni.nofailover = True
        self.assertEqual(self.ha.run_cycle(), 'following a different leader because I am not allowed to promote')

    def test_manual_failover_process_no_leader_in_pause(self):
        self.ha.is_paused = true

        # I am running as primary, cluster is unlocked, the candidate is allowed to promote
        # but we are in pause
        self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'other', None))
        self.assertEqual(self.ha.run_cycle(), 'PAUSE: continue to run as primary without lock')

    def test_manual_switchover_process_no_leader_in_pause(self):
        self.ha.is_paused = true

        # I am running as primary, cluster is unlocked, no candidate specified
        self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, 'leader', '', None))
        self.assertEqual(self.ha.run_cycle(), 'PAUSE: continue to run as primary without lock')

        # the candidate is not running
        with patch('patroni.ha.logger.warning') as mock_warning:
            self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, 'leader', 'blabla', None))
            self.assertEqual('PAUSE: acquired session lock as a leader', self.ha.run_cycle())
            self.assertEqual(
                mock_warning.call_args_list[0][0],
                ('%s: removing failover key because failover candidate is not running', 'switchover'))

        # switchover to me, I am not leader
        self.p.is_primary = false
        self.p.set_role('replica')
        self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, 'leader', self.p.name, None))
        self.assertEqual(self.ha.run_cycle(), 'PAUSE: promoted self to leader by acquiring session lock')
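
    # Leader-race tests: _is_healthiest_node() compares our WAL position, timeline
    # and failover_priority against the other members. Note the leader= case below:
    # a former leader with a higher priority that reports the same LSN must not
    # prevent this node from racing for the leader lock (see the commit message).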
def test_is_healthiest_node(self):
|
|
self.ha.is_failsafe_mode = true
|
|
self.ha.state_handler.is_primary = false
|
|
self.ha.patroni.nofailover = False
|
|
self.ha.fetch_node_status = get_node_status()
|
|
self.ha.dcs._last_failsafe = {'foo': ''}
|
|
self.assertFalse(self.ha.is_healthiest_node())
|
|
self.ha.dcs._last_failsafe = {'postgresql0': ''}
|
|
self.assertTrue(self.ha.is_healthiest_node())
|
|
self.ha.dcs._last_failsafe = None
|
|
with patch.object(Watchdog, 'is_healthy', PropertyMock(return_value=False)):
|
|
self.assertFalse(self.ha.is_healthiest_node())
|
|
self.ha.is_paused = true
|
|
self.assertFalse(self.ha.is_healthiest_node())
|
|
|
|
def test__is_healthiest_node(self):
|
|
        self.p.is_primary = false
        self.ha.cluster = get_cluster_initialized_without_leader(sync=('postgresql1', self.p.name))
        global_config.update(self.ha.cluster)
        self.assertTrue(self.ha._is_healthiest_node(self.ha.old_cluster.members))
        self.ha.fetch_node_status = get_node_status()  # accessible, in_recovery
        self.assertTrue(self.ha._is_healthiest_node(self.ha.old_cluster.members))
        self.ha.fetch_node_status = get_node_status(in_recovery=False)  # accessible, not in_recovery
        self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members))
        self.ha.fetch_node_status = get_node_status(failover_priority=2)  # accessible, in_recovery, higher priority
        self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members))
        # if there is a higher-priority node, but it has a lower WAL position, this node should race
        self.ha.fetch_node_status = get_node_status(failover_priority=6, wal_position=9)
        self.assertTrue(self.ha._is_healthiest_node(self.ha.old_cluster.members))
        # if the old leader is a higher-priority node at the same WAL position, this node should still race
        self.ha.fetch_node_status = get_node_status(failover_priority=6)
        self.assertTrue(self.ha._is_healthiest_node(self.ha.old_cluster.members, leader=self.ha.old_cluster.leader))
        self.ha.fetch_node_status = get_node_status(wal_position=11)  # accessible, in_recovery, WAL position ahead
        self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members))
        # in synchronous_mode we consider ourselves healthy even if the former leader is
        # accessible in read-only and ahead of us
        with patch.object(Ha, 'is_synchronous_mode', Mock(return_value=True)):
            self.assertTrue(self.ha._is_healthiest_node(self.ha.old_cluster.members))
        self.ha.cluster.config.data.update({'maximum_lag_on_failover': 5})
        global_config.update(self.ha.cluster)
        with patch('patroni.postgresql.Postgresql.last_operation', return_value=1):
            self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members))
        with patch('patroni.postgresql.Postgresql.replica_cached_timeline', return_value=None):
            self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members))
        with patch('patroni.postgresql.Postgresql.replica_cached_timeline', return_value=1):
            self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members))
        self.ha.patroni.nofailover = True
        self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members))
        self.ha.patroni.nofailover = None
        self.ha.patroni.failover_priority = 0
        self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members))

    def test_fetch_node_status(self):
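        # the first call must not raise even though nothing useful can be fetched from
        # the member; the second call parses a mocked JSON status response instead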
        member = Member(0, 'test', 1, {'api_url': 'http://127.0.0.1:8011/patroni'})
        self.ha.fetch_node_status(member)
        member = Member(0, 'test', 1, {'api_url': 'http://localhost:8011/patroni'})
        self.ha.patroni.request = Mock()
        self.ha.patroni.request.return_value.data = b'{"wal":{"location":1},"role":"primary"}'
        ret = self.ha.fetch_node_status(member)
        self.assertFalse(ret.in_recovery)

    @patch.object(Rewind, 'pg_rewind', true)
    @patch.object(Rewind, 'check_leader_is_not_in_recovery', true)
    @patch('os.listdir', Mock(return_value=[]))
    @patch('patroni.postgresql.rewind.fsync_dir', Mock())
    @patch.object(Postgresql, 'call_nowait')
    def test_post_recover(self, mock_call_nowait):
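        # if postgres could not be started while we still held the leader lock,
        # post_recover() is expected to release the key, demote the role and fire
        # the on_role_change callback; without the lock it only reports the failure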
        self.p.is_running = false
        self.ha.has_lock = true
        self.p.set_role('primary')
        self.assertEqual(self.ha.post_recover(), 'removed leader key after trying and failing to start postgres')
        self.assertEqual(self.p.role, 'demoted')
        mock_call_nowait.assert_called_once_with(CallbackAction.ON_ROLE_CHANGE)
        self.ha.has_lock = false
        self.assertEqual(self.ha.post_recover(), 'failed to start postgres')
        leader = Leader(0, 0, Member(0, 'l', 2, {"version": "1.6", "conn_url": "postgres://a", "role": "primary"}))
        self.ha._rewind.execute(leader)
        self.p.is_running = true
        self.assertIsNone(self.ha.post_recover())

    def test_schedule_future_restart(self):
        self.ha.patroni.scheduled_restart = {}
        # schedule the restart twice: the first call should succeed,
        # the second should fail because a restart is already scheduled
        self.assertTrue(self.ha.schedule_future_restart({'schedule': future_restart_time}))
        self.assertFalse(self.ha.schedule_future_restart({'schedule': future_restart_time}))

    def test_delete_future_restarts(self):
        self.ha.delete_future_restart()

    def test_evaluate_scheduled_restart(self):
        self.p.postmaster_start_time = Mock(return_value=str(postmaster_start_time))
        # restart already in progress
        with patch('patroni.async_executor.AsyncExecutor.busy', PropertyMock(return_value=True)):
            self.assertIsNone(self.ha.evaluate_scheduled_restart())
        # the postmaster was restarted after the restart had been scheduled, fails
        with patch.object(self.ha,
                          'future_restart_scheduled',
                          Mock(return_value={'postmaster_start_time':
                                             str(postmaster_start_time - datetime.timedelta(days=1)),
                                             'schedule': str(future_restart_time)})):
            self.assertIsNone(self.ha.evaluate_scheduled_restart())
        with patch.object(self.ha,
                          'future_restart_scheduled',
                          Mock(return_value={'postmaster_start_time': str(postmaster_start_time),
                                             'schedule': str(future_restart_time)})):
            with patch.object(self.ha,
                              'should_run_scheduled_action', Mock(return_value=True)):
                # restart in the future, ok
                self.assertIsNotNone(self.ha.evaluate_scheduled_restart())
                with patch.object(self.ha, 'restart', Mock(return_value=(False, "Test"))):
                    # restart in the future, but the actual restart failed
                    self.assertIsNone(self.ha.evaluate_scheduled_restart())

    def test_scheduled_restart(self):
        self.ha.cluster = get_cluster_initialized_with_leader()
        with patch.object(self.ha, "evaluate_scheduled_restart", Mock(return_value="restart scheduled")):
            self.assertEqual(self.ha.run_cycle(), "restart scheduled")

    def test_restart_matches(self):
        self.p._role = 'replica'
        self.p._connection.server_version = 90500
        self.p._pending_restart = True
        self.assertFalse(self.ha.restart_matches("primary", "9.5.0", True))
        self.assertFalse(self.ha.restart_matches("replica", "9.4.3", True))
        self.p._pending_restart = False
        self.assertFalse(self.ha.restart_matches("replica", "9.5.2", True))
        self.assertTrue(self.ha.restart_matches("replica", "9.5.2", False))

    def test_process_healthy_cluster_in_pause(self):
        self.p.is_primary = false
        self.ha.is_paused = true
        self.p.name = 'leader'
        self.ha.cluster = get_cluster_initialized_with_leader()
        self.assertEqual(self.ha.run_cycle(), 'PAUSE: removed leader lock because postgres is not running as primary')
        self.ha.cluster = get_cluster_initialized_with_leader(Failover(0, '', self.p.name, None))
        self.assertEqual(self.ha.run_cycle(), 'PAUSE: waiting to become primary after promote...')

    @patch('patroni.postgresql.mtime', Mock(return_value=1588316884))
    @patch('builtins.open', mock_open(read_data='1\t0/40159C0\tno recovery target specified\n'))
    def test_process_healthy_standby_cluster_as_standby_leader(self):
        self.p.is_primary = false
        self.p.name = 'leader'
        self.ha.cluster = get_standby_cluster_initialized_with_only_leader()
        self.p.config.check_recovery_conf = Mock(return_value=(False, False))
        self.ha._leader_timeline = 1
        self.assertEqual(self.ha.run_cycle(), 'promoted self to a standby leader because i had the session lock')
        self.assertEqual(self.ha.run_cycle(), 'no action. I am (leader), the standby leader with the lock')
        self.p.set_role('replica')
        self.p.config.check_recovery_conf = Mock(return_value=(True, False))
        self.assertEqual(self.ha.run_cycle(), 'promoted self to a standby leader because i had the session lock')

    def test_process_healthy_standby_cluster_as_cascade_replica(self):
        self.p.is_primary = false
        self.p.name = 'replica'
        self.ha.cluster = get_standby_cluster_initialized_with_only_leader()
        self.assertEqual(self.ha.run_cycle(),
                         'no action. I am (replica), a secondary, and following a standby leader (leader)')
        with patch.object(Leader, 'conn_url', PropertyMock(return_value='')):
            self.assertEqual(self.ha.run_cycle(), 'continue following the old known standby leader')

    @patch.object(Cluster, 'is_unlocked', Mock(return_value=True))
    def test_process_unhealthy_standby_cluster_as_standby_leader(self):
        self.p.is_primary = false
        self.p.name = 'leader'
        self.ha.cluster = get_standby_cluster_initialized_with_only_leader()
        self.ha.sysid_valid = true
        self.p._sysid = True
        self.assertEqual(self.ha.run_cycle(), 'promoted self to a standby leader by acquiring session lock')

    @patch.object(Rewind, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=True))
    @patch.object(Rewind, 'can_rewind', PropertyMock(return_value=True))
    def test_process_unhealthy_standby_cluster_as_cascade_replica(self):
        self.p.is_primary = false
        self.p.name = 'replica'
        self.ha.cluster = get_standby_cluster_initialized_with_only_leader()
        self.assertTrue(self.ha.run_cycle().startswith('running pg_rewind from remote_member:'))

    def test_recover_unhealthy_leader_in_standby_cluster(self):
        self.p.is_primary = false
        self.p.name = 'leader'
        self.p.is_running = false
        self.p.follow = false
        self.ha.cluster = get_standby_cluster_initialized_with_only_leader()
        self.assertEqual(self.ha.run_cycle(), 'starting as a standby leader because i had the session lock')

    @patch.object(Cluster, 'is_unlocked', Mock(return_value=True))
    def test_recover_unhealthy_unlocked_standby_cluster(self):
        self.p.is_primary = false
        self.p.name = 'leader'
        self.p.is_running = false
        self.p.follow = false
        self.ha.cluster = get_standby_cluster_initialized_with_only_leader()
        self.ha.has_lock = false
        self.assertEqual(self.ha.run_cycle(), 'trying to follow a remote member because standby cluster is unhealthy')

    def test_failed_to_update_lock_in_pause(self):
        self.ha.update_lock = false
        self.ha.is_paused = true
        self.p.name = 'leader'
        self.ha.cluster = get_cluster_initialized_with_leader()
        self.assertEqual(self.ha.run_cycle(),
                         'PAUSE: continue to run as primary after failing to update leader lock in DCS')

    def test_postgres_unhealthy_in_pause(self):
        self.ha.is_paused = true
        self.p.is_healthy = false
        self.assertEqual(self.ha.run_cycle(), 'PAUSE: postgres is not running')
        self.ha.has_lock = true
        self.assertEqual(self.ha.run_cycle(), 'PAUSE: removed leader lock because postgres is not running')

    def test_no_etcd_connection_in_pause(self):
        self.ha.is_paused = true
        self.ha.load_cluster_from_dcs = Mock(side_effect=DCSError('Etcd is not responding properly'))
        self.assertEqual(self.ha.run_cycle(), 'PAUSE: DCS is not accessible')

    @patch('patroni.ha.Ha.update_lock', return_value=True)
    @patch('patroni.ha.Ha.demote')
    def test_starting_timeout(self, demote, update_lock):
        def check_calls(seq):
            for mock, called in seq:
                if called:
                    mock.assert_called_once()
                else:
                    mock.assert_not_called()
                mock.reset_mock()

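        # assuming the default primary_start_timeout of 300s: 30s into startup we keep
        # waiting (hence "270 seconds until timeout"); past the timeout we only demote
        # if a healthy failover candidate is actually reachable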
        self.ha.has_lock = true
        self.ha.cluster = get_cluster_initialized_with_leader()
        self.p.check_for_startup = true
        self.p.time_in_state = lambda: 30
        self.assertEqual(self.ha.run_cycle(), 'PostgreSQL is still starting up, 270 seconds until timeout')
        check_calls([(update_lock, True), (demote, False)])

        self.p.time_in_state = lambda: 350
        self.ha.fetch_node_status = get_node_status(reachable=False)  # inaccessible, in_recovery
        self.assertEqual(self.ha.run_cycle(),
                         'primary start has timed out, but continuing to wait because failover is not possible')
        check_calls([(update_lock, True), (demote, False)])

        self.ha.fetch_node_status = get_node_status()  # accessible, in_recovery
        self.assertEqual(self.ha.run_cycle(), 'stopped PostgreSQL because of startup timeout')
        check_calls([(update_lock, True), (demote, True)])

        update_lock.return_value = False
        self.assertEqual(self.ha.run_cycle(), 'stopped PostgreSQL while starting up because leader key was lost')
        check_calls([(update_lock, True), (demote, True)])

        self.ha.has_lock = false
        self.p.is_primary = false
        self.assertEqual(self.ha.run_cycle(),
                         'no action. I am (postgresql0), a secondary, and following a leader (leader)')
        check_calls([(update_lock, False), (demote, False)])

    def test_manual_failover_while_starting(self):
        self.ha.has_lock = true
        self.p.check_for_startup = true
        f = Failover(0, self.p.name, '', None)
        self.ha.cluster = get_cluster_initialized_with_leader(f)
        self.ha.fetch_node_status = get_node_status()  # accessible, in_recovery
        self.assertEqual(self.ha.run_cycle(), 'switchover: demoting myself')

    @patch('patroni.ha.Ha.demote')
    def test_failover_immediately_on_zero_primary_start_timeout(self, demote):
        self.p.is_running = false
        self.ha.cluster = get_cluster_initialized_with_leader(sync=(self.p.name, 'other'))
        self.ha.cluster.config.data.update({'synchronous_mode': True, 'primary_start_timeout': 0})
        self.ha.has_lock = true
        self.ha.update_lock = true
        self.ha.fetch_node_status = get_node_status()  # accessible, in_recovery
        self.assertEqual(self.ha.run_cycle(), 'stopped PostgreSQL to fail over after a crash')
        demote.assert_called_once()

    def test_primary_stop_timeout(self):
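        # primary_stop_timeout should only be honored in synchronous mode;
        # otherwise (or when unset) no timeout applies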
        self.assertEqual(self.ha.primary_stop_timeout(), None)
        self.ha.cluster.config.data.update({'primary_stop_timeout': 30})
        global_config.update(self.ha.cluster)
        with patch.object(Ha, 'is_synchronous_mode', Mock(return_value=True)):
            self.assertEqual(self.ha.primary_stop_timeout(), 30)
        with patch.object(Ha, 'is_synchronous_mode', Mock(return_value=False)):
            self.assertEqual(self.ha.primary_stop_timeout(), None)
            self.ha.cluster.config.data['primary_stop_timeout'] = None
            global_config.update(self.ha.cluster)
            self.assertEqual(self.ha.primary_stop_timeout(), None)

    @patch('patroni.postgresql.Postgresql.follow')
    def test_demote_immediate(self, follow):
        self.ha.has_lock = true
        self.e.get_cluster = Mock(return_value=get_cluster_initialized_without_leader())
        self.ha.demote('immediate')
        follow.assert_called_once_with(None)

    def test__process_multisync_replication(self):
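        # a rough invariant being exercised below: synchronous_standby_names and the
        # /sync key in the DCS are updated in an order that never advertises a sync
        # standby in the DCS that postgres is not already replicating to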
        self.ha.has_lock = true
        mock_set_sync = self.p.sync_handler.set_synchronous_standby_names = Mock()
        mock_cfg_set_sync = self.p.config.set_synchronous_standby_names = Mock()
        self.p.name = 'leader'

        # Test sync key removed when sync mode disabled
        self.ha.cluster = get_cluster_initialized_with_leader(sync=('leader', 'other'))
        with patch.object(self.ha.dcs, 'delete_sync_state') as mock_delete_sync:
            self.ha.run_cycle()
            mock_delete_sync.assert_called_once()
        mock_set_sync.assert_called_once_with(CaseInsensitiveSet())
        mock_cfg_set_sync.assert_called_once()

        mock_set_sync.reset_mock()
        mock_cfg_set_sync.reset_mock()
        # Test sync key not touched when it is not there
        self.ha.cluster = get_cluster_initialized_with_leader()
        with patch.object(self.ha.dcs, 'delete_sync_state') as mock_delete_sync:
            self.ha.run_cycle()
            mock_delete_sync.assert_not_called()
        mock_set_sync.assert_not_called()
        mock_cfg_set_sync.assert_called_once()

        mock_set_sync.reset_mock()
        mock_cfg_set_sync.reset_mock()

        self.ha.is_synchronous_mode = true

        # Test sync standby not touched when picking the same node
        self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 1, 1,
                                                                         CaseInsensitiveSet(['other']),
                                                                         CaseInsensitiveSet(['other'])))
        self.ha.cluster = get_cluster_initialized_with_leader(sync=('leader', 'other'))
        self.ha.run_cycle()
        mock_set_sync.assert_not_called()

        mock_set_sync.reset_mock()
        mock_cfg_set_sync.reset_mock()

        # Test sync standby is replaced when switching standbys
        self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 0, 0, CaseInsensitiveSet(),
                                                                         CaseInsensitiveSet(['other2'])))
        self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
        self.ha.run_cycle()
        mock_set_sync.assert_called_once_with(CaseInsensitiveSet(['other2']))
        mock_cfg_set_sync.assert_not_called()

        # Test sync standby is replaced when a new standby joins
        self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 1, 1,
                                                                         CaseInsensitiveSet(['other2']),
                                                                         CaseInsensitiveSet(['other2', 'other3'])))
        self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
        self.ha.run_cycle()
        self.assertEqual(mock_set_sync.call_args_list[0][0], (CaseInsensitiveSet(['other2']),))
        self.assertEqual(mock_set_sync.call_args_list[1][0], (CaseInsensitiveSet(['other2', 'other3']),))
        mock_cfg_set_sync.assert_not_called()

        mock_set_sync.reset_mock()
        mock_cfg_set_sync.reset_mock()
        # Test sync standby is not disabled when updating the DCS fails
        self.ha.dcs.write_sync_state = Mock(return_value=None)
        self.ha.run_cycle()
        mock_set_sync.assert_not_called()
        mock_cfg_set_sync.assert_not_called()

        mock_set_sync.reset_mock()
        mock_cfg_set_sync.reset_mock()
        # Test changing sync standby
        self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
        self.ha.dcs.get_cluster = Mock(return_value=get_cluster_initialized_with_leader(sync=('leader', 'other')))
        self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 1, 1,
                                                                         CaseInsensitiveSet(['other2']),
                                                                         CaseInsensitiveSet(['other2'])))
        self.ha.run_cycle()
        self.assertEqual(self.ha.dcs.write_sync_state.call_count, 2)

        # Test updating the sync standby key failed due to a race
        self.ha.dcs.write_sync_state = Mock(side_effect=[SyncState.empty(), None])
        self.ha.run_cycle()
        self.assertEqual(self.ha.dcs.write_sync_state.call_count, 2)

        # Test updating the sync standby key failed because the DCS is not accessible
        self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
        self.ha.dcs.get_cluster = Mock(side_effect=DCSError('foo'))
        self.ha.run_cycle()

        # Test changing the sync standby failed due to a race
        self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
        self.ha.dcs.get_cluster = Mock(return_value=get_cluster_initialized_with_leader(sync=('somebodyelse', None)))
        self.ha.run_cycle()
        self.assertEqual(self.ha.dcs.write_sync_state.call_count, 2)

        # Test sync set to '*' when synchronous_mode_strict is enabled
        mock_set_sync.reset_mock()
        mock_cfg_set_sync.reset_mock()
        self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 0, 0, CaseInsensitiveSet(),
                                                                         CaseInsensitiveSet()))
        with patch.object(global_config.__class__, 'is_synchronous_mode_strict', PropertyMock(return_value=True)):
            self.ha.run_cycle()
        mock_set_sync.assert_called_once_with(CaseInsensitiveSet('*'))
        mock_cfg_set_sync.assert_not_called()

        # Test the value configured by the user for synchronous_standby_names is used when synchronous mode is disabled
        self.ha.is_synchronous_mode = false

        mock_set_sync.reset_mock()
        mock_cfg_set_sync.reset_mock()
        ssn_mock = PropertyMock(return_value="SOME_SSN")
        with patch('patroni.postgresql.config.ConfigHandler.synchronous_standby_names', ssn_mock):
            self.ha.run_cycle()
        mock_set_sync.assert_not_called()
        mock_cfg_set_sync.assert_called_once_with("SOME_SSN")

    def test_sync_replication_become_primary(self):
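        # on promotion, the /sync key is presumably reset first (nobody can be sync
        # right after a failover); if that DCS write fails, we must not promote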
        self.ha.is_synchronous_mode = true

        mock_set_sync = self.p.sync_handler.set_synchronous_standby_names = Mock()
        self.p.is_primary = false
        self.p.set_role('replica')
        self.ha.has_lock = true
        mock_write_sync = self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
        self.p.name = 'leader'
        self.ha.cluster = get_cluster_initialized_with_leader(sync=('other', None))

        # When we just became primary nobody is sync
        self.assertEqual(self.ha.enforce_primary_role('msg', 'promote msg'), 'promote msg')
        mock_set_sync.assert_called_once_with(CaseInsensitiveSet(), 0)
        mock_write_sync.assert_called_once_with('leader', None, 0, version=0)

        mock_set_sync.reset_mock()

        # If resetting the sync state in the DCS fails, we must not promote
        self.p.set_role('replica')
        mock_write_sync.return_value = False
        self.assertTrue(self.ha.enforce_primary_role('msg', 'promote msg') != 'promote msg')
        mock_set_sync.assert_not_called()

    def test_unhealthy_sync_mode(self):
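        # roughly: in synchronous mode an unlocked cluster may only be taken over by
        # the node named in the /sync key; everybody else keeps following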
        self.ha.is_synchronous_mode = true

        self.p.is_primary = false
        self.p.set_role('replica')
        self.p.name = 'other'
        self.ha.cluster = get_cluster_initialized_without_leader(sync=('leader', 'other2'))
        mock_write_sync = self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
        mock_acquire = self.ha.acquire_lock = Mock(return_value=True)
        mock_follow = self.p.follow = Mock()
        mock_promote = self.p.promote = Mock()

        # If we don't match the sync replica we are not allowed to acquire lock
        self.ha.run_cycle()
        mock_acquire.assert_not_called()
        mock_follow.assert_called_once()
        self.assertEqual(mock_follow.call_args[0][0], None)
        mock_write_sync.assert_not_called()

        mock_follow.reset_mock()
        # If we do match we will try to promote
        self.ha._is_healthiest_node = true

        self.ha.cluster = get_cluster_initialized_without_leader(sync=('leader', 'other'))
        self.ha.run_cycle()
        mock_acquire.assert_called_once()
        mock_follow.assert_not_called()
        mock_promote.assert_called_once()
        mock_write_sync.assert_called_once_with('other', None, 0, version=0)

    def test_disable_sync_when_restarting(self):
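        # a sync standby about to restart is expected to first announce itself as
        # nosync via touch_member and then wait until the leader takes it out of the
        # /sync key, so a restart never silently breaks the durability guarantee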
        self.ha.is_synchronous_mode = true

        self.p.name = 'other'
        self.p.is_primary = false
        self.p.set_role('replica')
        mock_restart = self.p.restart = Mock(return_value=True)
        self.ha.cluster = get_cluster_initialized_with_leader(sync=('leader', 'other'))
        self.ha.touch_member = Mock(return_value=True)
        self.ha.dcs.get_cluster = Mock(side_effect=[
            get_cluster_initialized_with_leader(sync=('leader', syncstandby))
            for syncstandby in ['other', None]])

        with patch('time.sleep') as mock_sleep:
            self.ha.restart({})
            mock_restart.assert_called_once()
            mock_sleep.assert_called()

        # Restart is still called when DCS connection fails
        mock_restart.reset_mock()
        self.ha.dcs.get_cluster = Mock(side_effect=DCSError("foo"))
        self.ha.restart({})

        mock_restart.assert_called_once()

        # We don't try to fetch the cluster state when touch_member fails
        mock_restart.reset_mock()
        self.ha.dcs.get_cluster.reset_mock()
        self.ha.touch_member = Mock(return_value=False)

        self.ha.restart({})

        mock_restart.assert_called_once()
        self.ha.dcs.get_cluster.assert_not_called()

    @patch.object(Cluster, 'is_unlocked', Mock(return_value=False))
    def test_enable_synchronous_mode(self):
        self.ha.is_synchronous_mode = true
        self.ha.has_lock = true
        self.p.name = 'leader'
        self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 0, 0,
                                                                         CaseInsensitiveSet(), CaseInsensitiveSet()))
        self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
        with patch('patroni.ha.logger.info') as mock_logger:
            self.ha.run_cycle()
            self.assertEqual(mock_logger.call_args_list[0][0][0], 'Enabled synchronous replication')
        self.ha.dcs.write_sync_state = Mock(return_value=None)
        with patch('patroni.ha.logger.warning') as mock_logger:
            self.ha.run_cycle()
            self.assertEqual(mock_logger.call_args[0][0], 'Updating sync state failed')

    @patch.object(Cluster, 'is_unlocked', Mock(return_value=False))
    def test_inconsistent_synchronous_state(self):
        self.ha.is_synchronous_mode = true
        self.ha.has_lock = true
        self.p.name = 'leader'
        self.ha.cluster = get_cluster_initialized_without_leader(sync=('leader', 'a'))
        self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 0, 0,
                                                                         CaseInsensitiveSet(), CaseInsensitiveSet('a')))
        self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
        mock_set_sync = self.p.sync_handler.set_synchronous_standby_names = Mock()
        with patch('patroni.ha.logger.warning') as mock_logger:
            self.ha.run_cycle()
            mock_set_sync.assert_called_once()
            self.assertTrue(mock_logger.call_args_list[0][0][0].startswith('Inconsistent state between '))
        self.ha.dcs.write_sync_state = Mock(return_value=None)
        with patch('patroni.ha.logger.warning') as mock_logger:
            self.ha.run_cycle()
            self.assertEqual(mock_logger.call_args[0][0], 'Updating sync state failed')

    def test_effective_tags(self):
        self.ha._disable_sync = True
        self.assertEqual(self.ha.get_effective_tags(), {'foo': 'bar', 'nosync': True, 'sync_priority': 0})
        self.ha._disable_sync = False
        self.assertEqual(self.ha.get_effective_tags(), {'foo': 'bar'})

    @patch('patroni.postgresql.mtime', Mock(return_value=1588316884))
    @patch('builtins.open', Mock(side_effect=Exception))
    def test_restore_cluster_config(self):
        self.ha.cluster.config.data.clear()
        self.ha.has_lock = true
        self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')

    def test_watch(self):
        self.ha.cluster = get_cluster_initialized_with_leader()
        self.ha.watch(0)

    def test_wakeup(self):
        self.ha.wakeup()

    def test_shutdown(self):
        self.p.is_running = false
        self.ha.is_leader = true

        def stop(*args, **kwargs):
            kwargs['on_shutdown'](123, 120)

        self.p.stop = stop
        self.ha.shutdown()

        self.ha.is_failover_possible = true
        self.ha.shutdown()

    @patch('patroni.postgresql.mpp.AbstractMPPHandler.is_coordinator', Mock(return_value=False))
    def test_shutdown_citus_worker(self):
        self.ha.is_leader = true
        self.p.is_running = Mock(side_effect=[Mock(), False])
        self.ha.patroni.request = Mock()
        self.ha.shutdown()
        self.ha.patroni.request.assert_called()
        self.assertEqual(self.ha.patroni.request.call_args[0][2], 'citus')
        self.assertEqual(self.ha.patroni.request.call_args[0][3]['type'], 'before_demote')

    @patch('time.sleep', Mock())
    def test_leader_with_not_accessible_data_directory(self):
        self.ha.cluster = get_cluster_initialized_with_leader()
        self.ha.has_lock = true
        self.p.data_directory_empty = Mock(side_effect=OSError(5, "Input/output error: '{}'".format(self.p.data_dir)))
        self.assertEqual(self.ha.run_cycle(),
                         'released leader key voluntarily as data dir not accessible and currently leader')
        self.assertEqual(self.p.role, 'uninitialized')

        # as has_lock is mocked out, we need to fake the leader key release
        self.ha.has_lock = false
        # will not say bootstrap because data directory is not accessible
        self.assertEqual(self.ha.run_cycle(),
                         "data directory is not accessible: [Errno 5] Input/output error: '{}'".format(self.p.data_dir))

    @patch('patroni.postgresql.mtime', Mock(return_value=1588316884))
    @patch('builtins.open', mock_open(read_data=('1\t0/40159C0\tno recovery target specified\n\n'
                                                 '2\t1/40159C0\tno recovery target specified\n')))
    @patch.object(Cluster, 'is_unlocked', Mock(return_value=False))
    def test_update_cluster_history(self):
        self.ha.has_lock = true
        for tl in (1, 3):
            self.p.get_primary_timeline = Mock(return_value=tl)
            self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')

    @patch('sys.exit', return_value=1)
    def test_abort_join(self, exit_mock):
        self.ha.cluster = get_cluster_not_initialized_without_leader()
        self.p.is_primary = false
        self.ha.run_cycle()
        exit_mock.assert_called_once_with(1)
        self.p.set_role('replica')
        self.ha.dcs.initialize = Mock()
        with patch.object(Postgresql, 'cb_called', PropertyMock(return_value=True)):
            self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')
        self.ha.dcs.initialize.assert_not_called()

    @patch.object(Cluster, 'is_unlocked', Mock(return_value=False))
    def test_after_pause(self):
        self.ha.has_lock = true
        self.ha.is_paused = true
        self.assertEqual(self.ha.run_cycle(), 'PAUSE: no action. I am (postgresql0), the leader with the lock')
        self.ha.is_paused = false
        self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')

    @patch('patroni.psycopg.connect', psycopg_connect)
    def test_permanent_logical_slots_after_promote(self):
        self.p._major_version = 110000
        config = ClusterConfig(1, {'slots': {'l': {'database': 'postgres', 'plugin': 'test_decoding'}}}, 1)
        self.p.name = 'other'
        self.ha.cluster = get_cluster_initialized_without_leader(cluster_config=config)
        self.assertEqual(self.ha.run_cycle(), 'acquired session lock as a leader')
        self.ha.cluster = get_cluster_initialized_without_leader(leader=True, cluster_config=config)
        self.ha.has_lock = true
        self.assertEqual(self.ha.run_cycle(), 'no action. I am (other), the leader with the lock')

    @patch.object(Cluster, 'has_member', true)
    def test_run_cycle(self):
        self.ha.dcs.touch_member = Mock(side_effect=DCSError('foo'))
        self.assertEqual(self.ha.run_cycle(), 'Unexpected exception raised, please report it as a BUG')
        self.ha.dcs.touch_member = Mock(side_effect=PatroniFatalException('foo'))
        self.assertRaises(PatroniFatalException, self.ha.run_cycle)

    def test_empty_directory_in_pause(self):
        self.ha.is_paused = true
        self.p.data_directory_empty = true
        self.assertEqual(self.ha.run_cycle(), 'PAUSE: running with empty data directory')
        self.assertEqual(self.p.role, 'uninitialized')

    @patch('patroni.ha.Ha.sysid_valid', MagicMock(return_value=True))
    def test_sysid_no_match_in_pause(self):
        self.ha.is_paused = true
        self.p.controldata = lambda: {'Database cluster state': 'in recovery', 'Database system identifier': '123'}
        self.assertEqual(self.ha.run_cycle(), 'PAUSE: continue to run as primary without lock')

        self.ha.has_lock = true
        self.assertEqual(self.ha.run_cycle(), 'PAUSE: released leader key voluntarily due to the system ID mismatch')

    @patch('patroni.psycopg.connect', psycopg_connect)
    @patch('os.path.exists', Mock(return_value=True))
    @patch('shutil.rmtree', Mock())
    @patch('os.makedirs', Mock())
    @patch('os.open', Mock())
    @patch('os.fsync', Mock())
    @patch('os.close', Mock())
    @patch('os.chmod', Mock())
    @patch('os.rename', Mock())
    @patch('patroni.postgresql.Postgresql.is_starting', Mock(return_value=False))
    @patch('builtins.open', mock_open())
    @patch.object(ConfigHandler, 'check_recovery_conf', Mock(return_value=(False, False)))
    @patch.object(Postgresql, 'major_version', PropertyMock(return_value=130000))
    @patch.object(SlotsHandler, 'sync_replication_slots', Mock(return_value=['ls']))
    def test_follow_copy(self):
        self.ha.cluster.config.data['slots'] = {'ls': {'database': 'a', 'plugin': 'b'}}
        self.p.is_primary = false
        self.assertTrue(self.ha.run_cycle().startswith('Copying logical slots'))

    def test_acquire_lock(self):
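        # a DCSError should propagate to the caller, while any other exception from
        # attempt_to_acquire_leader is swallowed and treated as "lock not acquired"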
        self.ha.dcs.attempt_to_acquire_leader = Mock(side_effect=[DCSError('foo'), Exception])
        self.assertRaises(DCSError, self.ha.acquire_lock)
        self.assertFalse(self.ha.acquire_lock())

    @patch('patroni.postgresql.mpp.AbstractMPPHandler.is_coordinator', Mock(return_value=False))
    def test_notify_citus_coordinator(self):
        self.ha.patroni.request = Mock()
        self.ha.notify_mpp_coordinator('before_demote')
        self.ha.patroni.request.assert_called_once()
        self.assertEqual(self.ha.patroni.request.call_args[1]['timeout'], 30)
        self.ha.patroni.request = Mock(side_effect=Exception)
        with patch('patroni.ha.logger.warning') as mock_logger:
            self.ha.notify_mpp_coordinator('before_promote')
            self.assertEqual(self.ha.patroni.request.call_args[1]['timeout'], 2)
            mock_logger.assert_called()
            self.assertTrue(mock_logger.call_args[0][0].startswith('Request to %s coordinator leader'))
            self.assertEqual(mock_logger.call_args[0][1], 'Citus')

    @patch.object(global_config.__class__, 'is_synchronous_mode', PropertyMock(return_value=True))
    @patch.object(global_config.__class__, 'is_quorum_commit_mode', PropertyMock(return_value=True))
    def test_process_sync_replication_prepromote(self):
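        # rough idea of the pre-promote handling: on 9.5 the /sync key must be rewritten
        # in the DCS before promotion (a failed write postpones it), while with quorum
        # commit on 9.6+ the /sync key is left alone and the previous sync nodes are
        # instead written into synchronous_standby_names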
        self.p._major_version = 90500
        self.ha.cluster = get_cluster_initialized_without_leader(sync=('other', self.p.name + ',foo'))
        self.p.is_primary = false
        self.p.set_role('replica')
        mock_write_sync = self.ha.dcs.write_sync_state = Mock(return_value=None)
        # Postgres 9.5, write_sync_state to DCS failed
        self.assertEqual(self.ha.run_cycle(),
                         'Postponing promotion because synchronous replication state was updated by somebody else')
        self.assertEqual(self.ha.dcs.write_sync_state.call_count, 1)
        self.assertEqual(mock_write_sync.call_args_list[0][0], (self.p.name, None, 0))
        self.assertEqual(mock_write_sync.call_args_list[0][1], {'version': 0})

        mock_set_sync = self.p.config.set_synchronous_standby_names = Mock()
        mock_write_sync = self.ha.dcs.write_sync_state = Mock(return_value=True)
        # Postgres 9.5, our name is written to the leader field of the /sync key,
        # while the voters list and synchronous_standby_names remain empty
        self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')
        self.assertEqual(self.ha.dcs.write_sync_state.call_count, 1)
        self.assertEqual(mock_write_sync.call_args_list[0][0], (self.p.name, None, 0))
        self.assertEqual(mock_write_sync.call_args_list[0][1], {'version': 0})
        self.assertEqual(mock_set_sync.call_count, 1)
        self.assertEqual(mock_set_sync.call_args_list[0][0], (None,))

        self.p._major_version = 90600
        mock_set_sync.reset_mock()
        mock_write_sync.reset_mock()
        self.p.set_role('replica')
        # Postgres 9.6, with quorum commit we avoid updating the /sync key and instead
        # put the nodes into synchronous_standby_names
        self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')
        self.assertEqual(mock_write_sync.call_count, 0)
        self.assertEqual(mock_set_sync.call_count, 1)
        self.assertEqual(mock_set_sync.call_args_list[0][0], ('2 (foo,other)',))

        self.p._major_version = 150000
        mock_set_sync.reset_mock()
        self.p.set_role('replica')
        self.p.name = 'nonsync'
        self.ha.fetch_node_status = get_node_status()
        # Postgres 15, with quorum commit. A non-sync node is promoted: we avoid updating
        # the /sync key and put the former sync nodes into synchronous_standby_names
        self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')
        self.assertEqual(mock_write_sync.call_count, 0)
        self.assertEqual(mock_set_sync.call_count, 1)
        self.assertEqual(mock_set_sync.call_args_list[0][0], ('ANY 3 (foo,other,postgresql0)',))

    @patch.object(global_config.__class__, 'is_synchronous_mode', PropertyMock(return_value=True))
    @patch.object(global_config.__class__, 'is_quorum_commit_mode', PropertyMock(return_value=True))
    def test__process_quorum_replication(self):
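        # roughly: in quorum commit mode the leader keeps the /sync key (leader plus
        # voters) and the ANY n (...) value in synchronous_standby_names consistent
        # with each other, and each transition must never weaken the real guarantee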
        self.p._major_version = 150000
        self.ha.has_lock = true
        mock_set_sync = self.p.config.set_synchronous_standby_names = Mock()
        self.p.name = 'leader'

        mock_write_sync = self.ha.dcs.write_sync_state = Mock(return_value=None)
        # Test that we try to set the /sync key when it is missing or invalid, and fail
        self.p.sync_handler.current_state = Mock(return_value=_SyncState('quorum', 1, 1, CaseInsensitiveSet(['other']),
                                                                         CaseInsensitiveSet(['other'])))
        self.ha.run_cycle()
        self.assertEqual(mock_write_sync.call_count, 1)
        self.assertEqual(mock_write_sync.call_args_list[0][0], (self.p.name, None, 0))
        self.assertEqual(mock_write_sync.call_args_list[0][1], {'version': None})
        self.assertEqual(mock_set_sync.call_count, 0)

        self.ha._promote_timestamp = 1
        mock_write_sync = self.ha.dcs.write_sync_state = Mock(side_effect=[SyncState(None, self.p.name, None, 0), None])
        # Test that we try to set the /sync key when it is missing or invalid, and succeed
        with patch.object(SyncState, 'is_empty', Mock(side_effect=[True, False])):
            self.ha.run_cycle()
        self.assertEqual(mock_write_sync.call_count, 2)
        self.assertEqual(mock_write_sync.call_args_list[0][0], (self.p.name, None, 0))
        self.assertEqual(mock_write_sync.call_args_list[0][1], {'version': None})
        self.assertEqual(mock_write_sync.call_args_list[1][0], (self.p.name, CaseInsensitiveSet(['other']), 0))
        self.assertEqual(mock_write_sync.call_args_list[1][1], {'version': None})
        self.assertEqual(mock_set_sync.call_count, 0)

        self.p.sync_handler.current_state = Mock(side_effect=[_SyncState('quorum', 1, 0, CaseInsensitiveSet(['foo']),
                                                                         CaseInsensitiveSet(['other'])),
                                                              _SyncState('quorum', 1, 1, CaseInsensitiveSet(['foo']),
                                                                         CaseInsensitiveSet(['foo']))])
        mock_write_sync = self.ha.dcs.write_sync_state = Mock(return_value=SyncState(1, 'leader', 'foo', 0))
        self.ha.cluster = get_cluster_initialized_with_leader(sync=('leader', 'foo'))
        # Test that the sync node is removed from voters and added to synchronous_standby_names
        with patch.object(Postgresql, 'synchronous_standby_names', Mock(return_value='other')), \
                patch('time.sleep', Mock()):
            self.ha.run_cycle()
        self.assertEqual(mock_write_sync.call_count, 1)
        self.assertEqual(mock_write_sync.call_args_list[0][0], (self.p.name, CaseInsensitiveSet(), 0))
        self.assertEqual(mock_write_sync.call_args_list[0][1], {'version': 0})
        self.assertEqual(mock_set_sync.call_count, 1)
        self.assertEqual(mock_set_sync.call_args_list[0][0], ('ANY 1 (other)',))

        # Test ANY 1 (*) when synchronous_mode_strict is enabled and no nodes are available
        self.p.sync_handler.current_state = Mock(return_value=_SyncState('quorum', 1, 0,
                                                                         CaseInsensitiveSet(['other', 'foo']),
                                                                         CaseInsensitiveSet()))
        mock_write_sync.reset_mock()
        mock_set_sync.reset_mock()
        with patch.object(global_config.__class__, 'is_synchronous_mode_strict', PropertyMock(return_value=True)):
            self.ha.run_cycle()
        self.assertEqual(mock_write_sync.call_count, 1)
        self.assertEqual(mock_write_sync.call_args_list[0][0], (self.p.name, CaseInsensitiveSet(), 0))
        self.assertEqual(mock_write_sync.call_args_list[0][1], {'version': 0})
        self.assertEqual(mock_set_sync.call_count, 1)
        self.assertEqual(mock_set_sync.call_args_list[0][0], ('ANY 1 (*)',))

        # Test that _process_quorum_replication doesn't take longer than loop_wait
        with patch('time.time', Mock(side_effect=[30, 60, 90, 120])):
            self.ha.process_sync_replication()