diff --git a/patroni/dcs/consul.py b/patroni/dcs/consul.py index bbfd2453..e9834266 100644 --- a/patroni/dcs/consul.py +++ b/patroni/dcs/consul.py @@ -42,6 +42,57 @@ class InvalidSession(ConsulException): """invalid session""" +class ConsulAgentService(base.Consul.Agent.Service): + """ + Consul.Agent.Session with support of ``tagged_addresses``. + + We do it in the Patroni code because ``python-consul`` and + ``python-consul2`` modules don't receive any updates for at least 3 years. + """ + + def register(self, name: str, service_id: Optional[str] = None, address: Optional[str] = None, + port: Optional[int] = None, tags: Optional[List[str]] = None, check: Optional[Dict[str, str]] = None, + token: Optional[str] = None, enable_tag_override: bool = False, + tagged_addresses: Optional[Dict[str, Dict[str, Union[str, int]]]] = None, **kwargs: Any) -> bool: + """Add a new service to the local agent. + + :param name: name of the service. + :param service_id: service id, optional, if not provided *name* is used. + :param address: will default to the address of the agent if not provided. + :param port: port on which the service is available. + :param tagged_addresses: additional addresses for a node or service. + :tags: a list of string values that add service-level labels. + :enable_tag_override: optional ``bool`` that enable you to modify a service tags from servers + (consul agent role server). Default is set to ``False``. + :check: an optional health check for this service. + :token: an optional ACL token to apply to this request. + + :returns: ``True`` if the service was successfully registered/updated, otherwise ``False``. + """ + payload: Dict[str, Any] = {'name': name} + + if enable_tag_override: + payload['enabletagoverride'] = enable_tag_override + if service_id: + payload['id'] = service_id + if address: + payload['address'] = address + if port: + payload['port'] = port + if tagged_addresses: + payload['tagged_addresses'] = tagged_addresses + if tags: + payload['tags'] = tags + if check: + payload['check'] = check + + token = token or self.agent.token + params = {'token': token} if token else {} + + return self.agent.http.put(base.CB.bool(), '/v1/agent/service/register', + params=params, data=json.dumps(payload)) + + class Response(NamedTuple): code: int headers: Union[Mapping[str, str], Mapping[bytes, bytes], None] @@ -269,6 +320,7 @@ class Consul(AbstractDCS): kwargs['verify'] = verify self._client = ConsulClient(**kwargs) + self._agent_service = ConsulAgentService(self._client) self.set_retry_timeout(config['retry_timeout']) self.set_ttl(config.get('ttl') or 30) self._last_session_refresh = 0 @@ -503,14 +555,14 @@ class Consul(AbstractDCS): @catch_consul_errors def register_service(self, service_name: str, **kwargs: Any) -> bool: logger.info('Register service %s, params %s', service_name, kwargs) - return self._client.agent.service.register(service_name, **kwargs) + return self._agent_service.register(service_name, **kwargs) @catch_consul_errors def deregister_service(self, service_id: str) -> bool: logger.info('Deregister service %s', service_id) # service_id can contain special characters, but is used as part of uri in deregister request service_id = quote(service_id) - return self._client.agent.service.deregister(service_id) + return self._agent_service.deregister(service_id) def _update_service(self, data: Dict[str, Any]) -> Optional[bool]: service_name = self._service_name diff --git a/tests/test_consul.py b/tests/test_consul.py index 494d1126..fea5836a 100644 --- a/tests/test_consul.py +++ b/tests/test_consul.py @@ -4,7 +4,7 @@ import unittest from consul import ConsulException, NotFound from mock import Mock, PropertyMock, patch from patroni.dcs import get_dcs -from patroni.dcs.consul import AbstractDCS, Cluster, Consul, ConsulInternalError, \ +from patroni.dcs.consul import AbstractDCS, Cluster, Consul, ConsulAgentService, ConsulInternalError, \ ConsulError, ConsulClient, HTTPClient, InvalidSessionTTL, InvalidSession, RetryFailedError from patroni.postgresql.mpp import get_mpp from . import SleepException @@ -245,8 +245,8 @@ class TestConsul(unittest.TestCase): def test_set_history_value(self): self.assertTrue(self.c.set_history_value('{}')) - @patch.object(consul.Consul.Agent.Service, 'register', Mock(side_effect=(False, True, True, True))) - @patch.object(consul.Consul.Agent.Service, 'deregister', Mock(return_value=True)) + @patch.object(ConsulAgentService, 'register', Mock(side_effect=(False, True, True, True))) + @patch.object(ConsulAgentService, 'deregister', Mock(return_value=True)) def test_update_service(self): d = {'role': 'replica', 'api_url': 'http://a/t', 'conn_url': 'pg://c:1', 'state': 'running'} self.assertIsNone(self.c.update_service({}, {})) @@ -277,7 +277,7 @@ class TestConsul(unittest.TestCase): # Changing register_service from True to False calls deregister() self.c.reload_config({'consul': {'register_service': False}, 'loop_wait': 10, 'ttl': 30, 'retry_timeout': 10}) - with patch('consul.Consul.Agent.Service.deregister') as mock_deregister: + with patch.object(ConsulAgentService, 'deregister') as mock_deregister: self.c.touch_member(d) mock_deregister.assert_called_once() @@ -285,31 +285,31 @@ class TestConsul(unittest.TestCase): # register_service staying False between reloads does not call deregister() self.c.reload_config({'consul': {'register_service': False}, 'loop_wait': 10, 'ttl': 30, 'retry_timeout': 10}) - with patch('consul.Consul.Agent.Service.deregister') as mock_deregister: + with patch.object(ConsulAgentService, 'deregister') as mock_deregister: self.c.touch_member(d) self.assertFalse(mock_deregister.called) # Changing register_service from False to True calls register() self.c.reload_config({'consul': {'register_service': True}, 'loop_wait': 10, 'ttl': 30, 'retry_timeout': 10}) - with patch('consul.Consul.Agent.Service.register') as mock_register: + with patch.object(HTTPClient, 'put', create=True) as mock_put: self.c.touch_member(d) - mock_register.assert_called_once() + mock_put.assert_called_once() # register_service staying True between reloads does not call register() self.c.reload_config({'consul': {'register_service': True}, 'loop_wait': 10, 'ttl': 30, 'retry_timeout': 10}) - with patch('consul.Consul.Agent.Service.register') as mock_register: + with patch.object(ConsulAgentService, 'register') as mock_register: self.c.touch_member(d) self.assertFalse(mock_deregister.called) # register_service staying True between reloads does calls register() if other service data has changed self.c.reload_config({'consul': {'register_service': True}, 'loop_wait': 10, 'ttl': 30, 'retry_timeout': 10}) - with patch('consul.Consul.Agent.Service.register') as mock_register: + with patch.object(ConsulAgentService, 'register') as mock_register: self.c.touch_member(d) mock_register.assert_called_once() # register_service staying True between reloads does calls register() if service_tags have changed self.c.reload_config({'consul': {'register_service': True, 'service_tags': ['foo']}, 'loop_wait': 10, 'ttl': 30, 'retry_timeout': 10}) - with patch('consul.Consul.Agent.Service.register') as mock_register: + with patch.object(ConsulAgentService, 'register') as mock_register: self.c.touch_member(d) mock_register.assert_called_once() diff --git a/typings/consul/base.pyi b/typings/consul/base.pyi index d512d108..65dd561b 100644 --- a/typings/consul/base.pyi +++ b/typings/consul/base.pyi @@ -1,10 +1,14 @@ -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Callable, Dict, List, NamedTuple, Optional, Tuple class ConsulException(Exception): ... class NotFound(ConsulException): ... +class CB: + @classmethod + def bool(klass) -> Callable[[NamedTuple], bool]: ... class Check: @classmethod def http(klass, url: str, interval: str, timeout: Optional[str] = None, deregister: Optional[str] = None) -> Dict[str, str]: ... class Consul: + token: Optional[str] http: Any agent: 'Consul.Agent' session: 'Consul.Session' @@ -17,7 +21,9 @@ class Consul: service: 'Consul.Agent.Service' def self(self) -> Dict[str, Dict[str, Any]]: ... class Service: - def register(self, name: str, service_id=..., address=..., port=..., tags=..., check=..., token=..., script=..., interval=..., ttl=..., http=..., timeout=..., enable_tag_override=...) -> bool: ... + agent: 'Consul' + def __init__(self, agent: 'Consul') -> None: .. + def register(self, name: str, service_id: Optional[str] = None, address: Optional[str] = None, port: Optional[int] = None, tags: Optional[List[str]] = None, check: Optional[Dict[str, str]] = None, token: Optional[str] = None, enable_tag_override: bool = False) -> bool: ... def deregister(self, service_id: str) -> bool: ... class Session: def create(self, name: Optional[str] = None, node: Optional[str] = [], checks: Optional[List[str]]=None, lock_delay: float = 15, behavior: str = 'release', ttl: Optional[int] = None, dc: Optional[str] = None) -> str: ...