mirror of
https://github.com/outbackdingo/patroni.git
synced 2026-01-27 10:20:10 +00:00
Switch to py-consul (#3191)
python-consul is unmaintained for a long time and py-consul is an official replacement. However, we still keep backward compatibility with python-consul. Close: #3189
This commit is contained in:
committed by
GitHub
parent
72be036c99
commit
e8a8bfe42f
@@ -91,7 +91,7 @@ where dependencies can be either empty, or consist of one or more of the followi
|
||||
etcd or etcd3
|
||||
`python-etcd` module in order to use Etcd as DCS
|
||||
consul
|
||||
`python-consul` module in order to use Consul as DCS
|
||||
`py-consul` module in order to use Consul as DCS
|
||||
zookeeper
|
||||
`kazoo` module in order to use Zookeeper as DCS
|
||||
exhibitor
|
||||
|
||||
@@ -49,7 +49,7 @@ where ``dependencies`` can be either empty, or consist of one or more of the fol
|
||||
etcd or etcd3
|
||||
`python-etcd` module in order to use Etcd as Distributed Configuration Store (DCS)
|
||||
consul
|
||||
`python-consul` module in order to use Consul as DCS
|
||||
`py-consul` module in order to use Consul as DCS
|
||||
zookeeper
|
||||
`kazoo` module in order to use Zookeeper as DCS
|
||||
exhibitor
|
||||
|
||||
@@ -15,7 +15,7 @@ from urllib.parse import quote, urlencode, urlparse
|
||||
|
||||
import urllib3
|
||||
|
||||
from consul import base, ConsulException, NotFound
|
||||
from consul import base, Check, ConsulException, NotFound
|
||||
from urllib3.exceptions import HTTPError
|
||||
|
||||
from ..exceptions import DCSError
|
||||
@@ -181,7 +181,7 @@ class ConsulClient(base.Consul):
|
||||
self.token = kwargs.get('token')
|
||||
super(ConsulClient, self).__init__(*args, **kwargs)
|
||||
|
||||
def http_connect(self, *args: Any, **kwargs: Any) -> HTTPClient:
|
||||
def connect(self, *args: Any, **kwargs: Any) -> HTTPClient:
|
||||
kwargs.update(dict(zip(['host', 'port', 'scheme', 'verify'], args)))
|
||||
if self._cert:
|
||||
kwargs['cert'] = self._cert
|
||||
@@ -191,8 +191,8 @@ class ConsulClient(base.Consul):
|
||||
kwargs['token'] = self.token
|
||||
return HTTPClient(**kwargs)
|
||||
|
||||
def connect(self, *args: Any, **kwargs: Any) -> HTTPClient:
|
||||
return self.http_connect(*args, **kwargs)
|
||||
def http_connect(self, *args: Any, **kwargs: Any) -> HTTPClient:
|
||||
return self.connect(*args, **kwargs) # pragma: no cover
|
||||
|
||||
def reload_config(self, config: Dict[str, Any]) -> None:
|
||||
self.http.token = self.token = config.get('token')
|
||||
@@ -526,8 +526,8 @@ class Consul(AbstractDCS):
|
||||
api_parts = api_parts._replace(path='/{0}'.format(role))
|
||||
conn_url: str = data['conn_url']
|
||||
conn_parts = urlparse(conn_url)
|
||||
check = base.Check.http(api_parts.geturl(), self._service_check_interval,
|
||||
deregister='{0}s'.format(self._client.http.ttl * 10))
|
||||
check = Check.http(api_parts.geturl(), self._service_check_interval,
|
||||
deregister='{0}s'.format(self._client.http.ttl * 10))
|
||||
if self._service_check_tls_server_name is not None:
|
||||
check['TLSServerName'] = self._service_check_tls_server_name
|
||||
tags = self._service_tags[:]
|
||||
|
||||
@@ -3,7 +3,7 @@ boto3
|
||||
PyYAML
|
||||
kazoo>=1.3.1
|
||||
python-etcd>=0.4.3,<0.5
|
||||
python-consul>=0.7.1
|
||||
py-consul>=1.1.1
|
||||
click>=4.1
|
||||
prettytable>=0.7
|
||||
python-dateutil
|
||||
|
||||
2
setup.py
2
setup.py
@@ -25,7 +25,7 @@ KEYWORDS = 'etcd governor patroni postgresql postgres ha haproxy confd' +\
|
||||
' zookeeper exhibitor consul streaming replication kubernetes k8s'
|
||||
|
||||
EXTRAS_REQUIRE = {'aws': ['boto3'], 'etcd': ['python-etcd'], 'etcd3': ['python-etcd'],
|
||||
'consul': ['python-consul'], 'exhibitor': ['kazoo'], 'zookeeper': ['kazoo'],
|
||||
'consul': ['py-consul'], 'exhibitor': ['kazoo'], 'zookeeper': ['kazoo'],
|
||||
'kubernetes': [], 'raft': ['pysyncobj', 'cryptography'], 'jsonlogger': ['python-json-logger']}
|
||||
|
||||
# Add here all kinds of additional classifiers as defined under
|
||||
|
||||
@@ -89,13 +89,18 @@ class TestHTTPClient(unittest.TestCase):
|
||||
self.client.put(Mock(), '/v1/session/create', params=[], data='{"foo": "bar"}')
|
||||
|
||||
|
||||
@patch.object(consul.Consul.KV, 'get', kv_get)
|
||||
KV = consul.Consul.KV if hasattr(consul.Consul, 'KV') else consul.api.kv.KV
|
||||
Session = consul.Consul.Session if hasattr(consul.Consul, 'Session') else consul.api.session.Session
|
||||
Agent = consul.Consul.Agent if hasattr(consul.Consul, 'Agent') else consul.api.agent.Agent
|
||||
|
||||
|
||||
@patch.object(KV, 'get', kv_get)
|
||||
class TestConsul(unittest.TestCase):
|
||||
|
||||
@patch.object(consul.Consul.Session, 'create', Mock(return_value='fd4f44fe-2cac-bba5-a60b-304b51ff39b7'))
|
||||
@patch.object(consul.Consul.Session, 'renew', Mock(side_effect=NotFound))
|
||||
@patch.object(consul.Consul.KV, 'get', kv_get)
|
||||
@patch.object(consul.Consul.KV, 'delete', Mock())
|
||||
@patch.object(Session, 'create', Mock(return_value='fd4f44fe-2cac-bba5-a60b-304b51ff39b7'))
|
||||
@patch.object(Session, 'renew', Mock(side_effect=NotFound))
|
||||
@patch.object(KV, 'get', kv_get)
|
||||
@patch.object(KV, 'delete', Mock())
|
||||
def setUp(self):
|
||||
self.assertIsInstance(get_dcs({'ttl': 30, 'scope': 't', 'name': 'p', 'retry_timeout': 10,
|
||||
'consul': {'url': 'https://l:1', 'verify': 'on',
|
||||
@@ -112,14 +117,14 @@ class TestConsul(unittest.TestCase):
|
||||
self.c.get_cluster()
|
||||
|
||||
@patch('time.sleep', Mock(side_effect=SleepException))
|
||||
@patch.object(consul.Consul.Session, 'create', Mock(side_effect=ConsulException))
|
||||
@patch.object(Session, 'create', Mock(side_effect=ConsulException))
|
||||
def test_create_session(self):
|
||||
self.c._session = None
|
||||
self.assertRaises(SleepException, self.c.create_session)
|
||||
|
||||
@patch.object(consul.Consul.Session, 'renew', Mock(side_effect=NotFound))
|
||||
@patch.object(consul.Consul.Session, 'create', Mock(side_effect=[InvalidSessionTTL, ConsulException]))
|
||||
@patch.object(consul.Consul.Agent, 'self', Mock(return_value={'Config': {'SessionTTLMin': 0}}))
|
||||
@patch.object(Session, 'renew', Mock(side_effect=NotFound))
|
||||
@patch.object(Session, 'create', Mock(side_effect=[InvalidSessionTTL, ConsulException]))
|
||||
@patch.object(Agent, 'self', Mock(return_value={'Config': {'SessionTTLMin': 0}}))
|
||||
@patch.object(HTTPClient, 'set_ttl', Mock(side_effect=ValueError))
|
||||
def test_referesh_session(self):
|
||||
self.c._session = '1'
|
||||
@@ -127,7 +132,7 @@ class TestConsul(unittest.TestCase):
|
||||
self.c._last_session_refresh = 0
|
||||
self.assertRaises(ConsulError, self.c.refresh_session)
|
||||
|
||||
@patch.object(consul.Consul.KV, 'delete', Mock())
|
||||
@patch.object(KV, 'delete', Mock())
|
||||
def test_get_cluster(self):
|
||||
self.c._base_path = 'service/test'
|
||||
self.assertIsInstance(self.c.get_cluster(), Cluster)
|
||||
@@ -145,8 +150,8 @@ class TestConsul(unittest.TestCase):
|
||||
self.assertIsInstance(cluster, Cluster)
|
||||
self.assertIsInstance(cluster.workers[1], Cluster)
|
||||
|
||||
@patch.object(consul.Consul.KV, 'delete', Mock(side_effect=[ConsulException, True, True, True]))
|
||||
@patch.object(consul.Consul.KV, 'put', Mock(side_effect=[True, ConsulException, InvalidSession]))
|
||||
@patch.object(KV, 'delete', Mock(side_effect=[ConsulException, True, True, True]))
|
||||
@patch.object(KV, 'put', Mock(side_effect=[True, ConsulException, InvalidSession]))
|
||||
def test_touch_member(self):
|
||||
self.c.refresh_session = Mock(return_value=False)
|
||||
with patch.object(Consul, 'update_service', Mock(side_effect=Exception)):
|
||||
@@ -159,7 +164,7 @@ class TestConsul(unittest.TestCase):
|
||||
self.c.refresh_session = Mock(side_effect=ConsulError('foo'))
|
||||
self.assertFalse(self.c.touch_member({'balbla': 'blabla'}))
|
||||
|
||||
@patch.object(consul.Consul.KV, 'put', Mock(side_effect=[InvalidSession, False, InvalidSession]))
|
||||
@patch.object(KV, 'put', Mock(side_effect=[InvalidSession, False, InvalidSession]))
|
||||
def test_take_leader(self):
|
||||
self.c.set_ttl(20)
|
||||
self.c._do_refresh_session = Mock()
|
||||
@@ -167,35 +172,35 @@ class TestConsul(unittest.TestCase):
|
||||
with patch('time.time', Mock(side_effect=[0, 0, 0, 100, 100, 100])):
|
||||
self.assertFalse(self.c.take_leader())
|
||||
|
||||
@patch.object(consul.Consul.KV, 'put', Mock(return_value=True))
|
||||
@patch.object(KV, 'put', Mock(return_value=True))
|
||||
def test_set_failover_value(self):
|
||||
self.c.set_failover_value('')
|
||||
|
||||
@patch.object(consul.Consul.KV, 'put', Mock(return_value=True))
|
||||
@patch.object(KV, 'put', Mock(return_value=True))
|
||||
def test_set_config_value(self):
|
||||
self.c.set_config_value('')
|
||||
|
||||
@patch.object(Cluster, 'min_version', PropertyMock(return_value=(2, 0)))
|
||||
@patch.object(consul.Consul.KV, 'put', Mock(side_effect=ConsulException))
|
||||
@patch.object(KV, 'put', Mock(side_effect=ConsulException))
|
||||
def test_write_leader_optime(self):
|
||||
self.c.get_cluster()
|
||||
self.c.write_leader_optime('1')
|
||||
|
||||
@patch.object(consul.Consul.Session, 'renew')
|
||||
@patch.object(consul.Consul.KV, 'put', Mock(side_effect=ConsulException))
|
||||
@patch.object(Session, 'renew')
|
||||
@patch.object(KV, 'put', Mock(side_effect=ConsulException))
|
||||
def test_update_leader(self, mock_renew):
|
||||
cluster = self.c.get_cluster()
|
||||
self.c._session = 'fd4f44fe-2cac-bba5-a60b-304b51ff39b8'
|
||||
with patch.object(consul.Consul.KV, 'delete', Mock(return_value=True)):
|
||||
with patch.object(consul.Consul.KV, 'put', Mock(return_value=True)):
|
||||
with patch.object(KV, 'delete', Mock(return_value=True)):
|
||||
with patch.object(KV, 'put', Mock(return_value=True)):
|
||||
self.assertTrue(self.c.update_leader(cluster, 12345, failsafe={'foo': 'bar'}))
|
||||
with patch.object(consul.Consul.KV, 'put', Mock(side_effect=ConsulException)):
|
||||
with patch.object(KV, 'put', Mock(side_effect=ConsulException)):
|
||||
self.assertFalse(self.c.update_leader(cluster, 12345))
|
||||
with patch('time.time', Mock(side_effect=[0, 0, 0, 0, 100, 200, 300])):
|
||||
self.assertRaises(ConsulError, self.c.update_leader, cluster, 12345)
|
||||
with patch('time.time', Mock(side_effect=[0, 100, 200, 300])):
|
||||
self.assertRaises(ConsulError, self.c.update_leader, cluster, 12345)
|
||||
with patch.object(consul.Consul.KV, 'delete', Mock(side_effect=ConsulException)):
|
||||
with patch.object(KV, 'delete', Mock(side_effect=ConsulException)):
|
||||
self.assertFalse(self.c.update_leader(cluster, 12347))
|
||||
mock_renew.side_effect = RetryFailedError('')
|
||||
self.c._last_session_refresh = 0
|
||||
@@ -203,22 +208,22 @@ class TestConsul(unittest.TestCase):
|
||||
mock_renew.side_effect = ConsulException
|
||||
self.assertFalse(self.c.update_leader(cluster, 12347))
|
||||
|
||||
@patch.object(consul.Consul.KV, 'delete', Mock(return_value=True))
|
||||
@patch.object(KV, 'delete', Mock(return_value=True))
|
||||
def test_delete_leader(self):
|
||||
leader = self.c.get_cluster().leader
|
||||
self.c.delete_leader(leader)
|
||||
self.c._name = 'other'
|
||||
self.c.delete_leader(leader)
|
||||
|
||||
@patch.object(consul.Consul.KV, 'put', Mock(return_value=True))
|
||||
@patch.object(KV, 'put', Mock(return_value=True))
|
||||
def test_initialize(self):
|
||||
self.c.initialize()
|
||||
|
||||
@patch.object(consul.Consul.KV, 'delete', Mock(return_value=True))
|
||||
@patch.object(KV, 'delete', Mock(return_value=True))
|
||||
def test_cancel_initialization(self):
|
||||
self.c.cancel_initialization()
|
||||
|
||||
@patch.object(consul.Consul.KV, 'delete', Mock(return_value=True))
|
||||
@patch.object(KV, 'delete', Mock(return_value=True))
|
||||
def test_delete_cluster(self):
|
||||
self.c.delete_cluster()
|
||||
|
||||
@@ -227,28 +232,28 @@ class TestConsul(unittest.TestCase):
|
||||
self.c.watch(None, 1)
|
||||
self.c._name = ''
|
||||
self.c.watch(6429, 1)
|
||||
with patch.object(consul.Consul.KV, 'get', Mock(side_effect=ConsulException)):
|
||||
with patch.object(KV, 'get', Mock(side_effect=ConsulException)):
|
||||
self.c.watch(6429, 1)
|
||||
|
||||
def test_set_retry_timeout(self):
|
||||
self.c.set_retry_timeout(10)
|
||||
|
||||
@patch.object(consul.Consul.KV, 'delete', Mock(return_value=True))
|
||||
@patch.object(consul.Consul.KV, 'put', Mock(return_value=True))
|
||||
@patch.object(KV, 'delete', Mock(return_value=True))
|
||||
@patch.object(KV, 'put', Mock(return_value=True))
|
||||
def test_sync_state(self):
|
||||
self.assertEqual(self.c.set_sync_state_value('{}'), 1)
|
||||
with patch('time.time', Mock(side_effect=[1, 100, 1000])):
|
||||
self.assertFalse(self.c.set_sync_state_value('{}'))
|
||||
with patch.object(consul.Consul.KV, 'put', Mock(return_value=False)):
|
||||
with patch.object(KV, 'put', Mock(return_value=False)):
|
||||
self.assertFalse(self.c.set_sync_state_value('{}'))
|
||||
self.assertTrue(self.c.delete_sync_state())
|
||||
|
||||
@patch.object(consul.Consul.KV, 'put', Mock(return_value=True))
|
||||
@patch.object(KV, 'put', Mock(return_value=True))
|
||||
def test_set_history_value(self):
|
||||
self.assertTrue(self.c.set_history_value('{}'))
|
||||
|
||||
@patch.object(consul.Consul.Agent.Service, 'register', Mock(side_effect=(False, True, True, True)))
|
||||
@patch.object(consul.Consul.Agent.Service, 'deregister', Mock(return_value=True))
|
||||
@patch.object(Agent.Service, 'register', Mock(side_effect=(False, True, True, True)))
|
||||
@patch.object(Agent.Service, 'deregister', Mock(return_value=True))
|
||||
def test_update_service(self):
|
||||
d = {'role': 'replica', 'api_url': 'http://a/t', 'conn_url': 'pg://c:1', 'state': 'running'}
|
||||
self.assertIsNone(self.c.update_service({}, {}))
|
||||
@@ -265,7 +270,7 @@ class TestConsul(unittest.TestCase):
|
||||
d['role'] = 'primary'
|
||||
self.assertTrue(self.c.update_service({}, d))
|
||||
|
||||
@patch.object(consul.Consul.KV, 'put', Mock(side_effect=ConsulException))
|
||||
@patch.object(KV, 'put', Mock(side_effect=ConsulException))
|
||||
def test_reload_config(self):
|
||||
self.assertEqual([], self.c._service_tags)
|
||||
self.c.reload_config({'consul': {'token': 'foo', 'register_service': True, 'service_tags': ['foo']},
|
||||
@@ -278,7 +283,7 @@ class TestConsul(unittest.TestCase):
|
||||
|
||||
# Changing register_service from True to False calls deregister()
|
||||
self.c.reload_config({'consul': {'register_service': False}, 'loop_wait': 10, 'ttl': 30, 'retry_timeout': 10})
|
||||
with patch('consul.Consul.Agent.Service.deregister') as mock_deregister:
|
||||
with patch.object(Agent.Service, 'deregister') as mock_deregister:
|
||||
self.c.touch_member(d)
|
||||
mock_deregister.assert_called_once()
|
||||
|
||||
@@ -286,31 +291,31 @@ class TestConsul(unittest.TestCase):
|
||||
|
||||
# register_service staying False between reloads does not call deregister()
|
||||
self.c.reload_config({'consul': {'register_service': False}, 'loop_wait': 10, 'ttl': 30, 'retry_timeout': 10})
|
||||
with patch('consul.Consul.Agent.Service.deregister') as mock_deregister:
|
||||
with patch.object(Agent.Service, 'deregister') as mock_deregister:
|
||||
self.c.touch_member(d)
|
||||
self.assertFalse(mock_deregister.called)
|
||||
|
||||
# Changing register_service from False to True calls register()
|
||||
self.c.reload_config({'consul': {'register_service': True}, 'loop_wait': 10, 'ttl': 30, 'retry_timeout': 10})
|
||||
with patch('consul.Consul.Agent.Service.register') as mock_register:
|
||||
with patch.object(Agent.Service, 'register') as mock_register:
|
||||
self.c.touch_member(d)
|
||||
mock_register.assert_called_once()
|
||||
|
||||
# register_service staying True between reloads does not call register()
|
||||
self.c.reload_config({'consul': {'register_service': True}, 'loop_wait': 10, 'ttl': 30, 'retry_timeout': 10})
|
||||
with patch('consul.Consul.Agent.Service.register') as mock_register:
|
||||
with patch.object(Agent.Service, 'register') as mock_register:
|
||||
self.c.touch_member(d)
|
||||
self.assertFalse(mock_deregister.called)
|
||||
|
||||
# register_service staying True between reloads does calls register() if other service data has changed
|
||||
self.c.reload_config({'consul': {'register_service': True}, 'loop_wait': 10, 'ttl': 30, 'retry_timeout': 10})
|
||||
with patch('consul.Consul.Agent.Service.register') as mock_register:
|
||||
with patch.object(Agent.Service, 'register') as mock_register:
|
||||
self.c.touch_member(d)
|
||||
mock_register.assert_called_once()
|
||||
|
||||
# register_service staying True between reloads does calls register() if service_tags have changed
|
||||
self.c.reload_config({'consul': {'register_service': True, 'service_tags': ['foo']}, 'loop_wait': 10,
|
||||
'ttl': 30, 'retry_timeout': 10})
|
||||
with patch('consul.Consul.Agent.Service.register') as mock_register:
|
||||
with patch.object(Agent.Service, 'register') as mock_register:
|
||||
self.c.touch_member(d)
|
||||
mock_register.assert_called_once()
|
||||
|
||||
@@ -1,2 +1,3 @@
|
||||
from consul.check import Check
|
||||
from consul.base import ConsulException, NotFound
|
||||
__all__ = ['ConsulException', 'Consul', 'NotFound']
|
||||
__all__ = ['Check', 'ConsulException', 'Consul', 'NotFound']
|
||||
|
||||
@@ -1,9 +1,6 @@
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
class ConsulException(Exception): ...
|
||||
class NotFound(ConsulException): ...
|
||||
class Check:
|
||||
@classmethod
|
||||
def http(klass, url: str, interval: str, timeout: Optional[str] = None, deregister: Optional[str] = None) -> Dict[str, str]: ...
|
||||
class Consul:
|
||||
http: Any
|
||||
agent: 'Consul.Agent'
|
||||
|
||||
5
typings/consul/check.pyi
Normal file
5
typings/consul/check.pyi
Normal file
@@ -0,0 +1,5 @@
|
||||
from typing import Dict, Optional
|
||||
class Check:
|
||||
@classmethod
|
||||
def http(klass, url: str, interval: str, timeout: Optional[str] = None, deregister: Optional[str] = None) -> Dict[str, str]: ...
|
||||
|
||||
Reference in New Issue
Block a user