Mirror of https://github.com/optim-enterprises-bv/patroni.git, synced 2026-01-10 01:21:54 +00:00
A long-standing problem with Patroni that affects many users is that it removes the replication slot of a member whose key has expired from the DCS. As a result, when a replica comes back from scheduled maintenance the required WAL segments may already be gone, and it cannot resume streaming without restoring files from the archive. With PostgreSQL 16 and newer there is an additional problem: a logical slot on a standby node can be invalidated if the physical replication slot on the primary was removed (and `pg_catalog` vacuumed). The most problematic environment is Kubernetes, where the slot is removed almost instantly when the member Pod is deleted.

So far, one of the recommended workarounds has been to configure permanent physical slots with names matching the member names, so that those replication slots are never removed. It works, but depending on the environment it may be non-trivial to implement (for example, when members may change their names).

This PR implements support for the `member_slots_ttl` global configuration parameter, which controls how long member replication slots are kept after the member key disappears. The default value is `30min`. The feature is supported only on PostgreSQL 11 and newer, because slots must be retained not only on the leader node but on all nodes that could potentially become the new leader, and there they have to be advanced with the `pg_replication_slot_advance()` function. Setting `member_slots_ttl` to `0` disables the feature and restores the old behavior.
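As a usage sketch (the endpoint address, port and the `10min` value below are assumptions for illustration, not part of this PR), the parameter can be changed at runtime through Patroni's dynamic configuration REST endpoint:

import requests

# Keep replication slots of absent members for 10 minutes (example value);
# the default introduced by this PR is 30 minutes.
requests.patch('http://127.0.0.1:8008/config', json={'member_slots_ttl': '10min'})

# Setting the parameter to 0 disables retention and restores the old behavior:
# a member's slot is dropped as soon as its key disappears from the DCS.
requests.patch('http://127.0.0.1:8008/config', json={'member_slots_ttl': 0})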
185 lines
7.7 KiB
Python
import os
import tempfile
import time
import unittest

from unittest.mock import Mock, patch, PropertyMock

from pysyncobj import FAIL_REASON, SyncObjConf

from patroni.dcs import get_dcs
from patroni.dcs.raft import _TCPTransport, Cluster, DynMemberSyncObj, \
    KVStoreTTL, Raft, RaftError, SyncObjUtility, TCPTransport
from patroni.postgresql.mpp import get_mpp


def remove_files(prefix):
    for f in ('journal', 'journal.meta', 'dump'):
        f = prefix + f
        if os.path.isfile(f):
            for i in range(0, 15):
                try:
                    if os.path.isfile(f):
                        os.unlink(f)
                        break
                    else:
                        break
                except Exception:
                    time.sleep(1.0)


class TestTCPTransport(unittest.TestCase):

    @patch.object(TCPTransport, '__init__', Mock())
    @patch.object(TCPTransport, 'setOnUtilityMessageCallback', Mock())
    @patch.object(TCPTransport, '_connectIfNecessarySingle', Mock(side_effect=Exception))
    def test__connectIfNecessarySingle(self):
        t = _TCPTransport(Mock(), None, [])
        self.assertFalse(t._connectIfNecessarySingle(None))


@patch('pysyncobj.tcp_server.TcpServer.bind', Mock())
class TestDynMemberSyncObj(unittest.TestCase):

    @patch('pysyncobj.tcp_server.TcpServer.bind', Mock())
    def setUp(self):
        self.conf = SyncObjConf(appendEntriesUseBatch=False, dynamicMembershipChange=True, autoTick=False)
        self.so = DynMemberSyncObj('127.0.0.1:1234', ['127.0.0.1:1235'], self.conf)

    @patch.object(SyncObjUtility, 'executeCommand')
    def test_add_member(self, mock_execute_command):
        mock_execute_command.return_value = [{'addr': '127.0.0.1:1235'}, {'addr': '127.0.0.1:1236'}]
        mock_execute_command.ver = 0
        DynMemberSyncObj('127.0.0.1:1234', ['127.0.0.1:1235'], self.conf)
        self.conf.dynamicMembershipChange = False
        DynMemberSyncObj('127.0.0.1:1234', ['127.0.0.1:1235'], self.conf)

    def test_getMembers(self):
        mock_conn = Mock()
        self.so._SyncObj__transport._onIncomingMessageReceived(mock_conn, ['members'])

    def test__SyncObj__doChangeCluster(self):
        self.so._SyncObj__doChangeCluster(['add', '127.0.0.1:1236'])


@patch.object(SyncObjConf, 'fullDumpFile', PropertyMock(return_value=None), create=True)
@patch.object(SyncObjConf, 'journalFile', PropertyMock(return_value=None), create=True)
class TestKVStoreTTL(unittest.TestCase):

    @patch.object(SyncObjConf, 'fullDumpFile', PropertyMock(return_value=None), create=True)
    @patch.object(SyncObjConf, 'journalFile', PropertyMock(return_value=None), create=True)
    def setUp(self):
        callback = Mock()
        callback.replicated = False
        self.so = KVStoreTTL(None, callback, callback, self_addr='127.0.0.1:1234')
        self.so.startAutoTick()
        self.so.set_retry_timeout(10)

    def tearDown(self):
        if self.so:
            self.so.destroy()

    def test_set(self):
        self.assertTrue(self.so.set('foo', 'bar', prevExist=False, ttl=30))
        self.assertFalse(self.so.set('foo', 'bar', prevExist=False, ttl=30))
        self.assertFalse(self.so.retry(self.so._set, 'foo', {'value': 'buz', 'created': 1, 'updated': 1}, prevValue=''))
        self.assertTrue(self.so.retry(self.so._set, 'foo', {'value': 'buz', 'created': 1, 'updated': 1}))
        with patch.object(KVStoreTTL, 'retry', Mock(side_effect=RaftError(''))):
            self.assertFalse(self.so.set('foo', 'bar'))
            self.assertRaises(RaftError, self.so.set, 'foo', 'bar', handle_raft_error=False)

    def test_delete(self):
        self.so.autoTickPeriod = 0.2
        self.so.set('foo', 'bar')
        self.so.set('fooo', 'bar')
        self.assertFalse(self.so.delete('foo', prevValue='buz'))
        self.assertTrue(self.so.delete('foo', recursive=True))
        self.assertFalse(self.so.retry(self.so._delete, 'foo', prevValue=''))
        with patch.object(KVStoreTTL, 'retry', Mock(side_effect=RaftError(''))):
            self.assertFalse(self.so.delete('foo'))

    def test_expire(self):
        self.so.set('foo', 'bar', ttl=0.001)
        time.sleep(1)
        self.assertIsNone(self.so.get('foo'))
        self.assertEqual(self.so.get('foo', recursive=True), {})

    @patch('time.sleep', Mock())
    def test_retry(self):
        return_values = [FAIL_REASON.QUEUE_FULL] * 2 + [FAIL_REASON.SUCCESS, FAIL_REASON.REQUEST_DENIED]

        def test(callback):
            callback(True, return_values.pop(0))

        with patch('time.time', Mock(side_effect=[1, 100])):
            self.assertRaises(RaftError, self.so.retry, test)

        self.assertTrue(self.so.retry(test))
        self.assertFalse(self.so.retry(test))

    def test_on_ready_override(self):
        self.assertTrue(self.so.set('foo', 'bar'))
        self.so.destroy()
        self.so = None
        so = KVStoreTTL(Mock(), None, None, self_addr='127.0.0.1:1234',
                        partner_addrs=['127.0.0.1:1235'], patronictl=True)
        so.doTick(0)
        so.destroy()


class TestRaft(unittest.TestCase):

    _TMP = tempfile.gettempdir()

    def test_raft(self):
        raft = get_dcs({'ttl': 30, 'scope': 'test', 'name': 'pg', 'retry_timeout': 10,
                        'raft': {'self_addr': '127.0.0.1:1234', 'data_dir': self._TMP},
                        'citus': {'group': 0, 'database': 'postgres'}})
        self.assertIsInstance(raft, Raft)
        raft.reload_config({'retry_timeout': 20, 'ttl': 60, 'loop_wait': 10})
        self.assertTrue(raft._sync_obj.set(raft.members_path + 'legacy', '{"version":"2.0.0"}'))
        self.assertTrue(raft.touch_member(''))
        self.assertTrue(raft.initialize())
        self.assertTrue(raft.cancel_initialization())
        self.assertTrue(raft.set_config_value('{}'))
        self.assertTrue(raft.write_sync_state('foo', 'bar', 0))
        self.assertFalse(raft.write_sync_state('foo', 'bar', 0, 1))
        raft._mpp = get_mpp({'citus': {'group': 1, 'database': 'postgres'}})
        self.assertTrue(raft.manual_failover('foo', 'bar'))
        raft._mpp = get_mpp({'citus': {'group': 0, 'database': 'postgres'}})
        self.assertTrue(raft.take_leader())
        cluster = raft.get_cluster()
        self.assertIsInstance(cluster, Cluster)
        self.assertIsInstance(cluster.workers[1], Cluster)
        self.assertTrue(raft.delete_leader(cluster.leader))
        self.assertTrue(raft._sync_obj.set(raft.status_path,
                                           '{"optime":1234567,"slots":{"ls":12345},"retain_slots":["postgresql0"]}'))
        raft.get_cluster()
        self.assertTrue(raft.update_leader(cluster, '1', failsafe={'foo': 'bat'}))
        self.assertTrue(raft._sync_obj.set(raft.failsafe_path, '{"foo"}'))
        self.assertTrue(raft._sync_obj.set(raft.status_path, '{'))
        raft.get_mpp_coordinator()
        self.assertTrue(raft.delete_sync_state())
        self.assertTrue(raft.set_history_value(''))
        self.assertTrue(raft.delete_cluster())
        raft._mpp = get_mpp({'citus': {'group': 1, 'database': 'postgres'}})
        self.assertTrue(raft.delete_cluster())
        raft._mpp = get_mpp({})
        raft.get_cluster()
        raft.watch(None, 0.001)
        raft._sync_obj.destroy()

    def tearDown(self):
        remove_files(os.path.join(self._TMP, '127.0.0.1:1234.'))

    def setUp(self):
        self.tearDown()

    @patch('patroni.dcs.raft.KVStoreTTL')
    @patch('threading.Event')
    def test_init(self, mock_event, mock_kvstore):
        mock_kvstore.return_value.applied_local_log = False
        mock_event.return_value.is_set.side_effect = [False, True]
        self.assertIsInstance(get_dcs({'ttl': 30, 'scope': 'test', 'name': 'pg', 'patronictl': True,
                                       'raft': {'self_addr': '1', 'data_dir': self._TMP}}), Raft)