mirror of
https://github.com/outbackdingo/patroni.git
synced 2026-01-27 18:20:05 +00:00
183 lines
7.6 KiB
Python
183 lines
7.6 KiB
Python
import os
|
|
import unittest
|
|
import tempfile
|
|
import time
|
|
|
|
from mock import Mock, PropertyMock, patch
|
|
from patroni.dcs import get_dcs
|
|
from patroni.dcs.raft import Cluster, DynMemberSyncObj, KVStoreTTL, \
|
|
Raft, RaftError, SyncObjUtility, TCPTransport, _TCPTransport
|
|
from patroni.postgresql.mpp import get_mpp
|
|
from pysyncobj import SyncObjConf, FAIL_REASON
|
|
|
|
|
|
def remove_files(prefix):
|
|
for f in ('journal', 'journal.meta', 'dump'):
|
|
f = prefix + f
|
|
if os.path.isfile(f):
|
|
for i in range(0, 15):
|
|
try:
|
|
if os.path.isfile(f):
|
|
os.unlink(f)
|
|
break
|
|
else:
|
|
break
|
|
except Exception:
|
|
time.sleep(1.0)
|
|
|
|
|
|
class TestTCPTransport(unittest.TestCase):
|
|
|
|
@patch.object(TCPTransport, '__init__', Mock())
|
|
@patch.object(TCPTransport, 'setOnUtilityMessageCallback', Mock())
|
|
@patch.object(TCPTransport, '_connectIfNecessarySingle', Mock(side_effect=Exception))
|
|
def test__connectIfNecessarySingle(self):
|
|
t = _TCPTransport(Mock(), None, [])
|
|
self.assertFalse(t._connectIfNecessarySingle(None))
|
|
|
|
|
|
@patch('pysyncobj.tcp_server.TcpServer.bind', Mock())
|
|
class TestDynMemberSyncObj(unittest.TestCase):
|
|
|
|
@patch('pysyncobj.tcp_server.TcpServer.bind', Mock())
|
|
def setUp(self):
|
|
self.conf = SyncObjConf(appendEntriesUseBatch=False, dynamicMembershipChange=True, autoTick=False)
|
|
self.so = DynMemberSyncObj('127.0.0.1:1234', ['127.0.0.1:1235'], self.conf)
|
|
|
|
@patch.object(SyncObjUtility, 'executeCommand')
|
|
def test_add_member(self, mock_execute_command):
|
|
mock_execute_command.return_value = [{'addr': '127.0.0.1:1235'}, {'addr': '127.0.0.1:1236'}]
|
|
mock_execute_command.ver = 0
|
|
DynMemberSyncObj('127.0.0.1:1234', ['127.0.0.1:1235'], self.conf)
|
|
self.conf.dynamicMembershipChange = False
|
|
DynMemberSyncObj('127.0.0.1:1234', ['127.0.0.1:1235'], self.conf)
|
|
|
|
def test_getMembers(self):
|
|
mock_conn = Mock()
|
|
self.so._SyncObj__transport._onIncomingMessageReceived(mock_conn, ['members'])
|
|
|
|
def test__SyncObj__doChangeCluster(self):
|
|
self.so._SyncObj__doChangeCluster(['add', '127.0.0.1:1236'])
|
|
|
|
|
|
@patch.object(SyncObjConf, 'fullDumpFile', PropertyMock(return_value=None), create=True)
|
|
@patch.object(SyncObjConf, 'journalFile', PropertyMock(return_value=None), create=True)
|
|
class TestKVStoreTTL(unittest.TestCase):
|
|
|
|
@patch.object(SyncObjConf, 'fullDumpFile', PropertyMock(return_value=None), create=True)
|
|
@patch.object(SyncObjConf, 'journalFile', PropertyMock(return_value=None), create=True)
|
|
def setUp(self):
|
|
callback = Mock()
|
|
callback.replicated = False
|
|
self.so = KVStoreTTL(None, callback, callback, self_addr='127.0.0.1:1234')
|
|
self.so.startAutoTick()
|
|
self.so.set_retry_timeout(10)
|
|
|
|
def tearDown(self):
|
|
if self.so:
|
|
self.so.destroy()
|
|
|
|
def test_set(self):
|
|
self.assertTrue(self.so.set('foo', 'bar', prevExist=False, ttl=30))
|
|
self.assertFalse(self.so.set('foo', 'bar', prevExist=False, ttl=30))
|
|
self.assertFalse(self.so.retry(self.so._set, 'foo', {'value': 'buz', 'created': 1, 'updated': 1}, prevValue=''))
|
|
self.assertTrue(self.so.retry(self.so._set, 'foo', {'value': 'buz', 'created': 1, 'updated': 1}))
|
|
with patch.object(KVStoreTTL, 'retry', Mock(side_effect=RaftError(''))):
|
|
self.assertFalse(self.so.set('foo', 'bar'))
|
|
self.assertRaises(RaftError, self.so.set, 'foo', 'bar', handle_raft_error=False)
|
|
|
|
def test_delete(self):
|
|
self.so.autoTickPeriod = 0.2
|
|
self.so.set('foo', 'bar')
|
|
self.so.set('fooo', 'bar')
|
|
self.assertFalse(self.so.delete('foo', prevValue='buz'))
|
|
self.assertTrue(self.so.delete('foo', recursive=True))
|
|
self.assertFalse(self.so.retry(self.so._delete, 'foo', prevValue=''))
|
|
with patch.object(KVStoreTTL, 'retry', Mock(side_effect=RaftError(''))):
|
|
self.assertFalse(self.so.delete('foo'))
|
|
|
|
def test_expire(self):
|
|
self.so.set('foo', 'bar', ttl=0.001)
|
|
time.sleep(1)
|
|
self.assertIsNone(self.so.get('foo'))
|
|
self.assertEqual(self.so.get('foo', recursive=True), {})
|
|
|
|
@patch('time.sleep', Mock())
|
|
def test_retry(self):
|
|
return_values = [FAIL_REASON.QUEUE_FULL] * 2 + [FAIL_REASON.SUCCESS, FAIL_REASON.REQUEST_DENIED]
|
|
|
|
def test(callback):
|
|
callback(True, return_values.pop(0))
|
|
|
|
with patch('time.time', Mock(side_effect=[1, 100])):
|
|
self.assertRaises(RaftError, self.so.retry, test)
|
|
|
|
self.assertTrue(self.so.retry(test))
|
|
self.assertFalse(self.so.retry(test))
|
|
|
|
def test_on_ready_override(self):
|
|
self.assertTrue(self.so.set('foo', 'bar'))
|
|
self.so.destroy()
|
|
self.so = None
|
|
so = KVStoreTTL(Mock(), None, None, self_addr='127.0.0.1:1234',
|
|
partner_addrs=['127.0.0.1:1235'], patronictl=True)
|
|
so.doTick(0)
|
|
so.destroy()
|
|
|
|
|
|
class TestRaft(unittest.TestCase):
|
|
|
|
_TMP = tempfile.gettempdir()
|
|
|
|
def test_raft(self):
|
|
raft = get_dcs({'ttl': 30, 'scope': 'test', 'name': 'pg', 'retry_timeout': 10,
|
|
'raft': {'self_addr': '127.0.0.1:1234', 'data_dir': self._TMP},
|
|
'citus': {'group': 0, 'database': 'postgres'}})
|
|
self.assertIsInstance(raft, Raft)
|
|
raft.reload_config({'retry_timeout': 20, 'ttl': 60, 'loop_wait': 10})
|
|
self.assertTrue(raft._sync_obj.set(raft.members_path + 'legacy', '{"version":"2.0.0"}'))
|
|
self.assertTrue(raft.touch_member(''))
|
|
self.assertTrue(raft.initialize())
|
|
self.assertTrue(raft.cancel_initialization())
|
|
self.assertTrue(raft.set_config_value('{}'))
|
|
self.assertTrue(raft.write_sync_state('foo', 'bar', 0))
|
|
self.assertFalse(raft.write_sync_state('foo', 'bar', 0, 1))
|
|
raft._mpp = get_mpp({'citus': {'group': 1, 'database': 'postgres'}})
|
|
self.assertTrue(raft.manual_failover('foo', 'bar'))
|
|
raft._mpp = get_mpp({'citus': {'group': 0, 'database': 'postgres'}})
|
|
self.assertTrue(raft.take_leader())
|
|
cluster = raft.get_cluster()
|
|
self.assertIsInstance(cluster, Cluster)
|
|
self.assertIsInstance(cluster.workers[1], Cluster)
|
|
leader = cluster.leader
|
|
self.assertTrue(raft.delete_leader(leader))
|
|
self.assertTrue(raft._sync_obj.set(raft.status_path, '{"optime":1234567,"slots":{"ls":12345}}'))
|
|
raft.get_cluster()
|
|
self.assertTrue(raft.update_leader(leader, '1', failsafe={'foo': 'bat'}))
|
|
self.assertTrue(raft._sync_obj.set(raft.failsafe_path, '{"foo"}'))
|
|
self.assertTrue(raft._sync_obj.set(raft.status_path, '{'))
|
|
raft.get_mpp_coordinator()
|
|
self.assertTrue(raft.delete_sync_state())
|
|
self.assertTrue(raft.set_history_value(''))
|
|
self.assertTrue(raft.delete_cluster())
|
|
raft._mpp = get_mpp({'citus': {'group': 1, 'database': 'postgres'}})
|
|
self.assertTrue(raft.delete_cluster())
|
|
raft._mpp = get_mpp({})
|
|
raft.get_cluster()
|
|
raft.watch(None, 0.001)
|
|
raft._sync_obj.destroy()
|
|
|
|
def tearDown(self):
|
|
remove_files(os.path.join(self._TMP, '127.0.0.1:1234.'))
|
|
|
|
def setUp(self):
|
|
self.tearDown()
|
|
|
|
@patch('patroni.dcs.raft.KVStoreTTL')
|
|
@patch('threading.Event')
|
|
def test_init(self, mock_event, mock_kvstore):
|
|
mock_kvstore.return_value.applied_local_log = False
|
|
mock_event.return_value.is_set.side_effect = [False, True]
|
|
self.assertIsInstance(get_dcs({'ttl': 30, 'scope': 'test', 'name': 'pg', 'patronictl': True,
|
|
'raft': {'self_addr': '1', 'data_dir': self._TMP}}), Raft)
|