Etcd smart refresh members (#1499)
In dynamic environments it is common for etcd nodes to change their IP addresses during a rolling upgrade. If the etcd node Patroni is currently connected to is upgraded last, the cached topology may no longer contain any live node, so the request cannot be retried and fails completely, usually resulting in the primary being demoted. Patroni already performs a periodic (every 5 minutes) rediscovery of the etcd cluster topology to partially mitigate this, but with very fast node rotation it was still possible to hit the issue.

This PR attempts to address the problem. If the list of nodes is exhausted, Patroni repeats the initial discovery via an external mechanism, such as resolving A or SRV DNS records, and if the new list differs from the original one, Patroni uses it as the new etcd cluster topology.

To deal with TCP connection issues, the connect timeout is set to max(read_timeout/2, 1). This makes the list of members exhaust faster, but leaves enough time to rediscover the topology and make another attempt.

The third issue addressed by this PR: the DNS names of the etcd nodes may stay the same while their IP addresses change, therefore the internal DNS cache is cleared when the topology is rediscovered.

Besides that, this commit makes the `_machines_cache` property effectively static: it is only updated when the topology has changed, which helps to avoid concurrency issues.
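Editor's note: the timeout handling described above can be sketched as follows. This is an illustration only, not code from this commit; the function name and the even split of the read timeout across nodes are assumptions, the real logic lives in `_calculate_timeouts` and `_build_request_parameters` in the diff below.

# Illustrative sketch: deriving a per-node urllib3 Timeout from a global read
# timeout, following the "connect timeout = max(per_node_timeout/2, 1)" idea.
from urllib3 import Timeout

def per_node_timeout_sketch(read_timeout, etcd_nodes):
    per_node = float(read_timeout) / max(etcd_nodes, 1)  # split the deadline across nodes (assumption)
    connect = max(1, per_node / 2)                       # keep the TCP connect phase short, but >= 1s
    return Timeout(connect=connect, total=per_node)

# e.g. per_node_timeout_sketch(10, 3) -> connect ~1.67s, total ~3.33s per node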
committed by GitHub
parent 80fbe90056 · commit be4c078d95
@@ -11,6 +11,7 @@ import time
 
 from dns.exception import DNSException
 from dns import resolver
+from urllib3 import Timeout
 from urllib3.exceptions import HTTPError, ReadTimeoutError, ProtocolError
 from six.moves.queue import Queue
 from six.moves.http_client import HTTPException
@@ -69,6 +70,9 @@ class DnsCachingResolver(Thread):
     def resolve_async(self, host, port, attempt=0):
         self._resolve_queue.put(((host, port), attempt))
 
+    def remove(self, host, port):
+        self._cache.pop((host, port), None)
+
     @staticmethod
     def _do_resolve(host, port):
         try:
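Editor's note: the new `remove` method above drops a cached lookup so the next resolution hits DNS again. A minimal, self-contained sketch of that caching idea (assumed/simplified, not Patroni's actual resolver class):

import socket

class TinyDnsCache:
    def __init__(self):
        self._cache = {}

    def resolve(self, host, port):
        key = (host, port)
        if key not in self._cache:
            ips = {r[4][0] for r in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)}
            self._cache[key] = sorted(ips)
        return self._cache[key]

    def remove(self, host, port):
        # same semantics as self._cache.pop((host, port), None) in the diff:
        # renamed-in-place nodes get fresh IP addresses on the next lookup
        self._cache.pop((host, port), None)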
@@ -91,6 +95,7 @@ class Client(etcd.Client):
         # Workaround for the case when https://github.com/jplana/python-etcd/pull/196 is not applied
         self.http.connection_pool_kw.pop('ssl_version', None)
         self._config = config
+        self._initial_machines_cache = []
         self._load_machines_cache()
         self._allow_reconnect = True
         # allow passing retry argument to api_execute in params
@@ -98,14 +103,13 @@ class Client(etcd.Client):
         self._read_options.add('retry')
         self._del_conditions.add('retry')
 
-    def _calculate_timeouts(self, etcd_nodes=None, timeout=None):
+    def _calculate_timeouts(self, etcd_nodes, timeout=None):
         """Calculate a request timeout and number of retries per single etcd node.
         In case if the timeout per node is too small (less than one second) we will reduce the number of nodes.
         For the cluster with only one node we will try to do 2 retries.
         For clusters with 2 nodes we will try to do 1 retry for every node.
         No retries for clusters with 3 or more nodes. We better rely on switching to a different node."""
 
-        etcd_nodes = etcd_nodes or len(self._machines_cache) + 1
         per_node_timeout = timeout = float(timeout or self.read_timeout)
 
         max_retries = 4 - min(etcd_nodes, 3)
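Editor's note: a worked example of the retry policy stated in the docstring above, interpreting `max_retries = 4 - min(etcd_nodes, 3)` as the total attempt count per node (sketch only; the real routine also shrinks the node list when the per-node budget drops below one second):

def retries_per_node_sketch(etcd_nodes):
    # 1 node -> 2 extra retries, 2 nodes -> 1 retry each, 3+ nodes -> no retries
    return 4 - min(etcd_nodes, 3) - 1

assert [retries_per_node_sketch(n) for n in (1, 2, 3, 5)] == [2, 1, 0, 0]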
@@ -130,19 +134,25 @@ class Client(etcd.Client):
         basic_auth = ':'.join((self.username, self.password)) if self.username and self.password else None
         return urllib3.make_headers(basic_auth=basic_auth, user_agent=USER_AGENT)
 
-    def _build_request_parameters(self, timeout=None):
+    def _build_request_parameters(self, etcd_nodes, timeout=None):
         kwargs = {'headers': self._get_headers(), 'redirect': self.allow_redirect}
 
         if timeout is not None:
             kwargs.update(retries=0, timeout=timeout)
         else:
-            _, per_node_timeout, per_node_retries = self._calculate_timeouts()
-            kwargs.update(timeout=per_node_timeout, retries=per_node_retries)
+            _, per_node_timeout, per_node_retries = self._calculate_timeouts(etcd_nodes)
+            connect_timeout = max(1, per_node_timeout/2)
+            kwargs.update(timeout=Timeout(connect=connect_timeout, total=per_node_timeout), retries=per_node_retries)
         return kwargs
 
     def set_machines_cache_ttl(self, cache_ttl):
         self._machines_cache_ttl = cache_ttl
 
+    @property
+    def machines_cache(self):
+        base_uri, cache = self._base_uri, self._machines_cache
+        return ([base_uri] if base_uri in cache else []) + [machine for machine in cache if machine != base_uri]
+
     @property
     def machines(self):
         """Original `machines` method(property) of `etcd.Client` class raise exception
@@ -155,58 +165,58 @@ class Client(etcd.Client):
         Also this method implements the same timeout-retry logic as `api_execute`, because
         the original method was retrying 2 times with the `read_timeout` on each node."""
 
-        kwargs = self._build_request_parameters()
+        machines_cache = self.machines_cache
+        kwargs = self._build_request_parameters(len(machines_cache))
 
-        while True:
+        for base_uri in machines_cache:
             try:
-                response = self.http.request(self._MGET, self._base_uri + self.version_prefix + '/machines', **kwargs)
+                response = self.http.request(self._MGET, base_uri + self.version_prefix + '/machines', **kwargs)
                 data = self._handle_server_response(response).data.decode('utf-8')
                 machines = [m.strip() for m in data.split(',') if m.strip()]
                 logger.debug("Retrieved list of machines: %s", machines)
-                if not machines:
-                    raise etcd.EtcdException
-                random.shuffle(machines)
-                for url in machines:
-                    r = urlparse(url)
-                    port = r.port or (443 if r.scheme == 'https' else 80)
-                    self._dns_resolver.resolve_async(r.hostname, port)
-                return machines
+                if machines:
+                    random.shuffle(machines)
+                    self._update_dns_cache(self._dns_resolver.resolve_async, machines)
+                    return machines
             except Exception as e:
-                # We can't get the list of machines, if one server is in the
-                # machines cache, try on it
-                logger.error("Failed to get list of machines from %s%s: %r", self._base_uri, self.version_prefix, e)
-                if self._machines_cache:
-                    self._base_uri = self._machines_cache.pop(0)
-                    logger.info("Retrying on %s", self._base_uri)
-                elif self._update_machines_cache:
-                    raise etcd.EtcdException("Could not get the list of servers, "
-                                             "maybe you provided the wrong "
-                                             "host(s) to connect to?")
-                else:
-                    return []
+                self.http.clear()
+                logger.error("Failed to get list of machines from %s%s: %r", base_uri, self.version_prefix, e)
 
+        raise etcd.EtcdConnectionFailed('No more machines in the cluster')
+
     def set_read_timeout(self, timeout):
         self._read_timeout = timeout
 
-    def _do_http_request(self, request_executor, method, url, fields=None, **kwargs):
-        try:
-            response = request_executor(method, url, fields=fields, **kwargs)
-            response.data.decode('utf-8')
-            self._check_cluster_id(response)
-        except (HTTPError, HTTPException, socket.error, socket.timeout) as e:
-            if (isinstance(fields, dict) and fields.get("wait") == "true" and
-                    isinstance(e, (ReadTimeoutError, ProtocolError))):
-                logger.debug("Watch timed out.")
+    def _do_http_request(self, retry, machines_cache, request_executor, method, path, fields=None, **kwargs):
+        some_request_failed = False
+        for i, base_uri in enumerate(machines_cache):
+            if i > 0:
+                logger.info("Retrying on %s", base_uri)
+            try:
+                response = request_executor(method, base_uri + path, fields=fields, **kwargs)
+                response.data.decode('utf-8')
+                self._check_cluster_id(response)
+                if some_request_failed:
+                    self.set_base_uri(base_uri)
+                    self._refresh_machines_cache()
+                return response
+            except (HTTPError, HTTPException, socket.error, socket.timeout) as e:
+                self.http.clear()
                 # switch to the next etcd node because we don't know exactly what happened,
                 # whether the key didn't received an update or there is a network problem.
-                self._machines_cache.insert(0, self._base_uri)
-                self._base_uri = self._next_server()
-                raise etcd.EtcdWatchTimedOut("Watch timed out: {0}".format(e), cause=e)
-            logger.error("Request to server %s failed: %r", self._base_uri, e)
-            logger.info("Reconnection allowed, looking for another server.")
-            self._base_uri = self._next_server(cause=e)
-            response = False
-        return response
+                if not retry and i + 1 < len(machines_cache):
+                    self.set_base_uri(machines_cache[i + 1])
+                if (isinstance(fields, dict) and fields.get("wait") == "true" and
+                        isinstance(e, (ReadTimeoutError, ProtocolError))):
+                    logger.debug("Watch timed out.")
+                    raise etcd.EtcdWatchTimedOut("Watch timed out: {0}".format(e), cause=e)
+                logger.error("Request to server %s failed: %r", base_uri, e)
+                logger.info("Reconnection allowed, looking for another server.")
+                if not retry:
+                    raise etcd.EtcdException('{0} {1} request failed'.format(method, path))
+                some_request_failed = True
 
+        raise etcd.EtcdConnectionFailed('No more machines in the cluster')
+
     def api_execute(self, path, method, params=None, timeout=None):
         if not path.startswith('/'):
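Editor's note: the new `_do_http_request` above walks the cached member list and only gives up once every member has been tried. A condensed, self-contained sketch of that failover pattern (assumed names, plain urllib3, no cluster-id checks or base-uri bookkeeping):

import urllib3

def request_with_failover(http, machines_cache, path, timeout, retry=True):
    # http is a urllib3.PoolManager(); try each cached member once in order
    for base_uri in machines_cache:
        try:
            return http.request('GET', base_uri + path, timeout=timeout, retries=0)
        except urllib3.exceptions.HTTPError:
            if not retry:
                raise
            # otherwise fall through and give the next cached member a chance
    raise ConnectionError('No more machines in the cluster')

# usage: request_with_failover(urllib3.PoolManager(), ['http://10.0.0.1:2379'], '/v2/machines', 3.0)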
@@ -229,44 +239,34 @@ class Client(etcd.Client):
         elif not self._use_proxies and time.time() - self._machines_cache_updated > self._machines_cache_ttl:
             self._refresh_machines_cache()
 
-        kwargs.update(self._build_request_parameters(timeout))
-
-        if retry:
-            machines_cache = [self._base_uri] + self._machines_cache
-
-        response = False
+        machines_cache = self.machines_cache
+        etcd_nodes = len(machines_cache)
+        kwargs.update(self._build_request_parameters(etcd_nodes, timeout))
 
         while True:
             try:
-                some_request_failed = False
-                while not response:
-                    response = self._do_http_request(request_executor, method, self._base_uri + path, **kwargs)
-
-                    if response is False:
-                        if not retry:
-                            raise etcd.EtcdException('{0} {1} request failed'.format(method, path))
-                        some_request_failed = True
-                if some_request_failed:
-                    self._refresh_machines_cache()
-                if response:
-                    break
-            except etcd.EtcdConnectionFailed:
-                if not retry:
-                    raise
+                response = self._do_http_request(retry, machines_cache, request_executor, method, path, **kwargs)
+                return self._handle_server_response(response)
+            except etcd.EtcdWatchTimedOut:
+                raise
+            except etcd.EtcdConnectionFailed as ex:
+                try:
+                    if self._load_machines_cache():
+                        machines_cache = self.machines_cache
+                        etcd_nodes = len(machines_cache)
+                except Exception as e:
+                    logger.debug('Failed to update list of etcd nodes: %r', e)
                 sleeptime = retry.sleeptime
                 remaining_time = retry.stoptime - sleeptime - time.time()
-                nodes, timeout, retries = self._calculate_timeouts(len(machines_cache), remaining_time)
+                nodes, timeout, retries = self._calculate_timeouts(etcd_nodes, remaining_time)
                 if nodes == 0:
                     self._update_machines_cache = True
-                    raise
+                    raise ex
                 retry.sleep_func(sleeptime)
                 retry.update_delay()
-                # We still have some time left. Partially restore `_machines_cache` and retry request
-                kwargs.update(timeout=timeout, retries=retries)
-                self._base_uri = machines_cache[0]
-                self._machines_cache = machines_cache[1:nodes]
-
-        return self._handle_server_response(response)
+                # We still have some time left. Partially reduce `machines_cache` and retry request
+                kwargs.update(timeout=Timeout(connect=max(1, timeout/2), total=timeout), retries=retries)
+                machines_cache = machines_cache[:nodes]
 
     @staticmethod
     def get_srv_record(host):
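Editor's note: the hunk above recomputes the per-attempt budget from whatever is left of the retry deadline before trying again. A simplified sketch of that idea (`stoptime`/`sleeptime` mirror Patroni's Retry object; the even split across nodes is an assumption):

import time

def remaining_budget(stoptime, sleeptime, etcd_nodes):
    remaining = stoptime - sleeptime - time.time()
    if remaining <= 0 or etcd_nodes == 0:
        return None                        # deadline exhausted: give up and re-raise
    per_node_total = remaining / etcd_nodes
    per_node_connect = max(1, per_node_total / 2)
    return per_node_connect, per_node_total

# e.g. with 6 seconds left and 3 nodes the sketch yields (1.0, 2.0)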
@@ -327,6 +327,13 @@ class Client(etcd.Client):
             machines_cache = self._get_machines_cache_from_dns(self._config['host'], self._config['port'])
         return machines_cache
 
+    @staticmethod
+    def _update_dns_cache(func, machines):
+        for url in machines:
+            r = urlparse(url)
+            port = r.port or (443 if r.scheme == 'https' else 80)
+            func(r.hostname, port)
+
     def _load_machines_cache(self):
         """This method should fill up `_machines_cache` from scratch.
         It could happen only in two cases:
@@ -338,25 +345,49 @@ class Client(etcd.Client):
         if 'srv' not in self._config and 'host' not in self._config and 'hosts' not in self._config:
             raise Exception('Neither srv, hosts, host nor url are defined in etcd section of config')
 
-        self._machines_cache = self._get_machines_cache_from_config()
-
+        machines_cache = self._get_machines_cache_from_config()
         # Can not bootstrap list of etcd-cluster members, giving up
-        if not self._machines_cache:
+        if not machines_cache:
             raise etcd.EtcdException
 
-        # After filling up initial list of machines_cache we should ask etcd-cluster about actual list
-        self._base_uri = self._next_server()
-        self._refresh_machines_cache()
-        self._update_machines_cache = False
+        # enforce resolving dns name,they might get new ips
+        self._update_dns_cache(self._dns_resolver.remove, machines_cache)
 
-    def _refresh_machines_cache(self):
-        self._machines_cache = self._get_machines_cache_from_config() if self._use_proxies else self.machines
-        if self._base_uri in self._machines_cache:
-            self._machines_cache.remove(self._base_uri)
-        elif self._machines_cache:
-            self._base_uri = self._next_server()
+        # The etcd cluster could change its topology over time and depending on how we resolve the initial
+        # topology (list of hosts in the Patroni config or DNS records, A or SRV) we might get into the situation
+        # the the real topology doesn't match anymore with the topology resolved from the configuration file.
+        # In case if the "initial" topology is the same as before we will not override the `_machines_cache`.
+        ret = set(machines_cache) != set(self._initial_machines_cache)
+        if ret:
+            self._initial_machines_cache = self._machines_cache = machines_cache
+
+            # After filling up the initial list of machines_cache we should ask etcd-cluster about actual list
+            self._refresh_machines_cache(True)
+
+        self._update_machines_cache = False
+        return ret
+
+    def _refresh_machines_cache(self, updating_cache=False):
+        if self._use_proxies:
+            self._machines_cache = self._get_machines_cache_from_config()
+        else:
+            try:
+                self._machines_cache = self.machines
+            except etcd.EtcdConnectionFailed:
+                if updating_cache:
+                    raise etcd.EtcdException("Could not get the list of servers, "
+                                             "maybe you provided the wrong "
+                                             "host(s) to connect to?")
+                return
+
+        if self._base_uri not in self._machines_cache:
+            self.set_base_uri(self._machines_cache[0])
         self._machines_cache_updated = time.time()
 
+    def set_base_uri(self, value):
+        logger.info('Selected new etcd server %s', value)
+        self._base_uri = value
+
 
 class Etcd(AbstractDCS):
 
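Editor's note: the hunk above only replaces the machines cache when the freshly resolved topology really differs from the initial one, and clears the DNS cache so unchanged hostnames re-resolve to new addresses. A tiny standalone sketch of the comparison (Patroni keeps this state on the Client instance):

def topology_changed(initial_cache, resolved_cache):
    # order does not matter, only membership: if resolving the configuration
    # (hosts list, A or SRV records) yields the same set of URLs, keep the
    # current _machines_cache and avoid churning connections
    return set(resolved_cache) != set(initial_cache)

assert topology_changed(['http://10.0.0.1:2379'], ['http://10.0.0.2:2379'])
assert not topology_changed(['http://a:2379', 'http://b:2379'],
                            ['http://b:2379', 'http://a:2379'])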
@@ -633,7 +664,6 @@ class Etcd(AbstractDCS):
                     # than reestablishing http connection every time from every replica.
                     return True
                 except etcd.EtcdWatchTimedOut:
-                    self._client.http.clear()
                     self._has_failed = False
                     return False
                 except (etcd.EtcdEventIndexCleared, etcd.EtcdWatcherCleared):  # Watch failed
@@ -130,12 +130,11 @@ class TestClient(unittest.TestCase):
         self.client.http.request_encode_body = http_request
 
     def test_machines(self):
-        self.client._base_uri = 'http://localhost:4001'
-        self.client._machines_cache = ['http://localhost:2379']
+        self.client._base_uri = 'http://localhost:4002'
+        self.client._machines_cache = ['http://localhost:4002', 'http://localhost:2379']
         self.assertIsNotNone(self.client.machines)
         self.client._base_uri = 'http://localhost:4001'
-        self.client._machines_cache = []
-        self.assertIsNotNone(self.client.machines)
+        self.client._machines_cache = ['http://localhost:4001']
         self.client._update_machines_cache = True
         machines = None
         try:
@@ -146,16 +145,13 @@ class TestClient(unittest.TestCase):
 
     @patch.object(Client, 'machines')
     def test_api_execute(self, mock_machines):
-        mock_machines.__get__ = Mock(return_value=['http://localhost:2379'])
+        mock_machines.__get__ = Mock(return_value=['http://localhost:4001', 'http://localhost:2379'])
         self.assertRaises(ValueError, self.client.api_execute, '', '')
         self.client._base_uri = 'http://localhost:4001'
-        self.client._machines_cache = ['http://localhost:2379']
         self.assertRaises(etcd.EtcdException, self.client.api_execute, '/', 'POST', timeout=0)
-        self.client._base_uri = 'http://localhost:4001'
-        self.client._machines_cache = ['http://localhost:2379']
         rtry = Retry(deadline=10, max_delay=1, max_tries=-1, retry_exceptions=(etcd.EtcdLeaderElectionInProgress,))
         rtry(self.client.api_execute, '/', 'POST', timeout=0, params={'retry': rtry})
         mock_machines.__get__ = Mock(return_value=['http://localhost:2379'])
         self.client._machines_cache_updated = 0
         self.client.api_execute('/', 'POST', timeout=0)
         self.client._machines_cache = [self.client._base_uri]
@@ -163,10 +159,17 @@ class TestClient(unittest.TestCase):
         self.assertRaises(etcd.EtcdWatchTimedOut, self.client.api_execute, '/timeout', 'POST', params={'wait': 'true'})
         self.assertRaises(etcd.EtcdException, self.client.api_execute, '/', '')
 
-        with patch.object(Client, '_do_http_request', Mock(side_effect=etcd.EtcdConnectionFailed)):
-            with patch.object(Client, '_calculate_timeouts', Mock(side_effect=[(1, 1, 0), (1, 1, 0), (0, 1, 0)])):
-                self.assertRaises(etcd.EtcdException, rtry, self.client.api_execute, '/', 'GET', params={'retry': rtry})
-            self.client._read_timeout = 0
+        with patch.object(Client, '_calculate_timeouts', Mock(side_effect=[(1, 1, 0), (1, 1, 0), (0, 1, 0)])),\
+                patch.object(Client, '_load_machines_cache', Mock(side_effect=Exception)):
+            self.client.http.request = Mock(side_effect=socket.error)
+            self.assertRaises(etcd.EtcdException, rtry, self.client.api_execute, '/', 'GET', params={'retry': rtry})
+
+        with patch.object(Client, '_calculate_timeouts', Mock(side_effect=[(1, 1, 0), (1, 1, 0), (0, 1, 0)])),\
+                patch.object(Client, '_load_machines_cache', Mock(return_value=True)):
+            self.assertRaises(etcd.EtcdException, rtry, self.client.api_execute, '/', 'GET', params={'retry': rtry})
+
+        with patch.object(Client, '_do_http_request', Mock(side_effect=etcd.EtcdException)):
+            self.client._read_timeout = 0.01
             self.assertRaises(etcd.EtcdException, self.client.api_execute, '/', 'GET')
 
     def test_get_srv_record(self):
@@ -182,8 +185,12 @@ class TestClient(unittest.TestCase):
             self.client._get_machines_cache_from_dns('error', 2379)
 
     @patch.object(Client, 'machines')
-    def test__load_machines_cache(self, mock_machines):
-        mock_machines.__get__ = Mock(return_value=['http://localhost:2379'])
+    def test__refresh_machines_cache(self, mock_machines):
+        mock_machines.__get__ = Mock(side_effect=etcd.EtcdConnectionFailed)
+        self.assertIsNone(self.client._refresh_machines_cache())
+        self.assertRaises(etcd.EtcdException, self.client._refresh_machines_cache, True)
+
+    def test__load_machines_cache(self):
         self.client._config = {}
         self.assertRaises(Exception, self.client._load_machines_cache)
         self.client._config = {'srv': 'blabla'}