diff --git a/patroni/dcs/consul.py b/patroni/dcs/consul.py index 6bcd3216..f65eeb2a 100644 --- a/patroni/dcs/consul.py +++ b/patroni/dcs/consul.py @@ -2,6 +2,7 @@ from __future__ import absolute_import import json import logging import os +import re import socket import ssl import time @@ -12,7 +13,7 @@ from patroni.dcs import AbstractDCS, ClusterConfig, Cluster, Failover, Leader, M from patroni.exceptions import DCSError from patroni.utils import deep_compare, parse_bool, Retry, RetryFailedError, split_host_port from urllib3.exceptions import HTTPError -from six.moves.urllib.parse import urlencode, urlparse +from six.moves.urllib.parse import urlencode, urlparse, quote from six.moves.http_client import HTTPException logger = logging.getLogger(__name__) @@ -133,6 +134,31 @@ def catch_consul_errors(func): return wrapper +def force_if_last_failed(func): + def wrapper(*args, **kwargs): + if wrapper.last_result is False: + kwargs['force'] = True + wrapper.last_result = func(*args, **kwargs) + return wrapper.last_result + + wrapper.last_result = None + return wrapper + + +def service_name_from_scope_name(scope_name): + """Translate scope name to service name which can be used in dns. + + 230 = 253 - len('replica.') - len('.service.consul') + """ + + def replace_char(match): + c = match.group(0) + return '-' if c in '. _' else "u{:04d}".format(ord(c)) + + service_name = re.sub(r'[^a-z0-9\-]', replace_char, scope_name.lower()) + return service_name[0:230] + + class Consul(AbstractDCS): def __init__(self, config): @@ -174,6 +200,13 @@ class Consul(AbstractDCS): self.set_ttl(config.get('ttl') or 30) self._last_session_refresh = 0 self.__session_checks = config.get('checks') + self._register_service = config.get('register_service', False) + if self._register_service: + self._service_name = service_name_from_scope_name(self._scope) + if self._scope != self._service_name: + logger.warning('Using %s as consul service name instead of scope name %s', self._service_name, + self._scope) + self._service_check_interval = config.get('service_check_interval', '5s') if not self._ctl: self.create_session() @@ -323,11 +356,65 @@ class Consul(AbstractDCS): try: args = {} if permanent else {'acquire': self._session} self._client.kv.put(self.member_path, json.dumps(data, separators=(',', ':')), **args) + if self._register_service: + self.update_service(not create_member and member and member.data or {}, data) return True except Exception: logger.exception('touch_member') return False + @catch_consul_errors + def register_service(self, service_name, **kwargs): + logger.info('Register service %s, params %s', service_name, kwargs) + return self._client.agent.service.register(service_name, **kwargs) + + @catch_consul_errors + def deregister_service(self, service_id): + logger.info('Deregister service %s', service_id) + # service_id can contain special characters, but is used as part of uri in deregister request + service_id = quote(service_id) + return self._client.agent.service.deregister(service_id) + + def _update_service(self, data): + service_name = self._service_name + role = data['role'] + state = data['state'] + api_parts = urlparse(data['api_url']) + api_parts = api_parts._replace(path='/{0}'.format(role)) + conn_parts = urlparse(data['conn_url']) + check = base.Check.http(api_parts.geturl(), self._service_check_interval, deregister=self._client.http.ttl * 10) + params = { + 'service_id': '{0}/{1}'.format(self._scope, self._name), + 'address': conn_parts.hostname, + 'port': conn_parts.port, + 'check': check, + 'tags': [role] + } + + if state == 'stopped': + return self.deregister_service(params['service_id']) + + if role in ['master', 'replica']: + if state != 'running': + return + return self.register_service(service_name, **params) + + logger.warning('Could not register service: unknown role type %s', role) + + @force_if_last_failed + def update_service(self, old_data, new_data, force=False): + update = False + + for key in ['role', 'api_url', 'conn_url', 'state']: + if key not in new_data: + logger.warning('Could not register service: not enough params in member data') + return + if old_data.get(key) != new_data[key]: + update = True + + if force or update: + return self._update_service(new_data) + @catch_consul_errors def _do_attempt_to_acquire_leader(self, kwargs): return self.retry(self._client.kv.put, self.leader_path, self._name, **kwargs) @@ -339,6 +426,7 @@ class Consul(AbstractDCS): ret = self._do_attempt_to_acquire_leader({} if permanent else {'acquire': self._session}) if not ret: logger.info('Could not take out TTL lock') + return ret def take_leader(self): diff --git a/tests/test_consul.py b/tests/test_consul.py index c26ca3c6..7f752f18 100644 --- a/tests/test_consul.py +++ b/tests/test_consul.py @@ -74,10 +74,12 @@ class TestConsul(unittest.TestCase): @patch.object(consul.Consul.KV, 'delete', Mock()) def setUp(self): Consul({'ttl': 30, 'scope': 't', 'name': 'p', 'url': 'https://l:1', 'retry_timeout': 10, - 'verify': 'on', 'key': 'foo', 'cert': 'bar', 'cacert': 'buz', 'token': 'asd', 'dc': 'dc1'}) - Consul({'ttl': 30, 'scope': 't', 'name': 'p', 'url': 'https://l:1', 'retry_timeout': 10, - 'verify': 'on', 'cert': 'bar', 'cacert': 'buz'}) - self.c = Consul({'ttl': 30, 'scope': 'test', 'name': 'postgresql1', 'host': 'localhost:1', 'retry_timeout': 10}) + 'verify': 'on', 'key': 'foo', 'cert': 'bar', 'cacert': 'buz', 'token': 'asd', 'dc': 'dc1', + 'register_service': True}) + Consul({'ttl': 30, 'scope': 't_', 'name': 'p', 'url': 'https://l:1', 'retry_timeout': 10, + 'verify': 'on', 'cert': 'bar', 'cacert': 'buz', 'register_service': True}) + self.c = Consul({'ttl': 30, 'scope': 'test', 'name': 'postgresql1', 'host': 'localhost:1', 'retry_timeout': 10, + 'register_service': True}) self.c._base_path = '/service/good' self.c._load_cluster() @@ -111,6 +113,7 @@ class TestConsul(unittest.TestCase): @patch.object(consul.Consul.KV, 'delete', Mock(side_effect=[ConsulException, True, True])) @patch.object(consul.Consul.KV, 'put', Mock(side_effect=[True, ConsulException])) def test_touch_member(self): + self.c._register_service = True self.c.refresh_session = Mock(return_value=True) self.c.touch_member({'balbla': 'blabla'}) self.c.touch_member({'balbla': 'blabla'}) @@ -177,3 +180,19 @@ class TestConsul(unittest.TestCase): @patch.object(consul.Consul.KV, 'put', Mock(return_value=True)) def test_set_history_value(self): self.assertTrue(self.c.set_history_value('{}')) + + @patch.object(consul.Consul.Agent.Service, 'register', Mock(side_effect=(False, True))) + @patch.object(consul.Consul.Agent.Service, 'deregister', Mock(return_value=True)) + def test_update_service(self): + d = {'role': 'replica', 'api_url': 'http://a/t', 'conn_url': 'pg://c:1', 'state': 'running'} + self.assertIsNone(self.c.update_service({}, {})) + self.assertFalse(self.c.update_service({}, d)) + self.assertTrue(self.c.update_service(d, d)) + self.assertIsNone(self.c.update_service(d, d)) + d['state'] = 'stopped' + self.assertTrue(self.c.update_service(d, d, force=True)) + d['state'] = 'unknown' + self.assertIsNone(self.c.update_service({}, d)) + d['state'] = 'running' + d['role'] = 'bla' + self.assertIsNone(self.c.update_service({}, d))