mirror of
https://github.com/outbackdingo/patroni.git
synced 2026-01-28 10:20:05 +00:00
When the `synchronous_standby_names` GUC is changed, PostgreSQL almost immediately starts reporting the corresponding walsenders as synchronous, while in fact they may not have reached that state yet. To mitigate this problem, we memorize the current flush LSN on the primary right after the change of `synchronous_standby_names` becomes visible, and use it as an additional check for walsenders. A walsender is counted as truly "sync" only when its write/flush/replay_lsn has reached the memorized LSN and its `application_name` is known to be part of `synchronous_standby_names`. Most of the size of this PR comes from refactoring: the code responsible for working with `synchronous_standby_names` and `pg_stat_replication` was moved to a dedicated file. The `parse_sync_standby_names()` function was mostly copied from #672.
90 lines
4.3 KiB
Python
90 lines
4.3 KiB
Python
import os
|
|
|
|
from mock import Mock, patch
|
|
|
|
from patroni.dcs import Cluster, SyncState
|
|
from patroni.postgresql import Postgresql
|
|
|
|
from . import BaseTestPostgresql, psycopg_connect
|
|
|
|
|
|
@patch('subprocess.call', Mock(return_value=0))
@patch('patroni.psycopg.connect', psycopg_connect)
class TestSync(BaseTestPostgresql):
    """Tests for the ``sync_handler`` object attached to a ``Postgresql`` instance.

    The class-level patches stub ``subprocess.call`` and route ``patroni.psycopg.connect``
    to the ``psycopg_connect`` test helper for every ``test*`` method.
    """

    def setUp(self):
        """Build the base fixture with external effects stubbed, write postgresql.conf, keep the sync handler."""
        with patch('subprocess.call', Mock(return_value=0)), \
                patch('os.rename', Mock()), \
                patch('patroni.postgresql.CallbackExecutor', Mock()), \
                patch.object(Postgresql, 'get_major_version', Mock(return_value=140000)), \
                patch.object(Postgresql, 'is_running', Mock(return_value=True)):
            super(TestSync, self).setUp()
            self.p.config.write_postgresql_conf()
            self.s = self.p.sync_handler

    @patch.object(Postgresql, 'last_operation', Mock(return_value=1))
    def test_pick_sync_standby(self):
        """current_state() prefers sync/potential walsenders, else the async one with the least lag.

        Every assertion expects a pair of lists. Presumably the pair is
        (chosen sync candidates, members confirmed synchronous); confirm against sync.py.
        """
        def fake_state(*values):
            # Each call to _cluster_info_state_get returns the next value. Judging by the data,
            # the order is presumably: synchronous_standby_names, synchronous_commit,
            # pg_stat_replication rows. TODO confirm against SyncHandler.current_state.
            return patch.object(Postgresql, "_cluster_info_state_get", side_effect=values)

        leader_name = self.leadermem.name
        cluster = Cluster(True, None, self.leader, 0, [self.me, self.other, self.leadermem], None,
                          SyncState(0, self.me.name, leader_name), None, None, None)

        rows = [
            {'pid': 100, 'application_name': leader_name, 'sync_state': 'sync', 'flush_lsn': 1},
            {'pid': 101, 'application_name': self.me.name, 'sync_state': 'async', 'flush_lsn': 2},
            {'pid': 102, 'application_name': self.other.name, 'sync_state': 'async', 'flush_lsn': 2},
        ]

        # The sync walsender is slightly behind the async ones, yet it is still preferred.
        with fake_state(leader_name, 'on', rows):
            self.assertEqual(self.s.current_state(cluster), ([leader_name], [leader_name]))

        # A 'potential' walsender beats async ones even if slightly behind. The LSN column is
        # renamed to write_lsn, presumably to match synchronous_commit=remote_write.
        rows[0]['sync_state'] = 'potential'
        for row in rows:
            row['write_lsn'] = row.pop('flush_lsn')
        with fake_state('', 'remote_write', rows):
            self.assertEqual(self.s.current_state(cluster), ([leader_name], []))

        # With no sync/potential candidate left, the async walsender with the highest replay_lsn
        # wins. Names are upper-cased, presumably to exercise case-insensitive matching.
        for idx, row in enumerate(rows):
            row['replay_lsn'] = 3 - idx
            row['application_name'] = row['application_name'].upper()
        unknown = rows.pop(0)
        with fake_state('', 'remote_apply', rows):
            self.assertEqual(self.s.current_state(cluster), ([self.me.name], []))

        # A 'sync' walsender whose name matches no cluster member is ignored.
        unknown['application_name'] = 'missing'
        unknown['sync_state'] = 'sync'
        rows.insert(0, unknown)
        with fake_state('', 'remote_apply', rows):
            self.assertEqual(self.s.current_state(cluster), ([self.me.name], []))

        # Unparsable synchronous_standby_names combined with no pg_stat_replication rows.
        with fake_state('a b', 'remote_apply', None):
            self.p._major_version = 90400
            self.assertEqual(self.s.current_state(cluster), ([], []))

    def test_set_sync_standby(self):
        """set_synchronous_standby_names() rewrites postgresql.conf and reloads only when the value changes."""
        def conf_line():
            # First stripped synchronous_standby_names line of postgresql.conf, or None if absent.
            conf_path = os.path.join(self.p.data_dir, 'postgresql.conf')
            with open(conf_path) as conf:
                return next((line.strip() for line in conf if line.startswith('synchronous_standby_names')), None)

        reload_mock = Mock()
        self.p.reload = reload_mock

        self.s.set_synchronous_standby_names(['n1'])
        self.assertEqual(conf_line(), "synchronous_standby_names = 'n1'")
        reload_mock.assert_called()

        # Re-applying the same value must not trigger a reload and leaves the file as is.
        reload_mock.reset_mock()
        self.s.set_synchronous_standby_names(['n1'])
        reload_mock.assert_not_called()
        self.assertEqual(conf_line(), "synchronous_standby_names = 'n1'")

        # Several standbys are written in the '<count> (name,...)' form.
        self.s.set_synchronous_standby_names(['n1', 'n2'])
        reload_mock.assert_called()
        self.assertEqual(conf_line(), "synchronous_standby_names = '2 (n1,n2)'")

        # An empty list leaves no synchronous_standby_names line in postgresql.conf.
        reload_mock.reset_mock()
        self.s.set_synchronous_standby_names([])
        reload_mock.assert_called()
        self.assertIsNone(conf_line())
|