Add supports of Consul Service tagged_addresses

Inherit from Consul.Agent.Service and override register() method.
This commit is contained in:
Alexander Kukushkin
2024-03-05 12:34:03 +01:00
parent e131065d74
commit 820cc404b9
3 changed files with 72 additions and 14 deletions

View File

@@ -42,6 +42,57 @@ class InvalidSession(ConsulException):
"""invalid session"""
class ConsulAgentService(base.Consul.Agent.Service):
"""
Consul.Agent.Session with support of ``tagged_addresses``.
We do it in the Patroni code because ``python-consul`` and
``python-consul2`` modules don't receive any updates for at least 3 years.
"""
def register(self, name: str, service_id: Optional[str] = None, address: Optional[str] = None,
port: Optional[int] = None, tags: Optional[List[str]] = None, check: Optional[Dict[str, str]] = None,
token: Optional[str] = None, enable_tag_override: bool = False,
tagged_addresses: Optional[Dict[str, Dict[str, Union[str, int]]]] = None, **kwargs: Any) -> bool:
"""Add a new service to the local agent.
:param name: name of the service.
:param service_id: service id, optional, if not provided *name* is used.
:param address: will default to the address of the agent if not provided.
:param port: port on which the service is available.
:param tagged_addresses: additional addresses for a node or service.
:tags: a list of string values that add service-level labels.
:enable_tag_override: optional ``bool`` that enable you to modify a service tags from servers
(consul agent role server). Default is set to ``False``.
:check: an optional health check for this service.
:token: an optional ACL token to apply to this request.
:returns: ``True`` if the service was successfully registered/updated, otherwise ``False``.
"""
payload: Dict[str, Any] = {'name': name}
if enable_tag_override:
payload['enabletagoverride'] = enable_tag_override
if service_id:
payload['id'] = service_id
if address:
payload['address'] = address
if port:
payload['port'] = port
if tagged_addresses:
payload['tagged_addresses'] = tagged_addresses
if tags:
payload['tags'] = tags
if check:
payload['check'] = check
token = token or self.agent.token
params = {'token': token} if token else {}
return self.agent.http.put(base.CB.bool(), '/v1/agent/service/register',
params=params, data=json.dumps(payload))
class Response(NamedTuple):
code: int
headers: Union[Mapping[str, str], Mapping[bytes, bytes], None]
@@ -269,6 +320,7 @@ class Consul(AbstractDCS):
kwargs['verify'] = verify
self._client = ConsulClient(**kwargs)
self._agent_service = ConsulAgentService(self._client)
self.set_retry_timeout(config['retry_timeout'])
self.set_ttl(config.get('ttl') or 30)
self._last_session_refresh = 0
@@ -503,14 +555,14 @@ class Consul(AbstractDCS):
@catch_consul_errors
def register_service(self, service_name: str, **kwargs: Any) -> bool:
logger.info('Register service %s, params %s', service_name, kwargs)
return self._client.agent.service.register(service_name, **kwargs)
return self._agent_service.register(service_name, **kwargs)
@catch_consul_errors
def deregister_service(self, service_id: str) -> bool:
logger.info('Deregister service %s', service_id)
# service_id can contain special characters, but is used as part of uri in deregister request
service_id = quote(service_id)
return self._client.agent.service.deregister(service_id)
return self._agent_service.deregister(service_id)
def _update_service(self, data: Dict[str, Any]) -> Optional[bool]:
service_name = self._service_name

View File

@@ -4,7 +4,7 @@ import unittest
from consul import ConsulException, NotFound
from mock import Mock, PropertyMock, patch
from patroni.dcs import get_dcs
from patroni.dcs.consul import AbstractDCS, Cluster, Consul, ConsulInternalError, \
from patroni.dcs.consul import AbstractDCS, Cluster, Consul, ConsulAgentService, ConsulInternalError, \
ConsulError, ConsulClient, HTTPClient, InvalidSessionTTL, InvalidSession, RetryFailedError
from patroni.postgresql.mpp import get_mpp
from . import SleepException
@@ -245,8 +245,8 @@ class TestConsul(unittest.TestCase):
def test_set_history_value(self):
self.assertTrue(self.c.set_history_value('{}'))
@patch.object(consul.Consul.Agent.Service, 'register', Mock(side_effect=(False, True, True, True)))
@patch.object(consul.Consul.Agent.Service, 'deregister', Mock(return_value=True))
@patch.object(ConsulAgentService, 'register', Mock(side_effect=(False, True, True, True)))
@patch.object(ConsulAgentService, 'deregister', Mock(return_value=True))
def test_update_service(self):
d = {'role': 'replica', 'api_url': 'http://a/t', 'conn_url': 'pg://c:1', 'state': 'running'}
self.assertIsNone(self.c.update_service({}, {}))
@@ -277,7 +277,7 @@ class TestConsul(unittest.TestCase):
# Changing register_service from True to False calls deregister()
self.c.reload_config({'consul': {'register_service': False}, 'loop_wait': 10, 'ttl': 30, 'retry_timeout': 10})
with patch('consul.Consul.Agent.Service.deregister') as mock_deregister:
with patch.object(ConsulAgentService, 'deregister') as mock_deregister:
self.c.touch_member(d)
mock_deregister.assert_called_once()
@@ -285,31 +285,31 @@ class TestConsul(unittest.TestCase):
# register_service staying False between reloads does not call deregister()
self.c.reload_config({'consul': {'register_service': False}, 'loop_wait': 10, 'ttl': 30, 'retry_timeout': 10})
with patch('consul.Consul.Agent.Service.deregister') as mock_deregister:
with patch.object(ConsulAgentService, 'deregister') as mock_deregister:
self.c.touch_member(d)
self.assertFalse(mock_deregister.called)
# Changing register_service from False to True calls register()
self.c.reload_config({'consul': {'register_service': True}, 'loop_wait': 10, 'ttl': 30, 'retry_timeout': 10})
with patch('consul.Consul.Agent.Service.register') as mock_register:
with patch.object(HTTPClient, 'put', create=True) as mock_put:
self.c.touch_member(d)
mock_register.assert_called_once()
mock_put.assert_called_once()
# register_service staying True between reloads does not call register()
self.c.reload_config({'consul': {'register_service': True}, 'loop_wait': 10, 'ttl': 30, 'retry_timeout': 10})
with patch('consul.Consul.Agent.Service.register') as mock_register:
with patch.object(ConsulAgentService, 'register') as mock_register:
self.c.touch_member(d)
self.assertFalse(mock_deregister.called)
# register_service staying True between reloads does calls register() if other service data has changed
self.c.reload_config({'consul': {'register_service': True}, 'loop_wait': 10, 'ttl': 30, 'retry_timeout': 10})
with patch('consul.Consul.Agent.Service.register') as mock_register:
with patch.object(ConsulAgentService, 'register') as mock_register:
self.c.touch_member(d)
mock_register.assert_called_once()
# register_service staying True between reloads does calls register() if service_tags have changed
self.c.reload_config({'consul': {'register_service': True, 'service_tags': ['foo']}, 'loop_wait': 10,
'ttl': 30, 'retry_timeout': 10})
with patch('consul.Consul.Agent.Service.register') as mock_register:
with patch.object(ConsulAgentService, 'register') as mock_register:
self.c.touch_member(d)
mock_register.assert_called_once()

View File

@@ -1,10 +1,14 @@
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, Callable, Dict, List, NamedTuple, Optional, Tuple
class ConsulException(Exception): ...
class NotFound(ConsulException): ...
class CB:
@classmethod
def bool(klass) -> Callable[[NamedTuple], bool]: ...
class Check:
@classmethod
def http(klass, url: str, interval: str, timeout: Optional[str] = None, deregister: Optional[str] = None) -> Dict[str, str]: ...
class Consul:
token: Optional[str]
http: Any
agent: 'Consul.Agent'
session: 'Consul.Session'
@@ -17,7 +21,9 @@ class Consul:
service: 'Consul.Agent.Service'
def self(self) -> Dict[str, Dict[str, Any]]: ...
class Service:
def register(self, name: str, service_id=..., address=..., port=..., tags=..., check=..., token=..., script=..., interval=..., ttl=..., http=..., timeout=..., enable_tag_override=...) -> bool: ...
agent: 'Consul'
def __init__(self, agent: 'Consul') -> None: ..
def register(self, name: str, service_id: Optional[str] = None, address: Optional[str] = None, port: Optional[int] = None, tags: Optional[List[str]] = None, check: Optional[Dict[str, str]] = None, token: Optional[str] = None, enable_tag_override: bool = False) -> bool: ...
def deregister(self, service_id: str) -> bool: ...
class Session:
def create(self, name: Optional[str] = None, node: Optional[str] = [], checks: Optional[List[str]]=None, lock_delay: float = 15, behavior: str = 'release', ttl: Optional[int] = None, dc: Optional[str] = None) -> str: ...