mirror of
https://github.com/outbackdingo/patroni.git
synced 2026-01-27 10:20:10 +00:00
Register service in consul (#802)
Кegister service 'scope_name' with tag 'master' or 'replica' example with scope 'pgsql-pgpi' ```[root@pgpi1 ~]# host -t SRV pgsql-pgpi.service.consul. 127.0.0.1 Using domain server: Name: 127.0.0.1 Address: 127.0.0.1#53 Aliases: pgsql-pgpi.service.consul has SRV record 1 1 5432 pgpi1.node.dc.consul. pgsql-pgpi.service.consul has SRV record 1 1 5432 pgpi2.node.dc.consul. [root@pgpi1 ~]# host -t SRV master.pgsql-pgpi.service.consul. 127.0.0.1 Using domain server: Name: 127.0.0.1 Address: 127.0.0.1#53 Aliases: master.pgsql-pgpi.service.consul has SRV record 1 1 5432 pgpi2.node.dc.consul. [root@pgpi1 ~]# host -t SRV replica.pgsql-pgpi.service.consul. 127.0.0.1 Using domain server: Name: 127.0.0.1 Address: 127.0.0.1#53 Aliases: replica.pgsql-pgpi.service.consul has SRV record 1 1 5432 pgpi1.node.dc.consul.``` Fixes: https://github.com/zalando/patroni/issues/771
This commit is contained in:
committed by
Alexander Kukushkin
parent
dd7c3c349f
commit
2e9cb412e4
@@ -2,6 +2,7 @@ from __future__ import absolute_import
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import socket
|
||||
import ssl
|
||||
import time
|
||||
@@ -12,7 +13,7 @@ from patroni.dcs import AbstractDCS, ClusterConfig, Cluster, Failover, Leader, M
|
||||
from patroni.exceptions import DCSError
|
||||
from patroni.utils import deep_compare, parse_bool, Retry, RetryFailedError, split_host_port
|
||||
from urllib3.exceptions import HTTPError
|
||||
from six.moves.urllib.parse import urlencode, urlparse
|
||||
from six.moves.urllib.parse import urlencode, urlparse, quote
|
||||
from six.moves.http_client import HTTPException
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -133,6 +134,31 @@ def catch_consul_errors(func):
|
||||
return wrapper
|
||||
|
||||
|
||||
def force_if_last_failed(func):
|
||||
def wrapper(*args, **kwargs):
|
||||
if wrapper.last_result is False:
|
||||
kwargs['force'] = True
|
||||
wrapper.last_result = func(*args, **kwargs)
|
||||
return wrapper.last_result
|
||||
|
||||
wrapper.last_result = None
|
||||
return wrapper
|
||||
|
||||
|
||||
def service_name_from_scope_name(scope_name):
|
||||
"""Translate scope name to service name which can be used in dns.
|
||||
|
||||
230 = 253 - len('replica.') - len('.service.consul')
|
||||
"""
|
||||
|
||||
def replace_char(match):
|
||||
c = match.group(0)
|
||||
return '-' if c in '. _' else "u{:04d}".format(ord(c))
|
||||
|
||||
service_name = re.sub(r'[^a-z0-9\-]', replace_char, scope_name.lower())
|
||||
return service_name[0:230]
|
||||
|
||||
|
||||
class Consul(AbstractDCS):
|
||||
|
||||
def __init__(self, config):
|
||||
@@ -174,6 +200,13 @@ class Consul(AbstractDCS):
|
||||
self.set_ttl(config.get('ttl') or 30)
|
||||
self._last_session_refresh = 0
|
||||
self.__session_checks = config.get('checks')
|
||||
self._register_service = config.get('register_service', False)
|
||||
if self._register_service:
|
||||
self._service_name = service_name_from_scope_name(self._scope)
|
||||
if self._scope != self._service_name:
|
||||
logger.warning('Using %s as consul service name instead of scope name %s', self._service_name,
|
||||
self._scope)
|
||||
self._service_check_interval = config.get('service_check_interval', '5s')
|
||||
if not self._ctl:
|
||||
self.create_session()
|
||||
|
||||
@@ -323,11 +356,65 @@ class Consul(AbstractDCS):
|
||||
try:
|
||||
args = {} if permanent else {'acquire': self._session}
|
||||
self._client.kv.put(self.member_path, json.dumps(data, separators=(',', ':')), **args)
|
||||
if self._register_service:
|
||||
self.update_service(not create_member and member and member.data or {}, data)
|
||||
return True
|
||||
except Exception:
|
||||
logger.exception('touch_member')
|
||||
return False
|
||||
|
||||
@catch_consul_errors
|
||||
def register_service(self, service_name, **kwargs):
|
||||
logger.info('Register service %s, params %s', service_name, kwargs)
|
||||
return self._client.agent.service.register(service_name, **kwargs)
|
||||
|
||||
@catch_consul_errors
|
||||
def deregister_service(self, service_id):
|
||||
logger.info('Deregister service %s', service_id)
|
||||
# service_id can contain special characters, but is used as part of uri in deregister request
|
||||
service_id = quote(service_id)
|
||||
return self._client.agent.service.deregister(service_id)
|
||||
|
||||
def _update_service(self, data):
|
||||
service_name = self._service_name
|
||||
role = data['role']
|
||||
state = data['state']
|
||||
api_parts = urlparse(data['api_url'])
|
||||
api_parts = api_parts._replace(path='/{0}'.format(role))
|
||||
conn_parts = urlparse(data['conn_url'])
|
||||
check = base.Check.http(api_parts.geturl(), self._service_check_interval, deregister=self._client.http.ttl * 10)
|
||||
params = {
|
||||
'service_id': '{0}/{1}'.format(self._scope, self._name),
|
||||
'address': conn_parts.hostname,
|
||||
'port': conn_parts.port,
|
||||
'check': check,
|
||||
'tags': [role]
|
||||
}
|
||||
|
||||
if state == 'stopped':
|
||||
return self.deregister_service(params['service_id'])
|
||||
|
||||
if role in ['master', 'replica']:
|
||||
if state != 'running':
|
||||
return
|
||||
return self.register_service(service_name, **params)
|
||||
|
||||
logger.warning('Could not register service: unknown role type %s', role)
|
||||
|
||||
@force_if_last_failed
|
||||
def update_service(self, old_data, new_data, force=False):
|
||||
update = False
|
||||
|
||||
for key in ['role', 'api_url', 'conn_url', 'state']:
|
||||
if key not in new_data:
|
||||
logger.warning('Could not register service: not enough params in member data')
|
||||
return
|
||||
if old_data.get(key) != new_data[key]:
|
||||
update = True
|
||||
|
||||
if force or update:
|
||||
return self._update_service(new_data)
|
||||
|
||||
@catch_consul_errors
|
||||
def _do_attempt_to_acquire_leader(self, kwargs):
|
||||
return self.retry(self._client.kv.put, self.leader_path, self._name, **kwargs)
|
||||
@@ -339,6 +426,7 @@ class Consul(AbstractDCS):
|
||||
ret = self._do_attempt_to_acquire_leader({} if permanent else {'acquire': self._session})
|
||||
if not ret:
|
||||
logger.info('Could not take out TTL lock')
|
||||
|
||||
return ret
|
||||
|
||||
def take_leader(self):
|
||||
|
||||
@@ -74,10 +74,12 @@ class TestConsul(unittest.TestCase):
|
||||
@patch.object(consul.Consul.KV, 'delete', Mock())
|
||||
def setUp(self):
|
||||
Consul({'ttl': 30, 'scope': 't', 'name': 'p', 'url': 'https://l:1', 'retry_timeout': 10,
|
||||
'verify': 'on', 'key': 'foo', 'cert': 'bar', 'cacert': 'buz', 'token': 'asd', 'dc': 'dc1'})
|
||||
Consul({'ttl': 30, 'scope': 't', 'name': 'p', 'url': 'https://l:1', 'retry_timeout': 10,
|
||||
'verify': 'on', 'cert': 'bar', 'cacert': 'buz'})
|
||||
self.c = Consul({'ttl': 30, 'scope': 'test', 'name': 'postgresql1', 'host': 'localhost:1', 'retry_timeout': 10})
|
||||
'verify': 'on', 'key': 'foo', 'cert': 'bar', 'cacert': 'buz', 'token': 'asd', 'dc': 'dc1',
|
||||
'register_service': True})
|
||||
Consul({'ttl': 30, 'scope': 't_', 'name': 'p', 'url': 'https://l:1', 'retry_timeout': 10,
|
||||
'verify': 'on', 'cert': 'bar', 'cacert': 'buz', 'register_service': True})
|
||||
self.c = Consul({'ttl': 30, 'scope': 'test', 'name': 'postgresql1', 'host': 'localhost:1', 'retry_timeout': 10,
|
||||
'register_service': True})
|
||||
self.c._base_path = '/service/good'
|
||||
self.c._load_cluster()
|
||||
|
||||
@@ -111,6 +113,7 @@ class TestConsul(unittest.TestCase):
|
||||
@patch.object(consul.Consul.KV, 'delete', Mock(side_effect=[ConsulException, True, True]))
|
||||
@patch.object(consul.Consul.KV, 'put', Mock(side_effect=[True, ConsulException]))
|
||||
def test_touch_member(self):
|
||||
self.c._register_service = True
|
||||
self.c.refresh_session = Mock(return_value=True)
|
||||
self.c.touch_member({'balbla': 'blabla'})
|
||||
self.c.touch_member({'balbla': 'blabla'})
|
||||
@@ -177,3 +180,19 @@ class TestConsul(unittest.TestCase):
|
||||
@patch.object(consul.Consul.KV, 'put', Mock(return_value=True))
|
||||
def test_set_history_value(self):
|
||||
self.assertTrue(self.c.set_history_value('{}'))
|
||||
|
||||
@patch.object(consul.Consul.Agent.Service, 'register', Mock(side_effect=(False, True)))
|
||||
@patch.object(consul.Consul.Agent.Service, 'deregister', Mock(return_value=True))
|
||||
def test_update_service(self):
|
||||
d = {'role': 'replica', 'api_url': 'http://a/t', 'conn_url': 'pg://c:1', 'state': 'running'}
|
||||
self.assertIsNone(self.c.update_service({}, {}))
|
||||
self.assertFalse(self.c.update_service({}, d))
|
||||
self.assertTrue(self.c.update_service(d, d))
|
||||
self.assertIsNone(self.c.update_service(d, d))
|
||||
d['state'] = 'stopped'
|
||||
self.assertTrue(self.c.update_service(d, d, force=True))
|
||||
d['state'] = 'unknown'
|
||||
self.assertIsNone(self.c.update_service({}, d))
|
||||
d['state'] = 'running'
|
||||
d['role'] = 'bla'
|
||||
self.assertIsNone(self.c.update_service({}, d))
|
||||
|
||||
Reference in New Issue
Block a user