mirror of
https://github.com/outbackdingo/patroni.git
synced 2026-01-27 18:20:05 +00:00
Add support for the ``nostream`` tag. If set to ``true``, the node will not use the replication protocol to stream WAL. Instead, it will rely on archive recovery (if ``restore_command`` is configured) and ``pg_wal``/``pg_xlog`` polling. It also disables copying and synchronization of permanent logical replication slots on the node itself and on all its cascading replicas. Setting this tag on the primary node has no effect.
275 lines
15 KiB
Python
275 lines
15 KiB
Python
import mock
|
|
import os
|
|
import unittest
|
|
|
|
|
|
from mock import Mock, PropertyMock, patch
|
|
from threading import Thread
|
|
|
|
from patroni import global_config, psycopg
|
|
from patroni.dcs import Cluster, ClusterConfig, Member, Status, SyncState
|
|
from patroni.postgresql import Postgresql
|
|
from patroni.postgresql.misc import fsync_dir
|
|
from patroni.postgresql.slots import SlotsAdvanceThread, SlotsHandler
|
|
from patroni.tags import Tags
|
|
|
|
from . import BaseTestPostgresql, psycopg_connect, MockCursor
|
|
|
|
|
|
class TestTags(Tags):
    """Minimal ``Tags`` subclass used by the tests; it never exposes any tag."""

    @property
    def tags(self):
        """Return a fresh empty dict on every access."""
        return dict()
|
|
|
|
|
|
@patch('subprocess.call', Mock(return_value=0))
|
|
@patch('patroni.psycopg.connect', psycopg_connect)
|
|
@patch.object(Thread, 'start', Mock())
|
|
@patch.object(Postgresql, 'is_running', Mock(return_value=True))
|
|
class TestSlotsHandler(BaseTestPostgresql):
|
|
|
|
@patch('subprocess.call', Mock(return_value=0))
@patch('os.rename', Mock())
@patch('patroni.postgresql.CallbackExecutor', Mock())
@patch.object(Postgresql, 'get_major_version', Mock(return_value=130000))
@patch.object(Postgresql, 'is_running', Mock(return_value=True))
def setUp(self):
    """Prepare the fixture shared by every test of this class.

    Stores the slots handler of ``self.p`` as ``self.s``, starts ``self.p``, builds a
    cluster with the ``ls`` and ``ls2`` slots configured (both reported at position
    12345), publishes it through ``global_config`` and creates an empty ``TestTags``.
    """
    super(TestSlotsHandler, self).setUp()
    self.s = self.p.slots_handler
    self.p.start()

    configured_slots = {'ls': {'database': 'a', 'plugin': 'b'}, 'ls2': None}
    reported_positions = {'ls': 12345, 'ls2': 12345}
    members = [self.me, self.other, self.leadermem]
    self.cluster = Cluster(True, ClusterConfig(1, {'slots': configured_slots}, 1), self.leader,
                           Status(0, reported_positions), members, None, SyncState.empty(), None, None)
    global_config.update(self.cluster)

    self.tags = TestTags()
|
|
|
|
def test_sync_replication_slots(self):
    """Drive ``sync_replication_slots`` through a sequence of mocked node states.

    The steps, in order:

    * every ``Postgresql._query`` call raises ``OperationalError``;
    * standby leader in a standby cluster: exactly one debug message must be logged;
    * paused replica, after ``ls`` was removed from the config: no slot may be dropped;
    * role set to ``primary`` while the ``role`` property reports ``replica``;
    * two extra members named ``test-3`` and ``test.3`` are added: ``patroni.dcs.logger.error``
      must be called five times and the second positional argument of the first call
      must mention both names;
    * the same cluster with ``major_version`` reported as 90618, as primary and as replica.
    """
    config = ClusterConfig(1, {'slots': {'test_3': {'database': 'a', 'plugin': 'b'},
                                         'A': 0, 'ls': 0, 'b': {'type': 'logical', 'plugin': '1'}},
                               'ignore_slots': [{'name': 'blabla'}]}, 1)
    cluster = Cluster(True, config, self.leader, Status(0, {'test_3': 10}),
                      [self.me, self.other, self.leadermem], None, SyncState.empty(), None, None)
    global_config.update(cluster)

    # Use the ``patch`` name imported at the top of the file, like every other test here,
    # instead of mixing it with ``mock.patch``.
    with patch('patroni.postgresql.Postgresql._query', Mock(side_effect=psycopg.OperationalError)):
        self.s.sync_replication_slots(cluster, self.tags)

    self.p.set_role('standby_leader')
    with patch.object(SlotsHandler, 'drop_replication_slot', Mock(return_value=(True, False))), \
            patch.object(global_config.__class__, 'is_standby_cluster', PropertyMock(return_value=True)), \
            patch('patroni.postgresql.slots.logger.debug') as mock_debug:
        self.s.sync_replication_slots(cluster, self.tags)
        mock_debug.assert_called_once()

    self.p.set_role('replica')
    with patch.object(Postgresql, 'is_primary', Mock(return_value=False)), \
            patch.object(global_config.__class__, 'is_paused', PropertyMock(return_value=True)), \
            patch.object(SlotsHandler, 'drop_replication_slot') as mock_drop:
        config.data['slots'].pop('ls')
        self.s.sync_replication_slots(cluster, self.tags)
        mock_drop.assert_not_called()

    self.p.set_role('primary')
    # ``new_callable`` expects a factory, not an instance. Passing a ready-made
    # ``PropertyMock(...)`` only worked because calling it returned the plain string
    # 'replica'. Install a real property mock, the same way ``major_version`` is patched below.
    with patch.object(Postgresql, 'role', PropertyMock(return_value='replica')):
        self.s.sync_replication_slots(cluster, self.tags)

    # Same remark for the logger: let ``patch`` create its default mock instead of
    # handing an instance to ``new_callable``.
    with patch('patroni.dcs.logger.error') as errorlog_mock:
        # NOTE(review): the original code calls these two members "aliases"; presumably
        # both names map to the same slot name -- confirm against patroni.dcs.
        alias1 = Member(0, 'test-3', 28, {'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres'})
        alias2 = Member(0, 'test.3', 28, {'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres'})
        cluster.members.extend([alias1, alias2])
        self.s.sync_replication_slots(cluster, self.tags)
        self.assertEqual(errorlog_mock.call_count, 5)
        ca = errorlog_mock.call_args_list[0][0][1]
        self.assertIn("test-3", ca, "non matching {0}".format(ca))
        self.assertIn("test.3", ca, "non matching {0}".format(ca))

        with patch.object(Postgresql, 'major_version', PropertyMock(return_value=90618)):
            self.s.sync_replication_slots(cluster, self.tags)
            self.p.set_role('replica')
            self.s.sync_replication_slots(cluster, self.tags)
|
|
|
|
def test_cascading_replica_sync_replication_slots(self):
    """Sync slots on a replica that has a cascading replica attached.

    A member tagged ``replicatefrom: postgresql0`` is part of the cluster, the local
    query reports an existing logical slot ``ls`` and the sync is expected to return
    an empty list.
    """
    cascading_replica = Member(0, 'test-2', 28, {
        'state': 'running',
        'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres',
        'tags': {'replicatefrom': 'postgresql0'},
    })
    members = [self.me, self.other, self.leadermem, cascading_replica]
    cluster = Cluster(True, ClusterConfig(1, {'slots': {'ls': {'database': 'a', 'plugin': 'b'}}}, 1),
                      self.leader, Status(0, {'ls': 10}), members, None, SyncState.empty(), None, None)

    self.p.set_role('replica')
    local_slots = [('ls', 'logical', 104, 'b', 'a', 5, 12345, 105)]
    with patch.object(Postgresql, '_query', return_value=local_slots), \
            patch.object(Postgresql, 'is_primary', Mock(return_value=False)):
        result = self.s.sync_replication_slots(cluster, self.tags)
    self.assertEqual(result, [])
|
|
|
|
def test_process_permanent_slots(self):
    """Check what ``Postgresql.slots()`` reports for two mocked cluster-info rows.

    With a logical slot ``ls`` (datoid 5) and a physical slot ``blabla`` in the row,
    ``slots()`` must return ``{'ls': 12345, 'blabla': 12344}``. With only a logical
    slot ``ls`` whose datoid is 6 it must return an empty dict.
    """
    slots_config = {'ls': {'database': 'a', 'plugin': 'b'}, 'blabla': {'type': 'physical'}}
    cluster = Cluster(True, ClusterConfig(1, {'slots': slots_config, 'ignore_slots': [{'name': 'blabla'}]}, 1),
                      self.leader, Status.empty(), [self.me, self.other, self.leadermem],
                      None, SyncState.empty(), None, None)
    global_config.update(cluster)

    self.s.sync_replication_slots(cluster, self.tags)

    def info_row(slots):
        # one-row query result; the last column carries the list of slot descriptions
        return [(1, 0, 0, 0, 0, 0, 0, 0, 0, None, None, slots)]

    logical_datoid_5 = {"slot_name": "ls", "type": "logical", "datoid": 5, "plugin": "b",
                        "confirmed_flush_lsn": 12345, "catalog_xmin": 105, "restart_lsn": 12344}
    physical = {"slot_name": "blabla", "type": "physical", "datoid": None, "plugin": None,
                "confirmed_flush_lsn": None, "catalog_xmin": 105, "restart_lsn": 12344}
    logical_datoid_6 = {"slot_name": "ls", "type": "logical", "datoid": 6, "plugin": "b",
                        "confirmed_flush_lsn": 12345, "catalog_xmin": 105}

    with patch.object(Postgresql, '_query') as mock_query:
        self.p.reset_cluster_info_state(None)
        mock_query.return_value = info_row([logical_datoid_5, physical])
        self.assertEqual(self.p.slots(), {'ls': 12345, 'blabla': 12344})

        self.p.reset_cluster_info_state(None)
        mock_query.return_value = info_row([logical_datoid_6])
        self.assertEqual(self.p.slots(), {})
|
|
|
|
def test_nostream_slot_processing(self):
    """Check slot calculation for a ``nostream`` node, its cascading replica and a plain replica.

    The cluster has a logical permanent slot ``foo`` and a physical permanent slot ``bar``.
    Members: the leader, ``test-2`` (tagged ``nostream``), ``test-3`` (replicates from
    ``test-2``) and ``test-4`` (no tags).
    """
    conn_url = 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres'

    def running_member(name, tags=None):
        # build a running member; the 'tags' key is present only when tags are given
        data = {'state': 'running', 'conn_url': conn_url}
        if tags is not None:
            data['tags'] = tags
        return Member(0, name, 28, data)

    config = ClusterConfig(
        1, {'slots': {'foo': {'type': 'logical', 'database': 'a', 'plugin': 'b'}, 'bar': {'type': 'physical'}}}, 1)
    nostream_node = running_member('test-2', {'nostream': 'True'})
    cascade_node = running_member('test-3', {'replicatefrom': 'test-2'})
    stream_node = running_member('test-4')
    cluster = Cluster(
        True, config, self.leader, Status.empty(),
        [self.leadermem, nostream_node, cascade_node, stream_node], None, SyncState.empty(), None, None)
    global_config.update(cluster)

    # expected values, kept separate from the dicts stored in the cluster config
    every_permanent_slot = {'foo': {'type': 'logical', 'database': 'a', 'plugin': 'b'},
                            'bar': {'type': 'physical'}}
    physical_only = {'bar': {'type': 'physical'}}

    # primary: sees all permanent slots and a member slot for the streaming replica only
    self.p.name = self.leadermem.name
    self.assertEqual(cluster._get_permanent_slots(self.p, self.leadermem, 'primary'), every_permanent_slot)
    self.assertEqual(cluster._get_members_slots(self.p.name, 'primary'), {'test_4': {'type': 'physical'}})

    # nostream node: no permanent logical slot and no slot of its own on the primary
    self.p.name = nostream_node.name
    self.assertEqual(cluster._get_permanent_slots(self.p, nostream_node, 'replica'), physical_only)
    self.assertEqual(cluster.get_slot_name_on_primary(self.p.name, nostream_node), None)

    # the nostream node still holds the member slot of its cascading replica
    self.assertEqual(cluster._get_members_slots(nostream_node.name, 'replica'),
                     {'test_3': {'type': 'physical'}})

    # cascading replica of the nostream node: no permanent logical slot either ...
    self.p.name = cascade_node.name
    self.assertEqual(cluster._get_permanent_slots(self.p, cascade_node, 'replica'), physical_only)
    # ... and no member slot on the primary
    self.assertEqual(cluster.get_slot_name_on_primary(self.p.name, cascade_node), None)

    # ordinary replica: gets every permanent slot ...
    self.p.name = stream_node.name
    self.assertEqual(cluster._get_permanent_slots(self.p, stream_node, 'replica'), every_permanent_slot)
    # ... and has its member slot on the primary
    self.assertEqual(cluster.get_slot_name_on_primary(self.p.name, stream_node), 'test_4')
|
|
|
|
@patch.object(Postgresql, 'is_primary', Mock(return_value=False))
def test__ensure_logical_slots_replica(self):
    """Check the return value of ``sync_replication_slots`` on a replica with a logical slot.

    Expected results, in order: ``[]`` while ``check_logical_slots_readiness`` returns
    False; ``['ls']`` when the cursor fails with sqlstate ``58P01`` and the advance thread
    reports ``(True, ['ls'])``; ``[]`` once the position stored for ``ls`` is ``'a'``;
    ``['ls']`` after the slot database is changed to ``b`` and the position to ``'500'``.
    """
    def sync():
        return self.s.sync_replication_slots(self.cluster, self.tags)

    self.p.set_role('replica')
    self.cluster.slots['ls'] = 12346
    with patch.object(SlotsHandler, 'check_logical_slots_readiness', Mock(return_value=False)):
        self.assertEqual(sync(), [])

    local_slot = ('ls', 'logical', 499, 'b', 'a', 5, 100, 500)
    with patch.object(SlotsHandler, '_query', Mock(return_value=[local_slot])), \
            patch.object(MockCursor, 'execute', Mock(side_effect=psycopg.OperationalError)), \
            patch.object(SlotsAdvanceThread, 'schedule', Mock(return_value=(True, ['ls']))), \
            patch.object(psycopg.OperationalError, 'diag') as mock_diag:
        # make ``diag.sqlstate`` of the raised error read as '58P01'
        type(mock_diag).sqlstate = PropertyMock(return_value='58P01')
        self.assertEqual(sync(), ['ls'])

    self.cluster.slots['ls'] = 'a'
    self.assertEqual(sync(), [])

    self.cluster.config.data['slots']['ls']['database'] = 'b'
    self.cluster.slots['ls'] = '500'
    with patch.object(MockCursor, 'rowcount', PropertyMock(return_value=1), create=True):
        self.assertEqual(sync(), ['ls'])
|
|
|
|
def test_copy_logical_slots(self):
    """Call ``copy_logical_slots`` in three situations; only the absence of an exception is checked.

    The situations are: a plain copy of ``ls``, a copy of ``foo`` while the cursor's
    ``execute`` raises ``OperationalError``, and a copy of ``foo`` while the cluster
    reports no leader.
    """
    handler, cluster, tags = self.s, self.cluster, self.tags

    cluster.config.data['slots']['ls']['database'] = 'b'
    handler.copy_logical_slots(cluster, tags, ['ls'])

    failing_execute = Mock(side_effect=psycopg.OperationalError)
    with patch.object(MockCursor, 'execute', failing_execute):
        handler.copy_logical_slots(cluster, tags, ['foo'])

    no_leader = PropertyMock(return_value=None)
    with patch.object(Cluster, 'leader', no_leader):
        handler.copy_logical_slots(cluster, tags, ['foo'])
|
|
|
|
@patch.object(Postgresql, 'stop', Mock(return_value=True))
@patch.object(Postgresql, 'start', Mock(return_value=True))
@patch.object(Postgresql, 'is_primary', Mock(return_value=False))
def test_check_logical_slots_readiness(self):
    """Check ``check_logical_slots_readiness`` after ``ls`` has been copied.

    It must return a false value when the cursor yields ``('postgresql0', None)`` and
    ``fetchall`` either raises or returns ``[(False,)]``. A final call with the cursor
    yielding ``('ls', 100)`` is made without asserting its result.
    """
    self.s.copy_logical_slots(self.cluster, self.tags, ['ls'])

    for fetchall_mock in (Mock(side_effect=Exception), Mock(return_value=[(False,)])):
        # a fresh iterator is needed for every round because it can be consumed only once
        with patch.object(MockCursor, '__iter__', Mock(return_value=iter([('postgresql0', None)]))), \
                patch.object(MockCursor, 'fetchall', fetchall_mock):
            self.assertFalse(self.s.check_logical_slots_readiness(self.cluster, self.tags))

    with patch.object(MockCursor, '__iter__', Mock(return_value=iter([('ls', 100)]))):
        self.s.check_logical_slots_readiness(self.cluster, self.tags)
|
|
|
|
@patch.object(Postgresql, 'stop', Mock(return_value=True))
@patch.object(Postgresql, 'start', Mock(return_value=True))
@patch.object(Postgresql, 'is_primary', Mock(return_value=False))
def test_on_promote(self):
    """Schedule a slot advance, copy ``ls`` and then call ``on_promote``.

    No result is asserted; the test only verifies that the sequence does not raise.
    """
    handler = self.s
    handler.schedule_advance_slots({'foo': {'bar': 100}})
    handler.copy_logical_slots(self.cluster, self.tags, ['ls'])
    handler.on_promote()
|
|
|
|
@unittest.skipIf(os.name == 'nt', "Windows not supported")
@patch('os.open', Mock())
@patch('os.close', Mock())
@patch('os.fsync', Mock(side_effect=OSError))
def test_fsync_dir(self):
    """``fsync_dir`` must let the ``OSError`` raised by ``os.fsync`` reach the caller."""
    with self.assertRaises(OSError):
        fsync_dir('foo')
|
|
|
|
def test_slots_advance_thread(self):
    """Exercise the advance thread under three kinds of failure.

    * ``execute`` raises ``OperationalError`` with sqlstate ``58P01`` during ``sync_slots``;
    * ``sync_slots`` itself raises, which must surface from ``run``;
    * ``get_local_connection_cursor`` raises during ``sync_slots``.
    """
    with patch.object(MockCursor, 'execute', Mock(side_effect=psycopg.OperationalError)), \
            patch.object(psycopg.OperationalError, 'diag') as mock_diag:
        # make ``diag.sqlstate`` of the raised error read as '58P01'
        type(mock_diag).sqlstate = PropertyMock(return_value='58P01')
        self.s.schedule_advance_slots({'foo': {'bar': 100}})
        self.s._advance.sync_slots()

    with patch.object(SlotsAdvanceThread, 'sync_slots', Mock(side_effect=Exception)):
        advance_thread = self.s._advance
        # replace the condition's wait() with a mock for the run() call below
        advance_thread._condition.wait = Mock()
        with self.assertRaises(Exception):
            advance_thread.run()

    with patch.object(SlotsHandler, 'get_local_connection_cursor', Mock(side_effect=Exception)):
        self.s.schedule_advance_slots({'foo': {'bar': 100}})
        self.s._advance.sync_slots()
|
|
|
|
@patch.object(Postgresql, 'is_primary', Mock(return_value=False))
def test_advance_physical_slots(self):
    """Check the advance query and the error log for a physical permanent slot on a replica.

    The cluster reports ``blabla`` at 12346 while the first mocked query returns it at
    12345. The second mocked query raises, so the last ``_query`` call must be the
    ``pg_replication_slot_advance`` statement for ``blabla`` at ``0/303A`` and the failure
    must be logged with the expected message template.
    """
    permanent_slots = {'blabla': {'type': 'physical'}, 'leader': None}
    members = [self.me, self.other, self.leadermem]
    cluster = Cluster(True, ClusterConfig(1, {'slots': permanent_slots}, 1), self.leader,
                      Status(0, {'blabla': 12346}), members, None, SyncState.empty(), None, None)
    global_config.update(cluster)
    self.s.sync_replication_slots(cluster, self.tags)

    # first call returns the local slot list, second call fails
    local_slots = [('blabla', 'physical', 12345, None, None, None, None, None)]
    query_results = [local_slots, Exception]
    with patch.object(SlotsHandler, '_query', Mock(side_effect=query_results)) as mock_query, \
            patch('patroni.postgresql.slots.logger.error') as mock_error:
        self.s.sync_replication_slots(cluster, self.tags)

        last_query_args = mock_query.call_args[0]
        self.assertEqual(last_query_args,
                         ("SELECT pg_catalog.pg_replication_slot_advance(%s, %s)", "blabla", '0/303A'))

        logged_template = mock_error.call_args[0][0]
        self.assertEqual(logged_template,
                         "Error while advancing replication slot %s to position '%s': %r")
|