mirror of
https://github.com/outbackdingo/patroni.git
synced 2026-01-27 18:20:05 +00:00
The logical slot on a replica is safe to use when the physical replica slot on the primary: 1. has a nonzero/non-null `catalog_xmin` 2. has a `catalog_xmin` that is not newer (greater) than the `catalog_xmin` of any slot on the standby 3. the `catalog_xmin` is known to overtake `catalog_xmin` of logical slots on the primary observed during `1` In case if `1` doesn't take place, Patroni will run an additional check whether the `hot_standby_feedback` is actually in effect and shows the warning in case it is not.
133 lines
7.2 KiB
Python
133 lines
7.2 KiB
Python
import mock
|
|
import os
|
|
import unittest
|
|
|
|
|
|
from mock import Mock, PropertyMock, patch
|
|
|
|
from patroni import psycopg
|
|
from patroni.dcs import Cluster, ClusterConfig, Member
|
|
from patroni.postgresql import Postgresql
|
|
from patroni.postgresql.slots import SlotsHandler, fsync_dir
|
|
|
|
from . import BaseTestPostgresql, psycopg_connect, MockCursor
|
|
|
|
|
|
@patch('subprocess.call', Mock(return_value=0))
|
|
@patch('patroni.psycopg.connect', psycopg_connect)
|
|
@patch.object(Postgresql, 'is_running', Mock(return_value=True))
|
|
class TestSlotsHandler(BaseTestPostgresql):
|
|
|
|
@patch('subprocess.call', Mock(return_value=0))
|
|
@patch('os.rename', Mock())
|
|
@patch('patroni.postgresql.CallbackExecutor', Mock())
|
|
@patch.object(Postgresql, 'get_major_version', Mock(return_value=130000))
|
|
@patch.object(Postgresql, 'is_running', Mock(return_value=True))
|
|
def setUp(self):
|
|
super(TestSlotsHandler, self).setUp()
|
|
self.s = self.p.slots_handler
|
|
self.p.start()
|
|
config = ClusterConfig(1, {'slots': {'ls': {'database': 'a', 'plugin': 'b'}}}, 1)
|
|
self.cluster = Cluster(True, config, self.leader, 0,
|
|
[self.me, self.other, self.leadermem], None, None, None, {'ls': 12345})
|
|
|
|
def test_sync_replication_slots(self):
|
|
config = ClusterConfig(1, {'slots': {'test_3': {'database': 'a', 'plugin': 'b'},
|
|
'A': 0, 'ls': 0, 'b': {'type': 'logical', 'plugin': '1'}},
|
|
'ignore_slots': [{'name': 'blabla'}]}, 1)
|
|
cluster = Cluster(True, config, self.leader, 0,
|
|
[self.me, self.other, self.leadermem], None, None, None, {'test_3': 10})
|
|
with mock.patch('patroni.postgresql.Postgresql._query', Mock(side_effect=psycopg.OperationalError)):
|
|
self.s.sync_replication_slots(cluster, False)
|
|
self.p.set_role('standby_leader')
|
|
self.s.sync_replication_slots(cluster, False)
|
|
self.p.set_role('replica')
|
|
with patch.object(Postgresql, 'is_leader', Mock(return_value=False)):
|
|
self.s.sync_replication_slots(cluster, False)
|
|
self.p.set_role('master')
|
|
with mock.patch('patroni.postgresql.Postgresql.role', new_callable=PropertyMock(return_value='replica')):
|
|
self.s.sync_replication_slots(cluster, False)
|
|
with patch.object(SlotsHandler, 'drop_replication_slot', Mock(return_value=True)),\
|
|
patch('patroni.dcs.logger.error', new_callable=Mock()) as errorlog_mock:
|
|
alias1 = Member(0, 'test-3', 28, {'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres'})
|
|
alias2 = Member(0, 'test.3', 28, {'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres'})
|
|
cluster.members.extend([alias1, alias2])
|
|
self.s.sync_replication_slots(cluster, False)
|
|
self.assertEqual(errorlog_mock.call_count, 5)
|
|
ca = errorlog_mock.call_args_list[0][0][1]
|
|
self.assertTrue("test-3" in ca, "non matching {0}".format(ca))
|
|
self.assertTrue("test.3" in ca, "non matching {0}".format(ca))
|
|
with patch.object(Postgresql, 'major_version', PropertyMock(return_value=90618)):
|
|
self.s.sync_replication_slots(cluster, False)
|
|
|
|
def test_process_permanent_slots(self):
|
|
config = ClusterConfig(1, {'slots': {'ls': {'database': 'a', 'plugin': 'b'}},
|
|
'ignore_slots': [{'name': 'blabla'}]}, 1)
|
|
cluster = Cluster(True, config, self.leader, 0, [self.me, self.other, self.leadermem], None, None, None, None)
|
|
|
|
self.s.sync_replication_slots(cluster, False)
|
|
with patch.object(Postgresql, '_query') as mock_query:
|
|
self.p.reset_cluster_info_state(None)
|
|
mock_query.return_value.fetchone.return_value = (
|
|
1, 0, 0, 0, 0, 0, 0, 0, 0,
|
|
[{"slot_name": "ls", "type": "logical", "datoid": 5, "plugin": "b",
|
|
"confirmed_flush_lsn": 12345, "catalog_xmin": 105}])
|
|
self.assertEqual(self.p.slots(), {'ls': 12345})
|
|
|
|
self.p.reset_cluster_info_state(None)
|
|
mock_query.return_value.fetchone.return_value = (
|
|
1, 0, 0, 0, 0, 0, 0, 0, 0,
|
|
[{"slot_name": "ls", "type": "logical", "datoid": 6, "plugin": "b",
|
|
"confirmed_flush_lsn": 12345, "catalog_xmin": 105}])
|
|
self.assertEqual(self.p.slots(), {})
|
|
|
|
@patch.object(Postgresql, 'is_leader', Mock(return_value=False))
|
|
def test__ensure_logical_slots_replica(self):
|
|
self.p.set_role('replica')
|
|
self.cluster.slots['ls'] = 12346
|
|
with patch.object(SlotsHandler, 'check_logical_slots_readiness', Mock()):
|
|
self.assertEqual(self.s.sync_replication_slots(self.cluster, False), [])
|
|
self.s._schedule_load_slots = False
|
|
with patch.object(MockCursor, 'execute', Mock(side_effect=psycopg.OperationalError)),\
|
|
patch.object(psycopg.OperationalError, 'diag') as mock_diag:
|
|
type(mock_diag).sqlstate = PropertyMock(return_value='58P01')
|
|
self.assertEqual(self.s.sync_replication_slots(self.cluster, False), ['ls'])
|
|
self.cluster.slots['ls'] = 'a'
|
|
self.assertEqual(self.s.sync_replication_slots(self.cluster, False), [])
|
|
with patch.object(MockCursor, 'rowcount', PropertyMock(return_value=1), create=True):
|
|
self.assertEqual(self.s.sync_replication_slots(self.cluster, False), ['ls'])
|
|
|
|
def test_copy_logical_slots(self):
|
|
self.cluster.config.data['slots']['ls']['database'] = 'b'
|
|
self.s.copy_logical_slots(self.cluster, ['ls'])
|
|
with patch.object(MockCursor, 'execute', Mock(side_effect=psycopg.OperationalError)):
|
|
self.s.copy_logical_slots(self.cluster, ['foo'])
|
|
|
|
@patch.object(Postgresql, 'stop', Mock(return_value=True))
|
|
@patch.object(Postgresql, 'start', Mock(return_value=True))
|
|
@patch.object(Postgresql, 'is_leader', Mock(return_value=False))
|
|
def test_check_logical_slots_readiness(self):
|
|
self.s.copy_logical_slots(self.cluster, ['ls'])
|
|
with patch.object(MockCursor, '__iter__', Mock(return_value=iter([('postgresql0', None)]))),\
|
|
patch.object(MockCursor, 'fetchone', Mock(side_effect=Exception)):
|
|
self.assertIsNone(self.s.check_logical_slots_readiness(self.cluster, False, None))
|
|
with patch.object(MockCursor, '__iter__', Mock(return_value=iter([('postgresql0', None)]))),\
|
|
patch.object(MockCursor, 'fetchone', Mock(return_value=(False,))):
|
|
self.assertIsNone(self.s.check_logical_slots_readiness(self.cluster, False, None))
|
|
with patch.object(MockCursor, '__iter__', Mock(return_value=iter([('ls', 100)]))):
|
|
self.s.check_logical_slots_readiness(self.cluster, False, None)
|
|
|
|
@patch.object(Postgresql, 'stop', Mock(return_value=True))
|
|
@patch.object(Postgresql, 'start', Mock(return_value=True))
|
|
@patch.object(Postgresql, 'is_leader', Mock(return_value=False))
|
|
def test_on_promote(self):
|
|
self.s.copy_logical_slots(self.cluster, ['ls'])
|
|
self.s.on_promote()
|
|
|
|
@unittest.skipIf(os.name == 'nt', "Windows not supported")
|
|
@patch('os.open', Mock())
|
|
@patch('os.close', Mock())
|
|
@patch('os.fsync', Mock(side_effect=OSError))
|
|
def test_fsync_dir(self):
|
|
self.assertRaises(OSError, fsync_dir, 'foo')
|