Files
patroni/tests/test_etcd.py
Alexander Kukushkin 03c2a85d23 Expose current timeline in DCS and via API (#591)
It is very easy to get the current timeline on the master by executing:
```sql
SELECT ('x' || SUBSTR(pg_walfile_name(pg_current_wal_lsn()), 1, 8))::bit(32)::int
```

Unfortunately, the same method doesn't work when postgres is in recovery. Therefore, on the replicas we will use a replication connection to get the timeline. In order to avoid opening and closing a replication connection on every HA loop, we will cache the result if its value matches the timeline of the master.

This PR also introduces a new key in DCS: `/history`. It will contain a JSON-serialized object with the timeline history in a format similar to the usual history files. The differences are:
* Second column is the absolute wal position in bytes, instead of LSN
* Optionally, there might be a fourth column: a timestamp (the mtime of the history file)
2018-01-05 15:25:56 +01:00

341 lines
14 KiB
Python

import etcd
import json
import urllib3.util.connection
import requests
import socket
import unittest
from dns.exception import DNSException
from mock import Mock, patch
from patroni.dcs.etcd import AbstractDCS, Client, Cluster, Etcd, EtcdError, DnsCachingResolver
from patroni.exceptions import DCSError
from urllib3.exceptions import ReadTimeoutError
class MockResponse(object):
    """Canned HTTP response object returned by the request stand-ins in this module.

    The body is kept as a plain string in ``content`` and is exposed through
    several accessors (``text``, ``data``, ``json()``), while the status code
    is readable both as ``status_code`` and as ``status``.
    """

    def __init__(self, status_code=200):
        """Create a successful (``ok``) response whose body is an empty JSON object."""
        self.ok = True
        self.content = '{}'
        self.status_code = status_code

    @staticmethod
    def getheader(*args):
        """Return an empty string regardless of which header is requested."""
        return ''

    @property
    def status(self):
        """Alias of ``status_code``."""
        return self.status_code

    @property
    def text(self):
        """The body as a string."""
        return self.content

    @property
    def data(self):
        """The body encoded as UTF-8 bytes."""
        return self.content.encode('utf-8')

    def json(self):
        """Parse the body as JSON and return the resulting object."""
        return json.loads(self.content)
def requests_get(url, **kwargs):
    """Stand-in for ``requests.get`` that answers from a fixed set of URL patterns.

    :param url: requested URL; the first matching pattern below decides the reply.
    :param kwargs: only ``data`` is inspected (for the ``reinitialize`` endpoint).
    :returns: a :class:`MockResponse`; unrecognised URLs yield a 404 reply.
    :raises requests.exceptions.RequestException: for URLs starting with ``http://local``.
    """
    if url.startswith('http://local'):
        raise requests.exceptions.RequestException()

    reply = MockResponse()

    if ':8011/patroni' in url:
        reply.content = '{"role": "replica", "xlog": {"received_location": 0}, "tags": {}}'
        return reply

    if url.endswith('/members'):
        if url.startswith('http://error'):
            reply.content = '[{}]'
        else:
            reply.content = (
                '[{"id":14855829450254237642,"peerURLs":["http://localhost:2380","http://localhost:7001"],'
                '"name":"default","clientURLs":["http://localhost:2379","http://localhost:4001"]}]'
            )
        return reply

    if url.startswith('http://exhibitor'):
        reply.content = '{"servers":["127.0.0.1","127.0.0.2","127.0.0.3"],"port":2181}'
        return reply

    if url.endswith(':8011/reinitialize'):
        # Only a payload containing ' false}' produces the 503 reply;
        # any other payload gets the default 200 response.
        if ' false}' in kwargs.get('data', ''):
            reply.status_code = 503
            reply.ok = False
            reply.content = 'restarting after failure already in progress'
        return reply

    # No pattern matched.
    reply.status_code = 404
    reply.ok = False
    return reply
def etcd_watch(self, key, index=None, timeout=None, recursive=None):
    """Replacement for ``etcd.Client.watch`` whose outcome is selected by ``timeout``.

    ==============  =====================================
    ``timeout``     outcome
    ==============  =====================================
    ``2.0``         raises ``etcd.EtcdWatchTimedOut``
    ``5.0``         returns a ``'delete'`` ``EtcdResult``
    ``(5, 10.0]``   raises ``etcd.EtcdException``
    ``20.0``        raises ``etcd.EtcdEventIndexCleared``
    anything else   returns ``None``
    ==============  =====================================

    ``key``, ``index`` and ``recursive`` are accepted but ignored.
    """
    if timeout == 2.0:
        raise etcd.EtcdWatchTimedOut
    if timeout == 5.0:
        return etcd.EtcdResult('delete', {})
    if 5 < timeout <= 10.0:
        raise etcd.EtcdException
    if timeout == 20.0:
        raise etcd.EtcdEventIndexCleared
    return None
def etcd_write(self, key, value, **kwargs):
    """Replacement for ``etcd.Client.write`` with key-driven outcomes.

    :returns: ``True`` when one of the two known leader keys is written with
        either ``prevValue='foo'`` or a falsy ``prevExist``.
    :raises etcd.EtcdAlreadyExist: for the key ``/service/exists/leader``.
    :raises etcd.EtcdException: for every other write.
    """
    if key == '/service/exists/leader':
        raise etcd.EtcdAlreadyExist

    leader_keys = ['/service/test/leader', '/patroni/test/leader']
    previous_value_matches = kwargs.get('prevValue') == 'foo'
    must_not_exist = not kwargs.get('prevExist', True)

    if key in leader_keys and (previous_value_matches or must_not_exist):
        return True

    raise etcd.EtcdException
def etcd_read(self, key, **kwargs):
    """Replacement for ``etcd.Client.read`` returning a fixed cluster tree.

    :param key: ``/service/noleader/`` and ``/service/nocluster/`` trigger the
        error paths; every other key yields the same ``/service/batman5`` tree.
    :returns: an ``etcd.EtcdResult`` for the canned tree with ``etcd_index`` set to 0.
    :raises DCSError: for ``/service/noleader/``.
    :raises etcd.EtcdKeyNotFound: for ``/service/nocluster/``.
    """
    if key == '/service/noleader/':
        raise DCSError('noleader')
    if key == '/service/nocluster/':
        raise etcd.EtcdKeyNotFound

    optime_nodes = [
        {"key": "/service/batman5/optime/leader", "value": "2164261704",
         "modifiedIndex": 20729, "createdIndex": 20729},
    ]

    member_nodes = [
        {"key": "/service/batman5/members/postgresql1",
         "value": "postgres://replicator:rep-pass@127.0.0.1:5434/postgres"
                  "?application_name=http://127.0.0.1:8009/patroni",
         "expiration": "2015-05-15T09:10:59.949384522Z", "ttl": 21,
         "modifiedIndex": 20727, "createdIndex": 20727},
        {"key": "/service/batman5/members/postgresql0",
         "value": "postgres://replicator:rep-pass@127.0.0.1:5433/postgres"
                  "?application_name=http://127.0.0.1:8008/patroni",
         "expiration": "2015-05-15T09:11:09.611860899Z", "ttl": 30,
         "modifiedIndex": 20730, "createdIndex": 20730},
    ]

    cluster_nodes = [
        {"key": "/service/batman5/config", "value": '{"foo": "bar"}',
         "modifiedIndex": 1582, "createdIndex": 1582},
        {"key": "/service/batman5/failover", "value": "",
         "modifiedIndex": 1582, "createdIndex": 1582},
        {"key": "/service/batman5/initialize", "value": "postgresql0",
         "modifiedIndex": 1582, "createdIndex": 1582},
        {"key": "/service/batman5/leader", "value": "postgresql1",
         "expiration": "2015-05-15T09:11:00.037397538Z", "ttl": 21,
         "modifiedIndex": 20728, "createdIndex": 20434},
        {"key": "/service/batman5/optime", "dir": True, "nodes": optime_nodes,
         "modifiedIndex": 20437, "createdIndex": 20437},
        {"key": "/service/batman5/sync", "value": '{"leader": "leader"}',
         "modifiedIndex": 1582, "createdIndex": 1582},
        {"key": "/service/batman5/members", "dir": True, "nodes": member_nodes,
         "modifiedIndex": 1581, "createdIndex": 1581},
    ]

    root_node = {"key": "/service/batman5", "dir": True, "nodes": cluster_nodes,
                 "modifiedIndex": 1581, "createdIndex": 1581}

    result = etcd.EtcdResult(action="get", node=root_node)
    result.etcd_index = 0
    return result
class SleepException(Exception):
    """Sentinel exception used as the side effect of the patched ``time.sleep`` in these tests."""
def dns_query(name, _):
    """Replacement for ``dns.resolver.query`` returning canned SRV answers.

    :param name: the record name being looked up.
    :param _: second positional argument of the real call; ignored.
    :returns: an empty list for names without ``-server``, names containing
        ``-ssl`` and the ``blabla`` domain; otherwise a one-element list with a
        mock record on port 2380 whose target is ``localhost`` for the
        ``foobar`` domain and ``127.0.0.1`` for everything else.
    :raises DNSException: for the ``exception`` domain.
    """
    if '-server' not in name or '-ssl' in name:
        return []
    if name == '_etcd-server._tcp.blabla':
        return []
    if name == '_etcd-server._tcp.exception':
        raise DNSException()

    if name == '_etcd-server._tcp.foobar':
        target = 'localhost'
    else:
        target = '127.0.0.1'

    record = Mock()
    record.port = 2380
    record.target.to_text.return_value = target
    return [record]
def socket_getaddrinfo(*args):
    """Replacement for ``socket.getaddrinfo`` that only resolves a few known hosts.

    The first positional argument is the host name. ``ok``, ``localhost`` and
    ``127.0.0.1`` resolve to one IPv4 and one IPv6 loopback entry; any other
    host raises ``socket.gaierror``.
    """
    host = args[0]
    if host not in ('ok', 'localhost', '127.0.0.1'):
        raise socket.gaierror

    ipv4_entry = (socket.AF_INET, 1, 6, '', ('127.0.0.1', 0))
    ipv6_entry = (socket.AF_INET6, 1, 6, '', ('::1', 0))
    return [ipv4_entry, ipv6_entry]
def http_request(method, url, **kwargs):
    """Replacement for the client's HTTP ``request`` callables, keyed on the exact URL.

    :param method: HTTP method; ignored.
    :param url: ``.../timeout`` raises ``ReadTimeoutError``, ``.../v2/machines``
        returns a comma-separated machine list, ``http://localhost:2379/``
        returns a default response.
    :returns: a :class:`MockResponse` for the two known URLs.
    :raises ReadTimeoutError: for the ``/timeout`` URL.
    :raises socket.error: for every URL not listed above.
    """
    if url == 'http://localhost:2379/timeout':
        raise ReadTimeoutError(None, None, None)
    if url == 'http://localhost:2379/':
        return MockResponse()
    if url != 'http://localhost:2379/v2/machines':
        raise socket.error

    machines_reply = MockResponse()
    machines_reply.content = 'http://localhost:2379,http://localhost:4001'
    return machines_reply
class TestDnsCachingResolver(unittest.TestCase):
    """Checks ``DnsCachingResolver`` when both name resolution and sleeping fail."""

    @patch('time.sleep', Mock(side_effect=SleepException))
    @patch('socket.getaddrinfo', Mock(side_effect=socket.gaierror))
    def test_run(self):
        """Queue one lookup while ``getaddrinfo`` and ``sleep`` both raise, then join the resolver."""
        resolver = DnsCachingResolver()
        queued = resolver.resolve_async('', 0)
        self.assertIsNone(queued)
        resolver.join()
@patch('dns.resolver.query', dns_query)
@patch('socket.getaddrinfo', socket_getaddrinfo)
@patch('requests.get', requests_get)
class TestClient(unittest.TestCase):
    """Tests for ``Client`` with DNS, address resolution and HTTP replaced by the stand-ins above."""

    # A class-level ``patch`` only wraps methods whose names start with the
    # test prefix, so ``setUp`` has to repeat the same patches itself.
    @patch('dns.resolver.query', dns_query)
    @patch('socket.getaddrinfo', socket_getaddrinfo)
    @patch('requests.get', requests_get)
    def setUp(self):
        """Build a client from an SRV config and route its HTTP calls to ``http_request``."""
        with patch.object(Client, 'machines') as mock_machines:
            mock_machines.__get__ = Mock(return_value=['http://localhost:2379', 'http://localhost:4001'])
            self.client = Client({'srv': 'test', 'retry_timeout': 3}, DnsCachingResolver())
            self.client.http.request = http_request
            self.client.http.request_encode_body = http_request

    def test_machines(self):
        """Read ``machines`` with a populated cache, an empty cache, and with a forced cache update."""
        self.client._base_uri = 'http://localhost:4001'
        self.client._machines_cache = ['http://localhost:2379']
        self.assertIsNotNone(self.client.machines)

        self.client._base_uri = 'http://localhost:4001'
        self.client._machines_cache = []
        self.assertIsNotNone(self.client.machines)

        # With the cache update flag set, reading the property must raise.
        # The previous version called the non-existent ``self.assertFail()``
        # and relied on the resulting AttributeError being swallowed.
        self.client._update_machines_cache = True
        self.assertRaises(Exception, getattr, self.client, 'machines')

    @patch.object(Client, 'machines')
    def test_api_execute(self, mock_machines):
        """Drive ``api_execute`` through its success, timeout and failure paths."""
        mock_machines.__get__ = Mock(return_value=['http://localhost:2379'])
        self.assertRaises(ValueError, self.client.api_execute, '', '')
        self.client._base_uri = 'http://localhost:4001'
        self.client._machines_cache = ['http://localhost:2379']
        self.client.api_execute('/', 'POST', timeout=0)
        mock_machines.__get__ = Mock(return_value=['http://localhost:2379'])
        self.client._machines_cache_updated = 0
        self.client.api_execute('/', 'POST', timeout=0)
        self.assertRaises(etcd.EtcdWatchTimedOut, self.client.api_execute, '/timeout', 'POST', params={'wait': 'true'})
        self.assertRaises(etcd.EtcdException, self.client.api_execute, '/', '')
        self.client._update_machines_cache = True
        with patch.object(Client, '_load_machines_cache', Mock(side_effect=etcd.EtcdException)):
            self.assertRaises(etcd.EtcdException, self.client.api_execute, '/', 'GET')

    def test_get_srv_record(self):
        """Both an empty DNS answer and a DNS exception must produce an empty record list."""
        self.assertEqual(self.client.get_srv_record('_etcd-server._tcp.blabla'), [])
        self.assertEqual(self.client.get_srv_record('_etcd-server._tcp.exception'), [])

    def test__get_machines_cache_from_srv(self):
        """Call the SRV-based cache loader with mocked DNS and with ``get_srv_record`` stubbed out."""
        self.client._get_machines_cache_from_srv('foobar')
        self.client.get_srv_record = Mock(return_value=[('localhost', 2380)])
        self.client._get_machines_cache_from_srv('blabla')

    def test__get_machines_cache_from_dns(self):
        """Call the DNS-based cache loader for a host that ``socket_getaddrinfo`` cannot resolve."""
        self.client._get_machines_cache_from_dns('error', 2379)

    @patch.object(Client, 'machines')
    def test__load_machines_cache(self, mock_machines):
        """Loading the cache must fail for an empty config and for an unresolvable SRV domain."""
        mock_machines.__get__ = Mock(return_value=['http://localhost:2379'])
        self.client._config = {}
        self.assertRaises(Exception, self.client._load_machines_cache)
        self.client._config = {'srv': 'blabla'}
        self.assertRaises(etcd.EtcdException, self.client._load_machines_cache)

    @patch.object(socket.socket, 'connect')
    def test_create_connection_patched(self, mock_connect):
        """Exercise ``urllib3.util.connection.create_connection`` for failing and succeeding connects."""
        self.assertRaises(socket.error, urllib3.util.connection.create_connection, ('fail', 2379))
        urllib3.util.connection.create_connection(('[localhost]', 2379))
        mock_connect.side_effect = socket.error
        self.assertRaises(socket.error, urllib3.util.connection.create_connection, ('[localhost]', 2379),
                          timeout=1, source_address=('localhost', 53333),
                          socket_options=[(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)])
@patch('requests.get', requests_get)
@patch('socket.getaddrinfo', socket_getaddrinfo)
@patch.object(etcd.Client, 'write', etcd_write)
@patch.object(etcd.Client, 'read', etcd_read)
@patch.object(etcd.Client, 'delete', Mock(side_effect=etcd.EtcdException))
class TestEtcd(unittest.TestCase):
    """Tests for the ``Etcd`` DCS class with the etcd client's read/write/delete replaced by stand-ins."""

    # A class-level ``patch`` only wraps methods whose names start with the
    # test prefix, so ``setUp`` needs its own ``getaddrinfo`` patch.
    @patch('socket.getaddrinfo', socket_getaddrinfo)
    def setUp(self):
        """Create an ``Etcd`` instance for scope ``test`` under the ``/patroni/`` namespace."""
        with patch.object(Client, 'machines') as mock_machines:
            mock_machines.__get__ = Mock(return_value=['http://localhost:2379', 'http://localhost:4001'])
            self.etcd = Etcd({'namespace': '/patroni/', 'ttl': 30, 'retry_timeout': 10,
                              'host': 'localhost:2379', 'scope': 'test', 'name': 'foo'})

    def test_base_path(self):
        """The base path is the namespace joined with the scope."""
        self.assertEqual(self.etcd._base_path, '/patroni/test')

    @patch('dns.resolver.query', dns_query)
    def test_get_etcd_client(self):
        """Each config style must end in ``SleepException`` when ``machines`` fails and ``sleep`` raises."""
        with patch.object(Client, 'machines') as mock_machines:
            mock_machines.__get__ = Mock(side_effect=etcd.EtcdException)
            with patch('time.sleep', Mock(side_effect=SleepException)):
                self.assertRaises(SleepException, self.etcd.get_etcd_client,
                                  {'discovery_srv': 'test', 'retry_timeout': 10, 'cacert': '1', 'key': '1', 'cert': 1})
                self.assertRaises(SleepException, self.etcd.get_etcd_client,
                                  {'url': 'https://test:2379', 'retry_timeout': 10})
                self.assertRaises(SleepException, self.etcd.get_etcd_client,
                                  {'proxy': 'https://user:password@test:2379', 'retry_timeout': 10})
                self.assertRaises(SleepException, self.etcd.get_etcd_client,
                                  {'hosts': 'foo:4001,bar', 'retry_timeout': 10})

    def test_get_cluster(self):
        """Cover the populated, missing-cluster and read-error outcomes of ``get_cluster``."""
        self.assertIsInstance(self.etcd.get_cluster(), Cluster)
        self.etcd._base_path = '/service/nocluster'
        cluster = self.etcd.get_cluster()
        self.assertIsInstance(cluster, Cluster)
        self.assertIsNone(cluster.leader)
        self.etcd._base_path = '/service/noleader'
        self.assertRaises(EtcdError, self.etcd.get_cluster)

    def test_touch_member(self):
        """``touch_member`` reports failure against the mocked backend."""
        self.assertFalse(self.etcd.touch_member('', ''))

    def test_take_leader(self):
        """``take_leader`` reports failure against the mocked backend."""
        self.assertFalse(self.etcd.take_leader())

    def test_attempt_to_acquire_leader(self):
        """Acquiring the leader key fails for both the ``exists`` and the ``failed`` base paths."""
        self.etcd._base_path = '/service/exists'
        self.assertFalse(self.etcd.attempt_to_acquire_leader())
        self.etcd._base_path = '/service/failed'
        self.assertFalse(self.etcd.attempt_to_acquire_leader())

    def test_write_leader_optime(self):
        """``write_leader_optime`` must not raise against the mocked backend."""
        self.etcd.write_leader_optime('0')

    def test_update_leader(self):
        """``update_leader`` succeeds against the mocked backend."""
        self.assertTrue(self.etcd.update_leader(None))

    def test_initialize(self):
        """``initialize`` reports failure against the mocked backend."""
        self.assertFalse(self.etcd.initialize())

    def test_cancel_initializion(self):
        """``cancel_initialization`` reports failure because the patched ``delete`` always raises."""
        self.assertFalse(self.etcd.cancel_initialization())

    def test_delete_leader(self):
        """``delete_leader`` reports failure because the patched ``delete`` always raises."""
        self.assertFalse(self.etcd.delete_leader())

    def test_delete_cluster(self):
        """``delete_cluster`` reports failure because the patched ``delete`` always raises."""
        self.assertFalse(self.etcd.delete_cluster())

    @patch('time.sleep', Mock(side_effect=SleepException))
    @patch.object(etcd.Client, 'watch', etcd_watch)
    def test_watch(self):
        """Drive ``watch`` with timeouts chosen to hit the branches of the ``etcd_watch`` stand-in."""
        self.etcd.watch(None, 0)
        self.etcd.get_cluster()
        self.etcd.watch(20729, 1.5)
        self.etcd.watch(20729, 4.5)
        with patch.object(AbstractDCS, 'watch', Mock()):
            self.assertTrue(self.etcd.watch(20729, 19.5))
            self.assertRaises(SleepException, self.etcd.watch, 20729, 9.5)

    def test_other_exceptions(self):
        """An unexpected exception type from ``retry`` must surface as ``EtcdError``."""
        self.etcd.retry = Mock(side_effect=AttributeError('foo'))
        self.assertRaises(EtcdError, self.etcd.cancel_initialization)

    def test_set_ttl(self):
        """After ``set_ttl`` the next ``watch`` call returns a truthy value."""
        self.etcd.set_ttl(20)
        self.assertTrue(self.etcd.watch(None, 1))

    def test_sync_state(self):
        """Writing and deleting the sync state both report failure against the mocked backend."""
        self.assertFalse(self.etcd.write_sync_state('leader', None))
        self.assertFalse(self.etcd.delete_sync_state())

    def test_set_history_value(self):
        """``set_history_value`` reports failure against the mocked backend."""
        self.assertFalse(self.etcd.set_history_value('{}'))