diff --git a/patroni/dcs/etcd.py b/patroni/dcs/etcd.py index 02e88407..844948ea 100644 --- a/patroni/dcs/etcd.py +++ b/patroni/dcs/etcd.py @@ -11,6 +11,7 @@ import time from dns.exception import DNSException from dns import resolver +from urllib3 import Timeout from urllib3.exceptions import HTTPError, ReadTimeoutError, ProtocolError from six.moves.queue import Queue from six.moves.http_client import HTTPException @@ -69,6 +70,9 @@ class DnsCachingResolver(Thread): def resolve_async(self, host, port, attempt=0): self._resolve_queue.put(((host, port), attempt)) + def remove(self, host, port): + self._cache.pop((host, port), None) + @staticmethod def _do_resolve(host, port): try: @@ -91,6 +95,7 @@ class Client(etcd.Client): # Workaround for the case when https://github.com/jplana/python-etcd/pull/196 is not applied self.http.connection_pool_kw.pop('ssl_version', None) self._config = config + self._initial_machines_cache = [] self._load_machines_cache() self._allow_reconnect = True # allow passing retry argument to api_execute in params @@ -98,14 +103,13 @@ class Client(etcd.Client): self._read_options.add('retry') self._del_conditions.add('retry') - def _calculate_timeouts(self, etcd_nodes=None, timeout=None): + def _calculate_timeouts(self, etcd_nodes, timeout=None): """Calculate a request timeout and number of retries per single etcd node. In case if the timeout per node is too small (less than one second) we will reduce the number of nodes. For the cluster with only one node we will try to do 2 retries. For clusters with 2 nodes we will try to do 1 retry for every node. No retries for clusters with 3 or more nodes. 
We better rely on switching to a different node.""" - etcd_nodes = etcd_nodes or len(self._machines_cache) + 1 per_node_timeout = timeout = float(timeout or self.read_timeout) max_retries = 4 - min(etcd_nodes, 3) @@ -130,19 +134,25 @@ class Client(etcd.Client): basic_auth = ':'.join((self.username, self.password)) if self.username and self.password else None return urllib3.make_headers(basic_auth=basic_auth, user_agent=USER_AGENT) - def _build_request_parameters(self, timeout=None): + def _build_request_parameters(self, etcd_nodes, timeout=None): kwargs = {'headers': self._get_headers(), 'redirect': self.allow_redirect} if timeout is not None: kwargs.update(retries=0, timeout=timeout) else: - _, per_node_timeout, per_node_retries = self._calculate_timeouts() - kwargs.update(timeout=per_node_timeout, retries=per_node_retries) + _, per_node_timeout, per_node_retries = self._calculate_timeouts(etcd_nodes) + connect_timeout = max(1, per_node_timeout/2) + kwargs.update(timeout=Timeout(connect=connect_timeout, total=per_node_timeout), retries=per_node_retries) return kwargs def set_machines_cache_ttl(self, cache_ttl): self._machines_cache_ttl = cache_ttl + @property + def machines_cache(self): + base_uri, cache = self._base_uri, self._machines_cache + return ([base_uri] if base_uri in cache else []) + [machine for machine in cache if machine != base_uri] + @property def machines(self): """Original `machines` method(property) of `etcd.Client` class raise exception @@ -155,58 +165,58 @@ class Client(etcd.Client): Also this method implements the same timeout-retry logic as `api_execute`, because the original method was retrying 2 times with the `read_timeout` on each node.""" - kwargs = self._build_request_parameters() + machines_cache = self.machines_cache + kwargs = self._build_request_parameters(len(machines_cache)) - while True: + for base_uri in machines_cache: try: - response = self.http.request(self._MGET, self._base_uri + self.version_prefix + '/machines', **kwargs) 
+ response = self.http.request(self._MGET, base_uri + self.version_prefix + '/machines', **kwargs) data = self._handle_server_response(response).data.decode('utf-8') machines = [m.strip() for m in data.split(',') if m.strip()] logger.debug("Retrieved list of machines: %s", machines) - if not machines: - raise etcd.EtcdException - random.shuffle(machines) - for url in machines: - r = urlparse(url) - port = r.port or (443 if r.scheme == 'https' else 80) - self._dns_resolver.resolve_async(r.hostname, port) - return machines + if machines: + random.shuffle(machines) + self._update_dns_cache(self._dns_resolver.resolve_async, machines) + return machines except Exception as e: - # We can't get the list of machines, if one server is in the - # machines cache, try on it - logger.error("Failed to get list of machines from %s%s: %r", self._base_uri, self.version_prefix, e) - if self._machines_cache: - self._base_uri = self._machines_cache.pop(0) - logger.info("Retrying on %s", self._base_uri) - elif self._update_machines_cache: - raise etcd.EtcdException("Could not get the list of servers, " - "maybe you provided the wrong " - "host(s) to connect to?") - else: - return [] + self.http.clear() + logger.error("Failed to get list of machines from %s%s: %r", base_uri, self.version_prefix, e) + + raise etcd.EtcdConnectionFailed('No more machines in the cluster') def set_read_timeout(self, timeout): self._read_timeout = timeout - def _do_http_request(self, request_executor, method, url, fields=None, **kwargs): - try: - response = request_executor(method, url, fields=fields, **kwargs) - response.data.decode('utf-8') - self._check_cluster_id(response) - except (HTTPError, HTTPException, socket.error, socket.timeout) as e: - if (isinstance(fields, dict) and fields.get("wait") == "true" and - isinstance(e, (ReadTimeoutError, ProtocolError))): - logger.debug("Watch timed out.") + def _do_http_request(self, retry, machines_cache, request_executor, method, path, fields=None, **kwargs): + 
some_request_failed = False + for i, base_uri in enumerate(machines_cache): + if i > 0: + logger.info("Retrying on %s", base_uri) + try: + response = request_executor(method, base_uri + path, fields=fields, **kwargs) + response.data.decode('utf-8') + self._check_cluster_id(response) + if some_request_failed: + self.set_base_uri(base_uri) + self._refresh_machines_cache() + return response + except (HTTPError, HTTPException, socket.error, socket.timeout) as e: + self.http.clear() # switch to the next etcd node because we don't know exactly what happened, # whether the key didn't received an update or there is a network problem. - self._machines_cache.insert(0, self._base_uri) - self._base_uri = self._next_server() - raise etcd.EtcdWatchTimedOut("Watch timed out: {0}".format(e), cause=e) - logger.error("Request to server %s failed: %r", self._base_uri, e) - logger.info("Reconnection allowed, looking for another server.") - self._base_uri = self._next_server(cause=e) - response = False - return response + if not retry and i + 1 < len(machines_cache): + self.set_base_uri(machines_cache[i + 1]) + if (isinstance(fields, dict) and fields.get("wait") == "true" and + isinstance(e, (ReadTimeoutError, ProtocolError))): + logger.debug("Watch timed out.") + raise etcd.EtcdWatchTimedOut("Watch timed out: {0}".format(e), cause=e) + logger.error("Request to server %s failed: %r", base_uri, e) + logger.info("Reconnection allowed, looking for another server.") + if not retry: + raise etcd.EtcdException('{0} {1} request failed'.format(method, path)) + some_request_failed = True + + raise etcd.EtcdConnectionFailed('No more machines in the cluster') def api_execute(self, path, method, params=None, timeout=None): if not path.startswith('/'): @@ -229,44 +239,34 @@ class Client(etcd.Client): elif not self._use_proxies and time.time() - self._machines_cache_updated > self._machines_cache_ttl: self._refresh_machines_cache() - kwargs.update(self._build_request_parameters(timeout)) - - if 
retry: - machines_cache = [self._base_uri] + self._machines_cache - - response = False + machines_cache = self.machines_cache + etcd_nodes = len(machines_cache) + kwargs.update(self._build_request_parameters(etcd_nodes, timeout)) while True: try: - some_request_failed = False - while not response: - response = self._do_http_request(request_executor, method, self._base_uri + path, **kwargs) - - if response is False: - if not retry: - raise etcd.EtcdException('{0} {1} request failed'.format(method, path)) - some_request_failed = True - if some_request_failed: - self._refresh_machines_cache() - if response: - break - except etcd.EtcdConnectionFailed: - if not retry: - raise + response = self._do_http_request(retry, machines_cache, request_executor, method, path, **kwargs) + return self._handle_server_response(response) + except etcd.EtcdWatchTimedOut: + raise + except etcd.EtcdConnectionFailed as ex: + try: + if self._load_machines_cache(): + machines_cache = self.machines_cache + etcd_nodes = len(machines_cache) + except Exception as e: + logger.debug('Failed to update list of etcd nodes: %r', e) sleeptime = retry.sleeptime remaining_time = retry.stoptime - sleeptime - time.time() - nodes, timeout, retries = self._calculate_timeouts(len(machines_cache), remaining_time) + nodes, timeout, retries = self._calculate_timeouts(etcd_nodes, remaining_time) if nodes == 0: self._update_machines_cache = True - raise + raise ex retry.sleep_func(sleeptime) retry.update_delay() - # We still have some time left. Partially restore `_machines_cache` and retry request - kwargs.update(timeout=timeout, retries=retries) - self._base_uri = machines_cache[0] - self._machines_cache = machines_cache[1:nodes] - - return self._handle_server_response(response) + # We still have some time left. 
Partially reduce `machines_cache` and retry request + kwargs.update(timeout=Timeout(connect=max(1, timeout/2), total=timeout), retries=retries) + machines_cache = machines_cache[:nodes] @staticmethod def get_srv_record(host): @@ -327,6 +327,13 @@ class Client(etcd.Client): machines_cache = self._get_machines_cache_from_dns(self._config['host'], self._config['port']) return machines_cache + @staticmethod + def _update_dns_cache(func, machines): + for url in machines: + r = urlparse(url) + port = r.port or (443 if r.scheme == 'https' else 80) + func(r.hostname, port) + def _load_machines_cache(self): """This method should fill up `_machines_cache` from scratch. It could happen only in two cases: @@ -338,25 +345,49 @@ class Client(etcd.Client): if 'srv' not in self._config and 'host' not in self._config and 'hosts' not in self._config: raise Exception('Neither srv, hosts, host nor url are defined in etcd section of config') - self._machines_cache = self._get_machines_cache_from_config() - + machines_cache = self._get_machines_cache_from_config() # Can not bootstrap list of etcd-cluster members, giving up - if not self._machines_cache: + if not machines_cache: raise etcd.EtcdException - # After filling up initial list of machines_cache we should ask etcd-cluster about actual list - self._base_uri = self._next_server() - self._refresh_machines_cache() - self._update_machines_cache = False + # enforce resolving dns name, they might get new ips + self._update_dns_cache(self._dns_resolver.remove, machines_cache) - def _refresh_machines_cache(self): - self._machines_cache = self._get_machines_cache_from_config() if self._use_proxies else self.machines - if self._base_uri in self._machines_cache: - self._machines_cache.remove(self._base_uri) - elif self._machines_cache: - self._base_uri = self._next_server() + # The etcd cluster could change its topology over time and depending on how we resolve the initial + # topology (list of hosts in the Patroni config or DNS records, A 
or SRV) we might get into the situation + # where the real topology doesn't match anymore with the topology resolved from the configuration file. + # In case if the "initial" topology is the same as before we will not override the `_machines_cache`. + ret = set(machines_cache) != set(self._initial_machines_cache) + if ret: + self._initial_machines_cache = self._machines_cache = machines_cache + + # After filling up the initial list of machines_cache we should ask etcd-cluster about actual list + self._refresh_machines_cache(True) + + self._update_machines_cache = False + return ret + + def _refresh_machines_cache(self, updating_cache=False): + if self._use_proxies: + self._machines_cache = self._get_machines_cache_from_config() + else: + try: + self._machines_cache = self.machines + except etcd.EtcdConnectionFailed: + if updating_cache: + raise etcd.EtcdException("Could not get the list of servers, " + "maybe you provided the wrong " + "host(s) to connect to?") + return + + if self._base_uri not in self._machines_cache: + self.set_base_uri(self._machines_cache[0]) self._machines_cache_updated = time.time() + def set_base_uri(self, value): + logger.info('Selected new etcd server %s', value) + self._base_uri = value + class Etcd(AbstractDCS): @@ -633,7 +664,6 @@ class Etcd(AbstractDCS): # than reestablishing http connection every time from every replica. 
return True except etcd.EtcdWatchTimedOut: - self._client.http.clear() self._has_failed = False return False except (etcd.EtcdEventIndexCleared, etcd.EtcdWatcherCleared): # Watch failed diff --git a/tests/test_etcd.py b/tests/test_etcd.py index 8432fad9..ceef0d2d 100644 --- a/tests/test_etcd.py +++ b/tests/test_etcd.py @@ -130,12 +130,11 @@ class TestClient(unittest.TestCase): self.client.http.request_encode_body = http_request def test_machines(self): - self.client._base_uri = 'http://localhost:4001' - self.client._machines_cache = ['http://localhost:2379'] + self.client._base_uri = 'http://localhost:4002' + self.client._machines_cache = ['http://localhost:4002', 'http://localhost:2379'] self.assertIsNotNone(self.client.machines) self.client._base_uri = 'http://localhost:4001' - self.client._machines_cache = [] - self.assertIsNotNone(self.client.machines) + self.client._machines_cache = ['http://localhost:4001'] self.client._update_machines_cache = True machines = None try: @@ -146,16 +145,13 @@ class TestClient(unittest.TestCase): @patch.object(Client, 'machines') def test_api_execute(self, mock_machines): - mock_machines.__get__ = Mock(return_value=['http://localhost:2379']) + mock_machines.__get__ = Mock(return_value=['http://localhost:4001', 'http://localhost:2379']) self.assertRaises(ValueError, self.client.api_execute, '', '') self.client._base_uri = 'http://localhost:4001' - self.client._machines_cache = ['http://localhost:2379'] self.assertRaises(etcd.EtcdException, self.client.api_execute, '/', 'POST', timeout=0) self.client._base_uri = 'http://localhost:4001' - self.client._machines_cache = ['http://localhost:2379'] rtry = Retry(deadline=10, max_delay=1, max_tries=-1, retry_exceptions=(etcd.EtcdLeaderElectionInProgress,)) rtry(self.client.api_execute, '/', 'POST', timeout=0, params={'retry': rtry}) - mock_machines.__get__ = Mock(return_value=['http://localhost:2379']) self.client._machines_cache_updated = 0 self.client.api_execute('/', 'POST', timeout=0) 
self.client._machines_cache = [self.client._base_uri] @@ -163,10 +159,17 @@ class TestClient(unittest.TestCase): self.assertRaises(etcd.EtcdWatchTimedOut, self.client.api_execute, '/timeout', 'POST', params={'wait': 'true'}) self.assertRaises(etcd.EtcdException, self.client.api_execute, '/', '') - with patch.object(Client, '_do_http_request', Mock(side_effect=etcd.EtcdConnectionFailed)): - with patch.object(Client, '_calculate_timeouts', Mock(side_effect=[(1, 1, 0), (1, 1, 0), (0, 1, 0)])): - self.assertRaises(etcd.EtcdException, rtry, self.client.api_execute, '/', 'GET', params={'retry': rtry}) - self.client._read_timeout = 0 + with patch.object(Client, '_calculate_timeouts', Mock(side_effect=[(1, 1, 0), (1, 1, 0), (0, 1, 0)])),\ + patch.object(Client, '_load_machines_cache', Mock(side_effect=Exception)): + self.client.http.request = Mock(side_effect=socket.error) + self.assertRaises(etcd.EtcdException, rtry, self.client.api_execute, '/', 'GET', params={'retry': rtry}) + + with patch.object(Client, '_calculate_timeouts', Mock(side_effect=[(1, 1, 0), (1, 1, 0), (0, 1, 0)])),\ + patch.object(Client, '_load_machines_cache', Mock(return_value=True)): + self.assertRaises(etcd.EtcdException, rtry, self.client.api_execute, '/', 'GET', params={'retry': rtry}) + + with patch.object(Client, '_do_http_request', Mock(side_effect=etcd.EtcdException)): + self.client._read_timeout = 0.01 self.assertRaises(etcd.EtcdException, self.client.api_execute, '/', 'GET') def test_get_srv_record(self): @@ -182,8 +185,12 @@ class TestClient(unittest.TestCase): self.client._get_machines_cache_from_dns('error', 2379) @patch.object(Client, 'machines') - def test__load_machines_cache(self, mock_machines): - mock_machines.__get__ = Mock(return_value=['http://localhost:2379']) + def test__refresh_machines_cache(self, mock_machines): + mock_machines.__get__ = Mock(side_effect=etcd.EtcdConnectionFailed) + self.assertIsNone(self.client._refresh_machines_cache()) + 
self.assertRaises(etcd.EtcdException, self.client._refresh_machines_cache, True) + + def test__load_machines_cache(self): self.client._config = {} self.assertRaises(Exception, self.client._load_machines_cache) self.client._config = {'srv': 'blabla'}