mirror of
https://github.com/outbackdingo/patroni.git
synced 2026-01-27 18:20:05 +00:00
* Use YAML files to validate Postgres GUCs through Patroni. Patroni used to have a static list of Postgres GUCs validators in `patroni.postgresql.validator`. One problem with that approach, for example, is that it would not allow GUCs from custom Postgres builds to be validated/accepted. The idea that we had to work around that issue was to move the validators from the source code to an external and extendable source. With that, Patroni will start reading the current validators from that external source plus whatever custom validators are found. From this commit onwards Patroni will read and parse all YAML files that are found under the `patroni/postgresql/available_parameters` directory to build its Postgres GUCs validation rules. All the details about how this works can be found in the docstring of the introduced function `_load_postgres_gucs_validators`.
103 lines
5.3 KiB
Python
103 lines
5.3 KiB
Python
import os
|
|
|
|
from mock import Mock, patch
|
|
|
|
from patroni.collections import CaseInsensitiveSet
|
|
from patroni.config import GlobalConfig
|
|
from patroni.dcs import Cluster, SyncState
|
|
from patroni.postgresql import Postgresql
|
|
|
|
from . import BaseTestPostgresql, psycopg_connect, mock_available_gucs
|
|
|
|
|
|
@patch('subprocess.call', Mock(return_value=0))
@patch('patroni.psycopg.connect', psycopg_connect)
@patch.object(Postgresql, 'available_gucs', mock_available_gucs)
class TestSync(BaseTestPostgresql):
    """Exercise the synchronous-replication helper exposed as ``Postgresql.sync_handler``."""

    @patch('subprocess.call', Mock(return_value=0))
    @patch('os.rename', Mock())
    @patch('patroni.postgresql.CallbackExecutor', Mock())
    @patch.object(Postgresql, 'get_major_version', Mock(return_value=140000))
    @patch.object(Postgresql, 'is_running', Mock(return_value=True))
    @patch.object(Postgresql, 'available_gucs', mock_available_gucs)
    def setUp(self):
        """Prepare a Postgresql fixture with synchronous mode enabled and grab its sync handler."""
        super().setUp()
        self.p.config.write_postgresql_conf()
        self.p._global_config = GlobalConfig({'synchronous_mode': True})
        self.s = self.p.sync_handler

    @patch.object(Postgresql, 'last_operation', Mock(return_value=1))
    def test_pick_sync_standby(self):
        """Check how ``current_state()`` ranks candidates across sync_state/LSN combinations."""
        test_cluster = Cluster(True, None, self.leader, 0, [self.me, self.other, self.leadermem], None,
                               SyncState(0, self.me.name, self.leadermem.name), None, None, None)

        replication = [
            {'pid': 100, 'application_name': self.leadermem.name, 'sync_state': 'sync', 'flush_lsn': 1},
            {'pid': 101, 'application_name': self.me.name, 'sync_state': 'async', 'flush_lsn': 2},
            {'pid': 102, 'application_name': self.other.name, 'sync_state': 'async', 'flush_lsn': 2}]

        # The sync node lags behind the async ones, yet it must still win the ranking.
        with patch.object(Postgresql, "_cluster_info_state_get", side_effect=[self.leadermem.name,
                                                                              'on', replication]):
            expected = (CaseInsensitiveSet([self.leadermem.name]), CaseInsensitiveSet([self.leadermem.name]))
            self.assertEqual(self.s.current_state(test_cluster), expected)

        # A 'potential' node is likewise preferred over a slightly-ahead async node.
        replication[0]['sync_state'] = 'potential'
        for row in replication:
            row['write_lsn'] = row.pop('flush_lsn')
        with patch.object(Postgresql, "_cluster_info_state_get", side_effect=['', 'remote_write', replication]):
            expected = (CaseInsensitiveSet([self.leadermem.name]), CaseInsensitiveSet())
            self.assertEqual(self.s.current_state(test_cluster), expected)

        # Without sync/potential candidates the async node with the smallest lag is chosen.
        for idx, row in enumerate(replication):
            row.update(replay_lsn=3 - idx, application_name=row['application_name'].upper())
        dropped = replication.pop(0)
        with patch.object(Postgresql, "_cluster_info_state_get", side_effect=['', 'remote_apply', replication]):
            self.assertEqual(self.s.current_state(test_cluster),
                             (CaseInsensitiveSet([self.me.name]), CaseInsensitiveSet()))

        # A sync entry whose name is unknown to the cluster must be skipped.
        dropped.update(application_name='missing', sync_state='sync')
        replication.insert(0, dropped)
        with patch.object(Postgresql, "_cluster_info_state_get", side_effect=['', 'remote_apply', replication]):
            self.assertEqual(self.s.current_state(test_cluster),
                             (CaseInsensitiveSet([self.me.name]), CaseInsensitiveSet()))

        # Bogus synchronous_standby_names plus an empty pg_stat_replication yields empty sets.
        with patch.object(Postgresql, "_cluster_info_state_get", side_effect=['a b', 'remote_apply', None]):
            self.p._major_version = 90400
            self.assertEqual(self.s.current_state(test_cluster), (CaseInsensitiveSet(), CaseInsensitiveSet()))

    def test_set_sync_standby(self):
        """Verify that set_synchronous_standby_names() updates postgresql.conf and reloads only on change."""
        def read_ssn_line():
            # Scan postgresql.conf for the synchronous_standby_names setting; None when absent.
            conf_path = os.path.join(self.p.data_dir, 'postgresql.conf')
            with open(conf_path) as conf:
                return next((line.strip() for line in conf
                             if line.startswith('synchronous_standby_names')), None)

        mock_reload = self.p.reload = Mock()

        # First assignment writes the value and triggers a reload.
        self.s.set_synchronous_standby_names(CaseInsensitiveSet(['n1']))
        self.assertEqual(read_ssn_line(), "synchronous_standby_names = 'n1'")
        mock_reload.assert_called()

        # Re-applying the same value is a no-op: no reload, config unchanged.
        mock_reload.reset_mock()
        self.s.set_synchronous_standby_names(CaseInsensitiveSet(['n1']))
        mock_reload.assert_not_called()
        self.assertEqual(read_ssn_line(), "synchronous_standby_names = 'n1'")

        # Growing the set rewrites the value in the N (a,b) form and reloads.
        self.s.set_synchronous_standby_names(CaseInsensitiveSet(['n1', 'n2']))
        mock_reload.assert_called()
        self.assertEqual(read_ssn_line(), "synchronous_standby_names = '2 (n1,n2)'")

        # An empty set removes the setting entirely, still reloading.
        mock_reload.reset_mock()
        self.s.set_synchronous_standby_names(CaseInsensitiveSet([]))
        mock_reload.assert_called()
        self.assertEqual(read_ssn_line(), None)

        # The '*' wildcard is written through verbatim.
        mock_reload.reset_mock()
        self.p._global_config = GlobalConfig({'synchronous_mode': True})
        self.s.set_synchronous_standby_names(CaseInsensitiveSet('*'))
        mock_reload.assert_called()
        self.assertEqual(read_ssn_line(), "synchronous_standby_names = '*'")
|