mirror of
https://github.com/outbackdingo/patroni.git
synced 2026-01-27 10:20:10 +00:00
* Convert postgresql.py into a package
* Factor out cancellable process into a separate class
* Factor out connection handler into a separate class
* Move postmaster into postgresql package
* Factor out pg_rewind into a separate class
* Factor out bootstrap into a separate class
* Factor out slots handler into a separate class
* Factor out postgresql config handler into a separate class
* Move callback_executor into postgresql package

This is just a careful refactoring, without code changes.
291 lines
13 KiB
Python
291 lines
13 KiB
Python
import etcd
|
|
import urllib3.util.connection
|
|
import socket
|
|
import unittest
|
|
|
|
from dns.exception import DNSException
|
|
from mock import Mock, patch
|
|
from patroni.dcs.etcd import AbstractDCS, Client, Cluster, Etcd, EtcdError, DnsCachingResolver
|
|
from patroni.exceptions import DCSError
|
|
from urllib3.exceptions import ReadTimeoutError
|
|
|
|
from . import SleepException, MockResponse, requests_get
|
|
|
|
|
|
def etcd_watch(self, key, index=None, timeout=None, recursive=None):
    """Stand-in for ``etcd.Client.watch`` whose outcome is chosen by ``timeout``.

    ``key``, ``index`` and ``recursive`` are accepted only to match the
    signature and are ignored.

    * ``timeout == 2.0``       -> raises ``etcd.EtcdWatchTimedOut``
    * ``timeout == 5.0``       -> returns a ``'delete'`` ``etcd.EtcdResult``
    * ``5 < timeout <= 10.0``  -> raises ``etcd.EtcdException``
    * ``timeout == 20.0``      -> raises ``etcd.EtcdEventIndexCleared``
    * any other number         -> returns ``None``

    The range comparison is not guarded, so ``timeout`` is expected to be a
    number by the time it reaches this function.
    """
    if timeout == 2.0:
        raise etcd.EtcdWatchTimedOut
    if timeout == 5.0:
        return etcd.EtcdResult('delete', {})
    if 5 < timeout <= 10.0:
        raise etcd.EtcdException
    if timeout == 20.0:
        raise etcd.EtcdEventIndexCleared
    return None
|
|
|
|
|
|
def etcd_write(self, key, value, **kwargs):
    """Stand-in for ``etcd.Client.write`` with canned outcomes per key.

    * ``/service/exists/leader`` always raises ``etcd.EtcdAlreadyExist``.
    * ``/service/test/leader`` and ``/patroni/test/leader`` return ``True``
      when called with ``prevValue='foo'`` or with a falsy ``prevExist``.
    * Everything else raises ``etcd.EtcdException``.

    ``value`` is ignored.
    """
    if key == '/service/exists/leader':
        raise etcd.EtcdAlreadyExist

    if key in ['/service/test/leader', '/patroni/test/leader']:
        previous_value_matches = kwargs.get('prevValue') == 'foo'
        # ``prevExist`` defaults to True, i.e. "the key is expected to exist".
        expects_missing_key = not kwargs.get('prevExist', True)
        if previous_value_matches or expects_missing_key:
            return True

    raise etcd.EtcdException
|
|
|
|
|
|
def etcd_read(self, key, **kwargs):
    """Stand-in for ``etcd.Client.read`` returning a canned cluster tree.

    * ``/service/noleader/`` raises ``DCSError('noleader')``.
    * ``/service/nocluster/`` raises ``etcd.EtcdKeyNotFound``.
    * Any other key returns an ``etcd.EtcdResult`` for the fixed
      ``/service/batman5`` directory (config, failover, initialize, leader,
      optime, sync and two members), with ``etcd_index`` forced to 0.

    ``kwargs`` are accepted and ignored.
    """
    if key == '/service/noleader/':
        raise DCSError('noleader')
    if key == '/service/nocluster/':
        raise etcd.EtcdKeyNotFound

    # Children of /service/batman5/optime
    optime_nodes = [
        {"key": "/service/batman5/optime/leader", "value": "2164261704",
         "modifiedIndex": 20729, "createdIndex": 20729},
    ]

    # Children of /service/batman5/members
    member_nodes = [
        {"key": "/service/batman5/members/postgresql1",
         "value": "postgres://replicator:rep-pass@127.0.0.1:5434/postgres" +
                  "?application_name=http://127.0.0.1:8009/patroni",
         "expiration": "2015-05-15T09:10:59.949384522Z", "ttl": 21,
         "modifiedIndex": 20727, "createdIndex": 20727},
        {"key": "/service/batman5/members/postgresql0",
         "value": "postgres://replicator:rep-pass@127.0.0.1:5433/postgres" +
                  "?application_name=http://127.0.0.1:8008/patroni",
         "expiration": "2015-05-15T09:11:09.611860899Z", "ttl": 30,
         "modifiedIndex": 20730, "createdIndex": 20730},
    ]

    # Direct children of /service/batman5, in the same order as before.
    cluster_nodes = [
        {"key": "/service/batman5/config", "value": '{"synchronous_mode": 0}',
         "modifiedIndex": 1582, "createdIndex": 1582},
        {"key": "/service/batman5/failover", "value": "",
         "modifiedIndex": 1582, "createdIndex": 1582},
        {"key": "/service/batman5/initialize", "value": "postgresql0",
         "modifiedIndex": 1582, "createdIndex": 1582},
        {"key": "/service/batman5/leader", "value": "postgresql1",
         "expiration": "2015-05-15T09:11:00.037397538Z", "ttl": 21,
         "modifiedIndex": 20728, "createdIndex": 20434},
        {"key": "/service/batman5/optime", "dir": True, "nodes": optime_nodes,
         "modifiedIndex": 20437, "createdIndex": 20437},
        {"key": "/service/batman5/sync", "value": '{"leader": "leader"}',
         "modifiedIndex": 1582, "createdIndex": 1582},
        {"key": "/service/batman5/members", "dir": True, "nodes": member_nodes,
         "modifiedIndex": 1581, "createdIndex": 1581},
    ]

    root_node = {"key": "/service/batman5", "dir": True, "nodes": cluster_nodes,
                 "modifiedIndex": 1581, "createdIndex": 1581}

    result = etcd.EtcdResult(action="get", node=root_node)
    result.etcd_index = 0
    return result
|
|
|
|
|
|
def dns_query(name, _):
    """Stand-in for ``dns.resolver.query`` returning canned SRV answers.

    * Names without ``-server``, or containing ``-ssl``, yield ``[]``.
    * ``_etcd-server._tcp.blabla`` yields ``[]``.
    * ``_etcd-server._tcp.exception`` raises ``DNSException``.
    * Any other name yields a single mock record on port 2380 whose
      ``target.to_text()`` is ``'localhost'`` for ``_etcd-server._tcp.foobar``
      and ``'127.0.0.1'`` otherwise.

    The second positional argument is ignored.
    """
    if not ('-server' in name and '-ssl' not in name):
        return []
    if name == '_etcd-server._tcp.blabla':
        return []
    if name == '_etcd-server._tcp.exception':
        raise DNSException()

    if name == '_etcd-server._tcp.foobar':
        target_text = 'localhost'
    else:
        target_text = '127.0.0.1'

    record = Mock()
    record.port = 2380
    record.target.to_text.return_value = target_text
    return [record]
|
|
|
|
|
|
def socket_getaddrinfo(*args):
    """Stand-in for ``socket.getaddrinfo`` that resolves only a few known hosts.

    The first positional argument is the host. ``'ok'``, ``'localhost'`` and
    ``'127.0.0.1'`` resolve to one IPv4 and one IPv6 loopback entry; any other
    host raises ``socket.gaierror``. Remaining arguments are ignored.
    """
    host = args[0]
    if host not in ('ok', 'localhost', '127.0.0.1'):
        raise socket.gaierror

    ipv4_entry = (socket.AF_INET, 1, 6, '', ('127.0.0.1', 0))
    ipv6_entry = (socket.AF_INET6, 1, 6, '', ('::1', 0))
    return [ipv4_entry, ipv6_entry]
|
|
|
|
|
|
def http_request(method, url, **kwargs):
    """Stand-in for the client's HTTP ``request`` call with canned replies per URL.

    * ``http://localhost:2379/timeout`` raises ``ReadTimeoutError``.
    * ``http://localhost:2379/v2/machines`` returns a ``MockResponse`` whose
      content lists two machine URLs.
    * ``http://localhost:4001/v2/machines`` returns a ``MockResponse`` with
      empty content.
    * ``http://localhost:2379/`` returns an untouched ``MockResponse``.
    * Any other URL raises ``socket.error``.

    ``method`` and ``kwargs`` are ignored.
    """
    if url == 'http://localhost:2379/timeout':
        raise ReadTimeoutError(None, None, None)

    response = MockResponse()

    if url == 'http://localhost:2379/v2/machines':
        response.content = 'http://localhost:2379,http://localhost:4001'
        return response

    if url == 'http://localhost:4001/v2/machines':
        response.content = ''
        return response

    if url != 'http://localhost:2379/':
        raise socket.error

    return response
|
|
|
|
|
|
class TestDnsCachingResolver(unittest.TestCase):
    """Smoke test for ``DnsCachingResolver`` when every lookup fails."""

    @patch('time.sleep', Mock(side_effect=SleepException))
    @patch('socket.getaddrinfo', Mock(side_effect=socket.gaierror))
    def test_run(self):
        # With getaddrinfo always raising gaierror and time.sleep raising
        # SleepException, scheduling a lookup must still return None and the
        # resolver must be joinable afterwards.
        resolver = DnsCachingResolver()
        scheduled = resolver.resolve_async('', 0)
        self.assertIsNone(scheduled)
        resolver.join()
|
|
|
|
|
|
@patch('dns.resolver.query', dns_query)
@patch('socket.getaddrinfo', socket_getaddrinfo)
@patch('requests.get', requests_get)
class TestClient(unittest.TestCase):
    """Tests for the patroni etcd ``Client`` wrapper.

    DNS, address resolution and ``requests.get`` are replaced by the canned
    helpers defined in this module, and the client's HTTP layer is swapped for
    ``http_request`` in ``setUp``, so no real network traffic is generated.
    """

    # ``patch`` used as a class decorator only wraps methods whose names start
    # with ``test``, so ``setUp`` has to repeat the same patches explicitly.
    @patch('dns.resolver.query', dns_query)
    @patch('socket.getaddrinfo', socket_getaddrinfo)
    @patch('requests.get', requests_get)
    def setUp(self):
        with patch.object(Client, 'machines') as mock_machines:
            mock_machines.__get__ = Mock(return_value=['http://localhost:2379', 'http://localhost:4001'])
            self.client = Client({'srv': 'test', 'retry_timeout': 3}, DnsCachingResolver())
            # Route every HTTP call of the client through the canned responder.
            self.client.http.request = http_request
            self.client.http.request_encode_body = http_request

    def test_machines(self):
        # A fallback entry is available in the machines cache.
        self.client._base_uri = 'http://localhost:4001'
        self.client._machines_cache = ['http://localhost:2379']
        self.assertIsNotNone(self.client.machines)

        # No fallback entry, cache update not requested.
        self.client._base_uri = 'http://localhost:4001'
        self.client._machines_cache = []
        self.assertIsNotNone(self.client.machines)

        # With the cache update flag set, reading the property must raise.
        # The previous version called the non-existent ``self.assertFail()``
        # inside a ``try``/``except Exception`` block, so the resulting
        # AttributeError was silently swallowed; assert the failure explicitly.
        self.client._update_machines_cache = True
        self.assertRaises(Exception, getattr, self.client, 'machines')

    @patch.object(Client, 'machines')
    def test_api_execute(self, mock_machines):
        mock_machines.__get__ = Mock(return_value=['http://localhost:2379'])
        # Empty path and method are rejected.
        self.assertRaises(ValueError, self.client.api_execute, '', '')

        self.client._base_uri = 'http://localhost:4001'
        self.client._machines_cache = ['http://localhost:2379']
        self.client.api_execute('/', 'POST', timeout=0)

        # Same request again after resetting the cache-update timestamp.
        mock_machines.__get__ = Mock(return_value=['http://localhost:2379'])
        self.client._machines_cache_updated = 0
        self.client.api_execute('/', 'POST', timeout=0)

        # A read timeout on a "wait" request surfaces as EtcdWatchTimedOut;
        # the call is deliberately issued twice in a row.
        self.client._machines_cache = [self.client._base_uri]
        self.assertRaises(etcd.EtcdWatchTimedOut, self.client.api_execute, '/timeout', 'POST', params={'wait': 'true'})
        self.assertRaises(etcd.EtcdWatchTimedOut, self.client.api_execute, '/timeout', 'POST', params={'wait': 'true'})

        # An empty HTTP method is rejected.
        self.assertRaises(etcd.EtcdException, self.client.api_execute, '/', '')

        with patch.object(Client, '_load_machines_cache', Mock(side_effect=etcd.EtcdException)):
            self.assertRaises(etcd.EtcdException, self.client.api_execute, '/', 'GET')

    def test_get_srv_record(self):
        # Both an empty DNS answer and a DNSException yield an empty list.
        self.assertEqual(self.client.get_srv_record('_etcd-server._tcp.blabla'), [])
        self.assertEqual(self.client.get_srv_record('_etcd-server._tcp.exception'), [])

    def test__get_machines_cache_from_srv(self):
        self.client._get_machines_cache_from_srv('foobar')
        self.client.get_srv_record = Mock(return_value=[('localhost', 2380)])
        self.client._get_machines_cache_from_srv('blabla')

    def test__get_machines_cache_from_dns(self):
        # 'error' is not resolvable by socket_getaddrinfo.
        self.client._get_machines_cache_from_dns('error', 2379)

    @patch.object(Client, 'machines')
    def test__load_machines_cache(self, mock_machines):
        mock_machines.__get__ = Mock(return_value=['http://localhost:2379'])
        self.client._config = {}
        self.assertRaises(Exception, self.client._load_machines_cache)
        self.client._config = {'srv': 'blabla'}
        self.assertRaises(etcd.EtcdException, self.client._load_machines_cache)

    @patch.object(socket.socket, 'connect')
    def test_create_connection_patched(self, mock_connect):
        # 'fail' is rejected by socket_getaddrinfo.
        self.assertRaises(socket.error, urllib3.util.connection.create_connection, ('fail', 2379))
        urllib3.util.connection.create_connection(('[localhost]', 2379))
        # Once connect() itself fails, the error must propagate to the caller.
        mock_connect.side_effect = socket.error
        self.assertRaises(socket.error, urllib3.util.connection.create_connection, ('[localhost]', 2379),
                          timeout=1, source_address=('localhost', 53333),
                          socket_options=[(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)])
|
|
|
|
|
|
@patch('requests.get', requests_get)
@patch('socket.getaddrinfo', socket_getaddrinfo)
@patch.object(etcd.Client, 'write', etcd_write)
@patch.object(etcd.Client, 'read', etcd_read)
@patch.object(etcd.Client, 'delete', Mock(side_effect=etcd.EtcdException))
class TestEtcd(unittest.TestCase):
    """Tests for the ``Etcd`` DCS implementation.

    ``etcd.Client.read`` and ``etcd.Client.write`` are replaced by the canned
    helpers from this module and ``etcd.Client.delete`` always raises
    ``etcd.EtcdException``.
    """

    @patch('socket.getaddrinfo', socket_getaddrinfo)
    def setUp(self):
        dcs_config = {'namespace': '/patroni/', 'ttl': 30, 'retry_timeout': 10,
                      'host': 'localhost:2379', 'scope': 'test', 'name': 'foo'}
        known_machines = ['http://localhost:2379', 'http://localhost:4001']
        with patch.object(Client, 'machines') as mock_machines:
            mock_machines.__get__ = Mock(return_value=known_machines)
            self.etcd = Etcd(dcs_config)

    def test_base_path(self):
        self.assertEqual(self.etcd._base_path, '/patroni/test')

    @patch('dns.resolver.query', dns_query)
    def test_get_etcd_client(self):
        # Configurations tried while reading ``machines`` raises EtcdException.
        unreachable_configs = (
            {'discovery_srv': 'test', 'retry_timeout': 10, 'cacert': '1', 'key': '1', 'cert': 1},
            {'url': 'https://test:2379', 'retry_timeout': 10},
            {'hosts': 'foo:4001,bar', 'retry_timeout': 10},
        )
        # Configuration tried while ``machines`` is an empty list.
        proxy_config = {'proxy': 'https://user:password@test:2379', 'retry_timeout': 10}

        with patch('time.sleep', Mock(side_effect=SleepException)):
            with patch.object(Client, 'machines') as mock_machines:
                mock_machines.__get__ = Mock(side_effect=etcd.EtcdException)
                for client_config in unreachable_configs:
                    self.assertRaises(SleepException, self.etcd.get_etcd_client, client_config)

                mock_machines.__get__ = Mock(return_value=[])
                self.assertRaises(SleepException, self.etcd.get_etcd_client, proxy_config)

    def test_get_cluster(self):
        # Default base path: etcd_read serves the canned cluster tree.
        populated = self.etcd.get_cluster()
        self.assertIsInstance(populated, Cluster)
        self.assertFalse(populated.is_synchronous_mode())

        # etcd_read raises EtcdKeyNotFound for this path.
        self.etcd._base_path = '/service/nocluster'
        missing = self.etcd.get_cluster()
        self.assertIsInstance(missing, Cluster)
        self.assertIsNone(missing.leader)

        # etcd_read raises DCSError for this path.
        self.etcd._base_path = '/service/noleader'
        self.assertRaises(EtcdError, self.etcd.get_cluster)

    def test_touch_member(self):
        self.assertFalse(self.etcd.touch_member('', ''))

    def test_take_leader(self):
        self.assertFalse(self.etcd.take_leader())

    def test_attempt_to_acquire_leader(self):
        # '/service/exists/leader' makes etcd_write raise EtcdAlreadyExist,
        # '/service/failed/leader' makes it raise the generic EtcdException.
        for base_path in ('/service/exists', '/service/failed'):
            self.etcd._base_path = base_path
            self.assertFalse(self.etcd.attempt_to_acquire_leader())

    def test_write_leader_optime(self):
        self.etcd.write_leader_optime('0')

    def test_update_leader(self):
        self.assertTrue(self.etcd.update_leader(None))

    def test_initialize(self):
        self.assertFalse(self.etcd.initialize())

    def test_cancel_initializion(self):
        self.assertFalse(self.etcd.cancel_initialization())

    def test_delete_leader(self):
        self.assertFalse(self.etcd.delete_leader())

    def test_delete_cluster(self):
        self.assertFalse(self.etcd.delete_cluster())

    @patch('time.sleep', Mock(side_effect=SleepException))
    @patch.object(etcd.Client, 'watch', etcd_watch)
    def test_watch(self):
        self.etcd.watch(None, 0)
        self.etcd.get_cluster()
        for wait_time in (1.5, 4.5):
            self.etcd.watch(20729, wait_time)
        with patch.object(AbstractDCS, 'watch', Mock()):
            self.assertTrue(self.etcd.watch(20729, 19.5))
            self.assertRaises(SleepException, self.etcd.watch, 20729, 9.5)

    def test_other_exceptions(self):
        # A non-etcd exception raised by retry must surface as EtcdError.
        self.etcd.retry = Mock(side_effect=AttributeError('foo'))
        self.assertRaises(EtcdError, self.etcd.cancel_initialization)

    def test_set_ttl(self):
        self.etcd.set_ttl(20)
        self.assertTrue(self.etcd.watch(None, 1))

    def test_sync_state(self):
        self.assertFalse(self.etcd.write_sync_state('leader', None))
        self.assertFalse(self.etcd.delete_sync_state())

    def test_set_history_value(self):
        self.assertFalse(self.etcd.set_history_value('{}'))
|