mirror of
https://github.com/optim-enterprises-bv/patroni.git
synced 2026-01-11 10:25:14 +00:00
1. pg_rewind error log format wasn't verbose 2. it doesn't support specifying num in synchronous_standby_names
345 lines
17 KiB
Python
345 lines
17 KiB
Python
from unittest.mock import Mock, mock_open, patch, PropertyMock
|
|
|
|
from patroni.postgresql import Postgresql
|
|
from patroni.postgresql.cancellable import CancellableSubprocess
|
|
from patroni.postgresql.rewind import Rewind
|
|
|
|
from . import BaseTestPostgresql, MockCursor, psycopg_connect
|
|
|
|
|
|
class MockThread(object):
|
|
|
|
def __init__(self, target, args):
|
|
self._target = target
|
|
self._args = args
|
|
|
|
def start(self):
|
|
self._target(*self._args)
|
|
|
|
|
|
def mock_cancellable_call(*args, **kwargs):
|
|
communicate = kwargs.pop('communicate', None)
|
|
if isinstance(communicate, dict):
|
|
communicate.update(stdout=b'', stderr=b'pg_rewind: error: could not open file '
|
|
+ b'"data/postgresql0/pg_xlog/000000010000000000000003": No such file')
|
|
return 1
|
|
|
|
|
|
def mock_cancellable_call0(*args, **kwargs):
|
|
communicate = kwargs.pop('communicate', None)
|
|
if isinstance(communicate, dict):
|
|
communicate.update(stdout=b'', stderr=b'')
|
|
return 0
|
|
|
|
|
|
def mock_cancellable_call1(*args, **kwargs):
|
|
communicate = kwargs.pop('communicate', None)
|
|
if isinstance(communicate, dict):
|
|
communicate.update(stdout=b'', stderr=b'')
|
|
return 1
|
|
|
|
|
|
def mock_single_user_mode(self, communicate, options):
|
|
communicate['stdout'] = b'foo'
|
|
communicate['stderr'] = b'bar'
|
|
return 1
|
|
|
|
|
|
@patch('subprocess.call', Mock(return_value=0))
|
|
@patch('patroni.psycopg.connect', psycopg_connect)
|
|
class TestRewind(BaseTestPostgresql):
|
|
|
|
def setUp(self):
|
|
super(TestRewind, self).setUp()
|
|
self.r = Rewind(self.p)
|
|
|
|
def test_can_rewind(self):
|
|
with patch.object(Postgresql, 'controldata', Mock(return_value={'wal_log_hints setting': 'on'})):
|
|
self.assertTrue(self.r.can_rewind)
|
|
with patch('subprocess.call', Mock(return_value=1)):
|
|
self.assertFalse(self.r.can_rewind)
|
|
with patch('subprocess.call', side_effect=OSError):
|
|
self.assertFalse(self.r.can_rewind)
|
|
self.p.config._config['use_pg_rewind'] = False
|
|
self.assertFalse(self.r.can_rewind)
|
|
|
|
def test_pg_rewind(self):
|
|
r = {'user': '', 'host': '', 'port': '', 'database': '', 'password': ''}
|
|
with patch.object(Postgresql, 'major_version', PropertyMock(return_value=150000)), \
|
|
patch.object(CancellableSubprocess, 'call', Mock(return_value=None)):
|
|
with patch('subprocess.check_output', Mock(return_value=b'boo')):
|
|
self.assertFalse(self.r.pg_rewind(r))
|
|
with patch('subprocess.check_output', Mock(side_effect=Exception)):
|
|
self.assertFalse(self.r.pg_rewind(r))
|
|
|
|
with patch.object(Postgresql, 'major_version', PropertyMock(return_value=120000)), \
|
|
patch('subprocess.check_output', Mock(return_value=b'foo %f %p %r %% % %')):
|
|
with patch.object(CancellableSubprocess, 'call', mock_cancellable_call):
|
|
self.assertFalse(self.r.pg_rewind(r))
|
|
with patch.object(CancellableSubprocess, 'call', mock_cancellable_call0):
|
|
self.assertTrue(self.r.pg_rewind(r))
|
|
with patch.object(CancellableSubprocess, 'call', mock_cancellable_call1):
|
|
self.assertFalse(self.r.pg_rewind(r))
|
|
|
|
@patch.object(Rewind, 'can_rewind', PropertyMock(return_value=True))
|
|
def test__get_local_timeline_lsn(self):
|
|
self.r.trigger_check_diverged_lsn()
|
|
with patch.object(Postgresql, 'controldata',
|
|
Mock(return_value={'Database cluster state': 'shut down in recovery',
|
|
'Minimum recovery ending location': '0/0',
|
|
"Min recovery ending loc's timeline": '0',
|
|
'Latest checkpoint location': '0/'})):
|
|
self.r.rewind_or_reinitialize_needed_and_possible(self.leader)
|
|
|
|
with patch.object(Postgresql, 'is_running', Mock(return_value=True)), \
|
|
patch.object(MockCursor, 'fetchone', Mock(side_effect=Exception)), \
|
|
patch.object(MockCursor, 'fetchall',
|
|
Mock(return_value=[(0, 0, 1, 1, 0, 0, 0, 0, 0, None, None, None)])):
|
|
self.r.rewind_or_reinitialize_needed_and_possible(self.leader)
|
|
|
|
@patch.object(CancellableSubprocess, 'call', mock_cancellable_call)
|
|
@patch.object(Postgresql, 'get_guc_value', Mock(return_value=''))
|
|
@patch.object(Postgresql, 'checkpoint', side_effect=['', '1'])
|
|
@patch.object(Postgresql, 'stop', Mock(return_value=False))
|
|
@patch.object(Postgresql, 'start', Mock())
|
|
def test_execute(self, mock_checkpoint):
|
|
with patch('patroni.postgresql.rewind.logger.info') as mock_logger:
|
|
self.r.execute(self.leader)
|
|
self.assertEqual(mock_logger.call_args_list[0][0],
|
|
('running pg_rewind from %s', 'dbname=postgres user=foo host=127.0.0.1 port=5435'))
|
|
with patch.object(Postgresql, 'major_version', PropertyMock(return_value=130000)):
|
|
self.r.execute(self.leader)
|
|
with patch.object(MockCursor, 'fetchone', Mock(side_effect=Exception)):
|
|
self.r.execute(self.leader)
|
|
|
|
with patch.object(Rewind, 'pg_rewind', Mock(return_value=False)):
|
|
mock_checkpoint.side_effect = ['1', '', '', '']
|
|
self.r.execute(self.leader)
|
|
self.r.execute(self.leader)
|
|
with patch.object(Rewind, 'check_leader_is_not_in_recovery', Mock(return_value=False)):
|
|
self.r.execute(self.leader)
|
|
self.p.config._config['remove_data_directory_on_rewind_failure'] = False
|
|
self.r.trigger_check_diverged_lsn()
|
|
self.r.execute(self.leader)
|
|
|
|
self.leader.member.data.update(version='1.5.7', checkpoint_after_promote=False, role='primary')
|
|
self.assertIsNone(self.r.execute(self.leader))
|
|
|
|
del self.leader.member.data['checkpoint_after_promote']
|
|
with patch.object(Rewind, 'check_leader_is_not_in_recovery', Mock(return_value=False)):
|
|
self.assertIsNone(self.r.execute(self.leader))
|
|
|
|
with patch.object(Postgresql, 'is_running', Mock(return_value=True)):
|
|
self.r.execute(self.leader)
|
|
|
|
@patch('patroni.postgresql.rewind.logger.info')
|
|
def test__log_primary_history(self, mock_logger):
|
|
history = [[n, n, ''] for n in range(1, 10)]
|
|
self.r._log_primary_history(history, 1)
|
|
expected = '\n'.join(['{0}\t0/{0}\t'.format(n) for n in range(1, 4)] + ['...', '9\t0/9\t'])
|
|
self.assertEqual(mock_logger.call_args[0][1], expected)
|
|
|
|
@patch.object(Postgresql, 'start', Mock())
|
|
@patch.object(Rewind, 'can_rewind', PropertyMock(return_value=True))
|
|
@patch.object(Rewind, '_get_local_timeline_lsn')
|
|
@patch.object(Rewind, 'check_leader_is_not_in_recovery')
|
|
def test__check_timeline_and_lsn(self, mock_check_leader_is_not_in_recovery, mock_get_local_timeline_lsn):
|
|
mock_get_local_timeline_lsn.return_value = (True, 2, 67197377)
|
|
mock_check_leader_is_not_in_recovery.return_value = False
|
|
self.r.trigger_check_diverged_lsn()
|
|
self.assertFalse(self.r.rewind_or_reinitialize_needed_and_possible(self.leader))
|
|
self.leader = self.leader.member
|
|
self.assertFalse(self.r.rewind_or_reinitialize_needed_and_possible(self.leader))
|
|
mock_check_leader_is_not_in_recovery.return_value = True
|
|
self.assertFalse(self.r.rewind_or_reinitialize_needed_and_possible(self.leader))
|
|
self.r.trigger_check_diverged_lsn()
|
|
with patch.object(MockCursor, 'fetchone', Mock(side_effect=[('', 3, '0/0'), ('', b'4\t0/40159C0\tn\n')])):
|
|
self.assertTrue(self.r.rewind_or_reinitialize_needed_and_possible(self.leader))
|
|
self.r.reset_state()
|
|
self.r.trigger_check_diverged_lsn()
|
|
with patch('patroni.psycopg.connect', Mock(side_effect=Exception)):
|
|
self.assertFalse(self.r.rewind_or_reinitialize_needed_and_possible(self.leader))
|
|
self.r.trigger_check_diverged_lsn()
|
|
with patch.object(MockCursor, 'fetchone', Mock(side_effect=[('', 3, '0/0'), ('', b'1\t0/40159C0\tn\n')])):
|
|
self.assertTrue(self.r.rewind_or_reinitialize_needed_and_possible(self.leader))
|
|
self.r.reset_state()
|
|
self.r.trigger_check_diverged_lsn()
|
|
with patch.object(MockCursor, 'fetchone', Mock(return_value=('', 1, '0/0'))):
|
|
with patch.object(Rewind, '_get_local_timeline_lsn', Mock(return_value=(True, 1, '0/0'))):
|
|
self.assertFalse(self.r.rewind_or_reinitialize_needed_and_possible(self.leader))
|
|
self.r.trigger_check_diverged_lsn()
|
|
self.assertTrue(self.r.rewind_or_reinitialize_needed_and_possible(self.leader))
|
|
|
|
self.r.reset_state()
|
|
self.r.trigger_check_diverged_lsn()
|
|
mock_get_local_timeline_lsn.return_value = (False, 2, 67296664)
|
|
self.assertTrue(self.r.rewind_or_reinitialize_needed_and_possible(self.leader))
|
|
|
|
with patch('subprocess.Popen') as mock_popen:
|
|
mock_popen.return_value.communicate.return_value = (
|
|
b'0, lsn: 0/040159C1, prev 0/\n',
|
|
b'pg_waldump: fatal: error in WAL record at 0/40159C1: invalid record length at /: wanted 24, got 0\n'
|
|
)
|
|
self.r.reset_state()
|
|
self.r.trigger_check_diverged_lsn()
|
|
mock_get_local_timeline_lsn.return_value = (False, 2, 67197377)
|
|
self.assertTrue(self.r.rewind_or_reinitialize_needed_and_possible(self.leader))
|
|
|
|
mock_popen.return_value.communicate.return_value = (
|
|
b'0, lsn: 0/040159C1, prev 0/\n',
|
|
b'pg_waldump: fatal: error in WAL record at 0/40159C1: invalid record '
|
|
b'length at 0/402DD98: expected at least 24, got 0\n'
|
|
)
|
|
self.r.reset_state()
|
|
self.r.trigger_check_diverged_lsn()
|
|
self.assertFalse(self.r.rewind_or_reinitialize_needed_and_possible(self.leader))
|
|
|
|
mock_popen.return_value.communicate.return_value = (
|
|
b'0, lsn: 0/040159C1, prev 0/\n',
|
|
b'pg_waldump: fatal: error in WAL record at 0/40159C1: invalid record length at 0/402DD98\n'
|
|
)
|
|
self.r.reset_state()
|
|
self.r.trigger_check_diverged_lsn()
|
|
self.assertFalse(self.r.rewind_or_reinitialize_needed_and_possible(self.leader))
|
|
|
|
self.r.reset_state()
|
|
self.r.trigger_check_diverged_lsn()
|
|
mock_popen.side_effect = Exception
|
|
self.assertTrue(self.r.rewind_or_reinitialize_needed_and_possible(self.leader))
|
|
|
|
@patch.object(MockCursor, 'fetchone', Mock(side_effect=[(True,), Exception]))
|
|
def test_check_leader_is_not_in_recovery(self):
|
|
self.r.check_leader_is_not_in_recovery({})
|
|
self.r.check_leader_is_not_in_recovery({})
|
|
|
|
def test_read_postmaster_opts(self):
|
|
m = mock_open(read_data='/usr/lib/postgres/9.6/bin/postgres "-D" "data/postgresql0" \
|
|
"--listen_addresses=127.0.0.1" "--port=5432" "--hot_standby=on" "--wal_level=hot_standby" \
|
|
"--wal_log_hints=on" "--max_wal_senders=5" "--max_replication_slots=5"\n')
|
|
with patch('builtins.open', m):
|
|
data = self.r.read_postmaster_opts()
|
|
self.assertEqual(data['wal_level'], 'hot_standby')
|
|
self.assertEqual(int(data['max_replication_slots']), 5)
|
|
self.assertEqual(data.get('D'), None)
|
|
|
|
m.side_effect = IOError
|
|
data = self.r.read_postmaster_opts()
|
|
self.assertEqual(data, dict())
|
|
|
|
@patch('psutil.Popen')
|
|
def test_single_user_mode(self, subprocess_popen_mock):
|
|
subprocess_popen_mock.return_value.wait.return_value = 0
|
|
subprocess_popen_mock.return_value.communicate.return_value = ('', '')
|
|
self.assertEqual(self.r.single_user_mode({'input': 'CHECKPOINT'}, {'archive_mode': 'on'}), 0)
|
|
|
|
@patch('os.listdir', Mock(side_effect=[OSError, ['a', 'b']]))
|
|
@patch('os.unlink', Mock(side_effect=OSError))
|
|
@patch('os.remove', Mock())
|
|
@patch('os.path.islink', Mock(side_effect=[True, False]))
|
|
@patch('os.path.isfile', Mock(return_value=True))
|
|
def test_cleanup_archive_status(self):
|
|
self.r.cleanup_archive_status()
|
|
self.r.cleanup_archive_status()
|
|
|
|
@patch('os.path.isfile', Mock(return_value=True))
|
|
@patch('shutil.move', Mock(side_effect=OSError))
|
|
@patch('patroni.postgresql.rewind.logger.info')
|
|
def test_archive_ready_wals(self, mock_logger_info):
|
|
with patch('os.listdir', Mock(side_effect=OSError)), \
|
|
patch.object(Postgresql, 'get_guc_value', Mock(side_effect=['on', 'command %f'])):
|
|
self.r._archive_ready_wals()
|
|
mock_logger_info.assert_not_called()
|
|
|
|
# each assert_not_called() calls get_guc_value('archive_mode') + get_guc_value('archive_command')
|
|
get_guc_value_res = [
|
|
'', 'command %f',
|
|
'on', '',
|
|
]
|
|
with patch.object(Postgresql, 'get_guc_value', Mock(side_effect=get_guc_value_res)):
|
|
for _ in range(len(get_guc_value_res) // 2):
|
|
self.r._archive_ready_wals()
|
|
mock_logger_info.assert_not_called()
|
|
|
|
with patch('os.listdir', Mock(return_value=['000000000000000000000000.ready'])):
|
|
# successful archive_command call
|
|
with patch.object(CancellableSubprocess, 'call', Mock(return_value=0)) as mock_subprocess_call:
|
|
get_guc_value_res = [
|
|
'on', 'command %f',
|
|
'always', 'command %f',
|
|
]
|
|
with patch.object(Postgresql, 'get_guc_value', Mock(side_effect=get_guc_value_res)):
|
|
for _ in range(len(get_guc_value_res) // 2):
|
|
self.r._archive_ready_wals()
|
|
mock_logger_info.assert_called_once()
|
|
self.assertEqual(('Trying to archive %s: %s',
|
|
'000000000000000000000000', 'command 000000000000000000000000'),
|
|
mock_logger_info.call_args[0])
|
|
mock_logger_info.reset_mock()
|
|
mock_subprocess_call.assert_called_once()
|
|
self.assertEqual(mock_subprocess_call.call_args[0][0], ['command 000000000000000000000000'])
|
|
self.assertEqual(mock_subprocess_call.call_args[1]['shell'], True)
|
|
mock_subprocess_call.reset_mock()
|
|
|
|
# failed archive_command call
|
|
with patch.object(CancellableSubprocess, 'call', Mock(return_value=1)):
|
|
with patch.object(Postgresql, 'get_guc_value', Mock(side_effect=['on', 'command %f'])):
|
|
self.r._archive_ready_wals()
|
|
self.assertEqual(('Trying to archive %s: %s',
|
|
'000000000000000000000000', 'command 000000000000000000000000'),
|
|
mock_logger_info.call_args_list[0][0])
|
|
self.assertEqual(('Failed to archive WAL segment %s', '000000000000000000000000'),
|
|
mock_logger_info.call_args_list[1][0])
|
|
mock_logger_info.reset_mock()
|
|
|
|
wal_files_to_skip = [
|
|
'000000000000000000000000.done',
|
|
'000000000000000000000001.partial.done',
|
|
'002.ready',
|
|
'U00000000000000000000001.ready',
|
|
]
|
|
with patch('os.listdir', Mock(return_value=wal_files_to_skip)):
|
|
self.r._archive_ready_wals()
|
|
mock_logger_info.assert_not_called()
|
|
|
|
@patch.object(Postgresql, 'major_version', PropertyMock(return_value=100000))
|
|
@patch('os.listdir', Mock(side_effect=[OSError, ['something', 'something_else']]))
|
|
@patch('shutil.rmtree', Mock())
|
|
@patch('patroni.postgresql.rewind.fsync_dir', Mock())
|
|
@patch('patroni.postgresql.rewind.logger.warning')
|
|
def test_maybe_clean_pg_replslot(self, mock_logger):
|
|
# failed to list pg_replslot/
|
|
self.assertIsNone(self.r._maybe_clean_pg_replslot())
|
|
mock_logger.assert_called_once()
|
|
mock_logger.reset_mock()
|
|
|
|
self.assertIsNone(self.r._maybe_clean_pg_replslot())
|
|
|
|
@patch('os.unlink', Mock())
|
|
@patch('os.listdir', Mock(return_value=[]))
|
|
@patch('os.path.isfile', Mock(return_value=True))
|
|
@patch.object(Rewind, 'read_postmaster_opts', Mock(return_value={}))
|
|
@patch.object(Rewind, 'single_user_mode', mock_single_user_mode)
|
|
def test_ensure_clean_shutdown(self):
|
|
self.assertIsNone(self.r.ensure_clean_shutdown())
|
|
|
|
@patch('patroni.postgresql.rewind.Thread', MockThread)
|
|
@patch.object(Postgresql, 'controldata')
|
|
@patch.object(Postgresql, 'checkpoint')
|
|
@patch.object(Postgresql, 'get_primary_timeline')
|
|
def test_ensure_checkpoint_after_promote(self, mock_get_primary_timeline, mock_checkpoint, mock_controldata):
|
|
mock_controldata.return_value = {"Latest checkpoint's TimeLineID": 1}
|
|
mock_get_primary_timeline.return_value = 1
|
|
self.r.ensure_checkpoint_after_promote(Mock())
|
|
|
|
self.r.reset_state()
|
|
mock_get_primary_timeline.return_value = 2
|
|
mock_checkpoint.return_value = 0
|
|
self.r.ensure_checkpoint_after_promote(Mock())
|
|
self.r.ensure_checkpoint_after_promote(Mock())
|
|
|
|
self.r.reset_state()
|
|
|
|
mock_controldata.side_effect = TypeError
|
|
mock_checkpoint.side_effect = Exception
|
|
self.r.ensure_checkpoint_after_promote(Mock())
|