Files
patroni/tests/test_raft.py
Alexander Kukushkin cb3071adfb Annual cleanup (#2159)
-  Simplify setup.py: remove unneeded features and get rid of deprecation warnings
-  Compatibility with Python 3.10: handle `threading.Event.isSet()` deprecation
-  Make sure setup.py could run without `six`: move Patroni class and main function to the `__main__.py`. The `__init__.py` will have only a few functions used by the Patroni class and from the setup.py
2022-01-06 10:20:31 +01:00

163 lines
6.4 KiB
Python

import os
import unittest
import tempfile
import time
from mock import Mock, PropertyMock, patch
from patroni.dcs.raft import DynMemberSyncObj, KVStoreTTL, Raft, SyncObjUtility, TCPTransport, _TCPTransport
from pysyncobj import SyncObjConf, FAIL_REASON
def remove_files(prefix):
for f in ('journal', 'journal.meta', 'dump'):
f = prefix + f
if os.path.isfile(f):
for i in range(0, 15):
try:
if os.path.isfile(f):
os.unlink(f)
break
else:
break
except Exception:
time.sleep(1.0)
class TestTCPTransport(unittest.TestCase):
@patch.object(TCPTransport, '__init__', Mock())
@patch.object(TCPTransport, 'setOnUtilityMessageCallback', Mock())
@patch.object(TCPTransport, '_connectIfNecessarySingle', Mock(side_effect=Exception))
def test__connectIfNecessarySingle(self):
t = _TCPTransport(Mock(), None, [])
self.assertFalse(t._connectIfNecessarySingle(None))
@patch('pysyncobj.tcp_server.TcpServer.bind', Mock())
class TestDynMemberSyncObj(unittest.TestCase):
@patch('pysyncobj.tcp_server.TcpServer.bind', Mock())
def setUp(self):
self.conf = SyncObjConf(appendEntriesUseBatch=False, dynamicMembershipChange=True, autoTick=False)
self.so = DynMemberSyncObj('127.0.0.1:1234', ['127.0.0.1:1235'], self.conf)
@patch.object(SyncObjUtility, 'executeCommand')
def test_add_member(self, mock_execute_command):
mock_execute_command.return_value = [{'addr': '127.0.0.1:1235'}, {'addr': '127.0.0.1:1236'}]
mock_execute_command.ver = 0
DynMemberSyncObj('127.0.0.1:1234', ['127.0.0.1:1235'], self.conf)
self.conf.dynamicMembershipChange = False
DynMemberSyncObj('127.0.0.1:1234', ['127.0.0.1:1235'], self.conf)
def test_getMembers(self):
mock_conn = Mock()
self.so._SyncObj__transport._onIncomingMessageReceived(mock_conn, ['members'])
def test__SyncObj__doChangeCluster(self):
self.so._SyncObj__doChangeCluster(['add', '127.0.0.1:1236'])
@patch.object(SyncObjConf, 'fullDumpFile', PropertyMock(return_value=None), create=True)
@patch.object(SyncObjConf, 'journalFile', PropertyMock(return_value=None), create=True)
class TestKVStoreTTL(unittest.TestCase):
@patch.object(SyncObjConf, 'fullDumpFile', PropertyMock(return_value=None), create=True)
@patch.object(SyncObjConf, 'journalFile', PropertyMock(return_value=None), create=True)
def setUp(self):
callback = Mock()
callback.replicated = False
self.so = KVStoreTTL(None, callback, callback, self_addr='127.0.0.1:1234')
self.so.startAutoTick()
self.so.set_retry_timeout(10)
def tearDown(self):
if self.so:
self.so.destroy()
def test_set(self):
self.assertTrue(self.so.set('foo', 'bar', prevExist=False, ttl=30))
self.assertFalse(self.so.set('foo', 'bar', prevExist=False, ttl=30))
self.assertFalse(self.so.retry(self.so._set, 'foo', {'value': 'buz', 'created': 1, 'updated': 1}, prevValue=''))
self.assertTrue(self.so.retry(self.so._set, 'foo', {'value': 'buz', 'created': 1, 'updated': 1}))
def test_delete(self):
self.so.autoTickPeriod = 0.2
self.so.set('foo', 'bar')
self.so.set('fooo', 'bar')
self.assertFalse(self.so.delete('foo', prevValue='buz'))
self.assertTrue(self.so.delete('foo', recursive=True))
self.assertFalse(self.so.retry(self.so._delete, 'foo', prevValue=''))
def test_expire(self):
self.so.set('foo', 'bar', ttl=0.001)
time.sleep(1)
self.assertIsNone(self.so.get('foo'))
self.assertEqual(self.so.get('foo', recursive=True), {})
@patch('time.sleep', Mock())
def test_retry(self):
return_values = [FAIL_REASON.QUEUE_FULL] * 2 + [FAIL_REASON.SUCCESS, FAIL_REASON.REQUEST_DENIED]
def test(callback):
callback(True, return_values.pop(0))
with patch('time.time', Mock(side_effect=[1, 100])):
self.assertFalse(self.so.retry(test))
self.assertTrue(self.so.retry(test))
self.assertFalse(self.so.retry(test))
def test_on_ready_override(self):
self.assertTrue(self.so.set('foo', 'bar'))
self.so.destroy()
self.so = None
so = KVStoreTTL(Mock(), None, None, self_addr='127.0.0.1:1234',
partner_addrs=['127.0.0.1:1235'], patronictl=True)
so.doTick(0)
so.destroy()
class TestRaft(unittest.TestCase):
_TMP = tempfile.gettempdir()
def test_raft(self):
raft = Raft({'ttl': 30, 'scope': 'test', 'name': 'pg', 'self_addr': '127.0.0.1:1234',
'retry_timeout': 10, 'data_dir': self._TMP})
raft.reload_config({'retry_timeout': 20, 'ttl': 60, 'loop_wait': 10})
self.assertTrue(raft._sync_obj.set(raft.members_path + 'legacy', '{"version":"2.0.0"}'))
self.assertTrue(raft.touch_member(''))
self.assertTrue(raft.initialize())
self.assertTrue(raft.cancel_initialization())
self.assertTrue(raft.set_config_value('{}'))
self.assertTrue(raft.write_sync_state('foo', 'bar'))
self.assertTrue(raft.manual_failover('foo', 'bar'))
raft.get_cluster()
self.assertTrue(raft._sync_obj.set(raft.status_path, '{"optime":1234567,"slots":{"ls":12345}}'))
raft.get_cluster()
self.assertTrue(raft.update_leader('1'))
self.assertTrue(raft._sync_obj.set(raft.status_path, '{'))
raft.get_cluster()
self.assertTrue(raft.delete_sync_state())
self.assertTrue(raft.delete_leader())
self.assertTrue(raft.set_history_value(''))
self.assertTrue(raft.delete_cluster())
raft.get_cluster()
self.assertTrue(raft.take_leader())
raft.watch(None, 0.001)
raft._sync_obj.destroy()
def tearDown(self):
remove_files(os.path.join(self._TMP, '127.0.0.1:1234.'))
def setUp(self):
self.tearDown()
@patch('patroni.dcs.raft.KVStoreTTL')
@patch('threading.Event')
def test_init(self, mock_event, mock_kvstore):
mock_kvstore.return_value.applied_local_log = False
mock_event.return_value.is_set.side_effect = [False, True]
self.assertIsNotNone(Raft({'ttl': 30, 'scope': 'test', 'name': 'pg', 'patronictl': True,
'self_addr': '1', 'data_dir': self._TMP}))