Files
patroni/tests/test_rewind.py
Alexander Kukushkin 38aef484e8 Fix a few little issues with 9.5 support (#3260)
1. pg_rewind error log format wasn't verbose
2. it doesn't support specifying num in synchronous_standby_names
2025-01-31 16:46:07 +01:00

345 lines
17 KiB
Python

from unittest.mock import Mock, mock_open, patch, PropertyMock
from patroni.postgresql import Postgresql
from patroni.postgresql.cancellable import CancellableSubprocess
from patroni.postgresql.rewind import Rewind
from . import BaseTestPostgresql, MockCursor, psycopg_connect
class MockThread(object):
def __init__(self, target, args):
self._target = target
self._args = args
def start(self):
self._target(*self._args)
def mock_cancellable_call(*args, **kwargs):
communicate = kwargs.pop('communicate', None)
if isinstance(communicate, dict):
communicate.update(stdout=b'', stderr=b'pg_rewind: error: could not open file '
+ b'"data/postgresql0/pg_xlog/000000010000000000000003": No such file')
return 1
def mock_cancellable_call0(*args, **kwargs):
communicate = kwargs.pop('communicate', None)
if isinstance(communicate, dict):
communicate.update(stdout=b'', stderr=b'')
return 0
def mock_cancellable_call1(*args, **kwargs):
communicate = kwargs.pop('communicate', None)
if isinstance(communicate, dict):
communicate.update(stdout=b'', stderr=b'')
return 1
def mock_single_user_mode(self, communicate, options):
communicate['stdout'] = b'foo'
communicate['stderr'] = b'bar'
return 1
@patch('subprocess.call', Mock(return_value=0))
@patch('patroni.psycopg.connect', psycopg_connect)
class TestRewind(BaseTestPostgresql):
def setUp(self):
super(TestRewind, self).setUp()
self.r = Rewind(self.p)
def test_can_rewind(self):
with patch.object(Postgresql, 'controldata', Mock(return_value={'wal_log_hints setting': 'on'})):
self.assertTrue(self.r.can_rewind)
with patch('subprocess.call', Mock(return_value=1)):
self.assertFalse(self.r.can_rewind)
with patch('subprocess.call', side_effect=OSError):
self.assertFalse(self.r.can_rewind)
self.p.config._config['use_pg_rewind'] = False
self.assertFalse(self.r.can_rewind)
def test_pg_rewind(self):
r = {'user': '', 'host': '', 'port': '', 'database': '', 'password': ''}
with patch.object(Postgresql, 'major_version', PropertyMock(return_value=150000)), \
patch.object(CancellableSubprocess, 'call', Mock(return_value=None)):
with patch('subprocess.check_output', Mock(return_value=b'boo')):
self.assertFalse(self.r.pg_rewind(r))
with patch('subprocess.check_output', Mock(side_effect=Exception)):
self.assertFalse(self.r.pg_rewind(r))
with patch.object(Postgresql, 'major_version', PropertyMock(return_value=120000)), \
patch('subprocess.check_output', Mock(return_value=b'foo %f %p %r %% % %')):
with patch.object(CancellableSubprocess, 'call', mock_cancellable_call):
self.assertFalse(self.r.pg_rewind(r))
with patch.object(CancellableSubprocess, 'call', mock_cancellable_call0):
self.assertTrue(self.r.pg_rewind(r))
with patch.object(CancellableSubprocess, 'call', mock_cancellable_call1):
self.assertFalse(self.r.pg_rewind(r))
@patch.object(Rewind, 'can_rewind', PropertyMock(return_value=True))
def test__get_local_timeline_lsn(self):
self.r.trigger_check_diverged_lsn()
with patch.object(Postgresql, 'controldata',
Mock(return_value={'Database cluster state': 'shut down in recovery',
'Minimum recovery ending location': '0/0',
"Min recovery ending loc's timeline": '0',
'Latest checkpoint location': '0/'})):
self.r.rewind_or_reinitialize_needed_and_possible(self.leader)
with patch.object(Postgresql, 'is_running', Mock(return_value=True)), \
patch.object(MockCursor, 'fetchone', Mock(side_effect=Exception)), \
patch.object(MockCursor, 'fetchall',
Mock(return_value=[(0, 0, 1, 1, 0, 0, 0, 0, 0, None, None, None)])):
self.r.rewind_or_reinitialize_needed_and_possible(self.leader)
@patch.object(CancellableSubprocess, 'call', mock_cancellable_call)
@patch.object(Postgresql, 'get_guc_value', Mock(return_value=''))
@patch.object(Postgresql, 'checkpoint', side_effect=['', '1'])
@patch.object(Postgresql, 'stop', Mock(return_value=False))
@patch.object(Postgresql, 'start', Mock())
def test_execute(self, mock_checkpoint):
with patch('patroni.postgresql.rewind.logger.info') as mock_logger:
self.r.execute(self.leader)
self.assertEqual(mock_logger.call_args_list[0][0],
('running pg_rewind from %s', 'dbname=postgres user=foo host=127.0.0.1 port=5435'))
with patch.object(Postgresql, 'major_version', PropertyMock(return_value=130000)):
self.r.execute(self.leader)
with patch.object(MockCursor, 'fetchone', Mock(side_effect=Exception)):
self.r.execute(self.leader)
with patch.object(Rewind, 'pg_rewind', Mock(return_value=False)):
mock_checkpoint.side_effect = ['1', '', '', '']
self.r.execute(self.leader)
self.r.execute(self.leader)
with patch.object(Rewind, 'check_leader_is_not_in_recovery', Mock(return_value=False)):
self.r.execute(self.leader)
self.p.config._config['remove_data_directory_on_rewind_failure'] = False
self.r.trigger_check_diverged_lsn()
self.r.execute(self.leader)
self.leader.member.data.update(version='1.5.7', checkpoint_after_promote=False, role='primary')
self.assertIsNone(self.r.execute(self.leader))
del self.leader.member.data['checkpoint_after_promote']
with patch.object(Rewind, 'check_leader_is_not_in_recovery', Mock(return_value=False)):
self.assertIsNone(self.r.execute(self.leader))
with patch.object(Postgresql, 'is_running', Mock(return_value=True)):
self.r.execute(self.leader)
@patch('patroni.postgresql.rewind.logger.info')
def test__log_primary_history(self, mock_logger):
history = [[n, n, ''] for n in range(1, 10)]
self.r._log_primary_history(history, 1)
expected = '\n'.join(['{0}\t0/{0}\t'.format(n) for n in range(1, 4)] + ['...', '9\t0/9\t'])
self.assertEqual(mock_logger.call_args[0][1], expected)
@patch.object(Postgresql, 'start', Mock())
@patch.object(Rewind, 'can_rewind', PropertyMock(return_value=True))
@patch.object(Rewind, '_get_local_timeline_lsn')
@patch.object(Rewind, 'check_leader_is_not_in_recovery')
def test__check_timeline_and_lsn(self, mock_check_leader_is_not_in_recovery, mock_get_local_timeline_lsn):
mock_get_local_timeline_lsn.return_value = (True, 2, 67197377)
mock_check_leader_is_not_in_recovery.return_value = False
self.r.trigger_check_diverged_lsn()
self.assertFalse(self.r.rewind_or_reinitialize_needed_and_possible(self.leader))
self.leader = self.leader.member
self.assertFalse(self.r.rewind_or_reinitialize_needed_and_possible(self.leader))
mock_check_leader_is_not_in_recovery.return_value = True
self.assertFalse(self.r.rewind_or_reinitialize_needed_and_possible(self.leader))
self.r.trigger_check_diverged_lsn()
with patch.object(MockCursor, 'fetchone', Mock(side_effect=[('', 3, '0/0'), ('', b'4\t0/40159C0\tn\n')])):
self.assertTrue(self.r.rewind_or_reinitialize_needed_and_possible(self.leader))
self.r.reset_state()
self.r.trigger_check_diverged_lsn()
with patch('patroni.psycopg.connect', Mock(side_effect=Exception)):
self.assertFalse(self.r.rewind_or_reinitialize_needed_and_possible(self.leader))
self.r.trigger_check_diverged_lsn()
with patch.object(MockCursor, 'fetchone', Mock(side_effect=[('', 3, '0/0'), ('', b'1\t0/40159C0\tn\n')])):
self.assertTrue(self.r.rewind_or_reinitialize_needed_and_possible(self.leader))
self.r.reset_state()
self.r.trigger_check_diverged_lsn()
with patch.object(MockCursor, 'fetchone', Mock(return_value=('', 1, '0/0'))):
with patch.object(Rewind, '_get_local_timeline_lsn', Mock(return_value=(True, 1, '0/0'))):
self.assertFalse(self.r.rewind_or_reinitialize_needed_and_possible(self.leader))
self.r.trigger_check_diverged_lsn()
self.assertTrue(self.r.rewind_or_reinitialize_needed_and_possible(self.leader))
self.r.reset_state()
self.r.trigger_check_diverged_lsn()
mock_get_local_timeline_lsn.return_value = (False, 2, 67296664)
self.assertTrue(self.r.rewind_or_reinitialize_needed_and_possible(self.leader))
with patch('subprocess.Popen') as mock_popen:
mock_popen.return_value.communicate.return_value = (
b'0, lsn: 0/040159C1, prev 0/\n',
b'pg_waldump: fatal: error in WAL record at 0/40159C1: invalid record length at /: wanted 24, got 0\n'
)
self.r.reset_state()
self.r.trigger_check_diverged_lsn()
mock_get_local_timeline_lsn.return_value = (False, 2, 67197377)
self.assertTrue(self.r.rewind_or_reinitialize_needed_and_possible(self.leader))
mock_popen.return_value.communicate.return_value = (
b'0, lsn: 0/040159C1, prev 0/\n',
b'pg_waldump: fatal: error in WAL record at 0/40159C1: invalid record '
b'length at 0/402DD98: expected at least 24, got 0\n'
)
self.r.reset_state()
self.r.trigger_check_diverged_lsn()
self.assertFalse(self.r.rewind_or_reinitialize_needed_and_possible(self.leader))
mock_popen.return_value.communicate.return_value = (
b'0, lsn: 0/040159C1, prev 0/\n',
b'pg_waldump: fatal: error in WAL record at 0/40159C1: invalid record length at 0/402DD98\n'
)
self.r.reset_state()
self.r.trigger_check_diverged_lsn()
self.assertFalse(self.r.rewind_or_reinitialize_needed_and_possible(self.leader))
self.r.reset_state()
self.r.trigger_check_diverged_lsn()
mock_popen.side_effect = Exception
self.assertTrue(self.r.rewind_or_reinitialize_needed_and_possible(self.leader))
@patch.object(MockCursor, 'fetchone', Mock(side_effect=[(True,), Exception]))
def test_check_leader_is_not_in_recovery(self):
self.r.check_leader_is_not_in_recovery({})
self.r.check_leader_is_not_in_recovery({})
def test_read_postmaster_opts(self):
m = mock_open(read_data='/usr/lib/postgres/9.6/bin/postgres "-D" "data/postgresql0" \
"--listen_addresses=127.0.0.1" "--port=5432" "--hot_standby=on" "--wal_level=hot_standby" \
"--wal_log_hints=on" "--max_wal_senders=5" "--max_replication_slots=5"\n')
with patch('builtins.open', m):
data = self.r.read_postmaster_opts()
self.assertEqual(data['wal_level'], 'hot_standby')
self.assertEqual(int(data['max_replication_slots']), 5)
self.assertEqual(data.get('D'), None)
m.side_effect = IOError
data = self.r.read_postmaster_opts()
self.assertEqual(data, dict())
@patch('psutil.Popen')
def test_single_user_mode(self, subprocess_popen_mock):
subprocess_popen_mock.return_value.wait.return_value = 0
subprocess_popen_mock.return_value.communicate.return_value = ('', '')
self.assertEqual(self.r.single_user_mode({'input': 'CHECKPOINT'}, {'archive_mode': 'on'}), 0)
@patch('os.listdir', Mock(side_effect=[OSError, ['a', 'b']]))
@patch('os.unlink', Mock(side_effect=OSError))
@patch('os.remove', Mock())
@patch('os.path.islink', Mock(side_effect=[True, False]))
@patch('os.path.isfile', Mock(return_value=True))
def test_cleanup_archive_status(self):
self.r.cleanup_archive_status()
self.r.cleanup_archive_status()
@patch('os.path.isfile', Mock(return_value=True))
@patch('shutil.move', Mock(side_effect=OSError))
@patch('patroni.postgresql.rewind.logger.info')
def test_archive_ready_wals(self, mock_logger_info):
with patch('os.listdir', Mock(side_effect=OSError)), \
patch.object(Postgresql, 'get_guc_value', Mock(side_effect=['on', 'command %f'])):
self.r._archive_ready_wals()
mock_logger_info.assert_not_called()
# each assert_not_called() calls get_guc_value('archive_mode') + get_guc_value('archive_command')
get_guc_value_res = [
'', 'command %f',
'on', '',
]
with patch.object(Postgresql, 'get_guc_value', Mock(side_effect=get_guc_value_res)):
for _ in range(len(get_guc_value_res) // 2):
self.r._archive_ready_wals()
mock_logger_info.assert_not_called()
with patch('os.listdir', Mock(return_value=['000000000000000000000000.ready'])):
# successful archive_command call
with patch.object(CancellableSubprocess, 'call', Mock(return_value=0)) as mock_subprocess_call:
get_guc_value_res = [
'on', 'command %f',
'always', 'command %f',
]
with patch.object(Postgresql, 'get_guc_value', Mock(side_effect=get_guc_value_res)):
for _ in range(len(get_guc_value_res) // 2):
self.r._archive_ready_wals()
mock_logger_info.assert_called_once()
self.assertEqual(('Trying to archive %s: %s',
'000000000000000000000000', 'command 000000000000000000000000'),
mock_logger_info.call_args[0])
mock_logger_info.reset_mock()
mock_subprocess_call.assert_called_once()
self.assertEqual(mock_subprocess_call.call_args[0][0], ['command 000000000000000000000000'])
self.assertEqual(mock_subprocess_call.call_args[1]['shell'], True)
mock_subprocess_call.reset_mock()
# failed archive_command call
with patch.object(CancellableSubprocess, 'call', Mock(return_value=1)):
with patch.object(Postgresql, 'get_guc_value', Mock(side_effect=['on', 'command %f'])):
self.r._archive_ready_wals()
self.assertEqual(('Trying to archive %s: %s',
'000000000000000000000000', 'command 000000000000000000000000'),
mock_logger_info.call_args_list[0][0])
self.assertEqual(('Failed to archive WAL segment %s', '000000000000000000000000'),
mock_logger_info.call_args_list[1][0])
mock_logger_info.reset_mock()
wal_files_to_skip = [
'000000000000000000000000.done',
'000000000000000000000001.partial.done',
'002.ready',
'U00000000000000000000001.ready',
]
with patch('os.listdir', Mock(return_value=wal_files_to_skip)):
self.r._archive_ready_wals()
mock_logger_info.assert_not_called()
@patch.object(Postgresql, 'major_version', PropertyMock(return_value=100000))
@patch('os.listdir', Mock(side_effect=[OSError, ['something', 'something_else']]))
@patch('shutil.rmtree', Mock())
@patch('patroni.postgresql.rewind.fsync_dir', Mock())
@patch('patroni.postgresql.rewind.logger.warning')
def test_maybe_clean_pg_replslot(self, mock_logger):
# failed to list pg_replslot/
self.assertIsNone(self.r._maybe_clean_pg_replslot())
mock_logger.assert_called_once()
mock_logger.reset_mock()
self.assertIsNone(self.r._maybe_clean_pg_replslot())
@patch('os.unlink', Mock())
@patch('os.listdir', Mock(return_value=[]))
@patch('os.path.isfile', Mock(return_value=True))
@patch.object(Rewind, 'read_postmaster_opts', Mock(return_value={}))
@patch.object(Rewind, 'single_user_mode', mock_single_user_mode)
def test_ensure_clean_shutdown(self):
self.assertIsNone(self.r.ensure_clean_shutdown())
@patch('patroni.postgresql.rewind.Thread', MockThread)
@patch.object(Postgresql, 'controldata')
@patch.object(Postgresql, 'checkpoint')
@patch.object(Postgresql, 'get_primary_timeline')
def test_ensure_checkpoint_after_promote(self, mock_get_primary_timeline, mock_checkpoint, mock_controldata):
mock_controldata.return_value = {"Latest checkpoint's TimeLineID": 1}
mock_get_primary_timeline.return_value = 1
self.r.ensure_checkpoint_after_promote(Mock())
self.r.reset_state()
mock_get_primary_timeline.return_value = 2
mock_checkpoint.return_value = 0
self.r.ensure_checkpoint_after_promote(Mock())
self.r.ensure_checkpoint_after_promote(Mock())
self.r.reset_state()
mock_controldata.side_effect = TypeError
mock_checkpoint.side_effect = Exception
self.r.ensure_checkpoint_after_promote(Mock())