From 23dcfaab49649e86d6ed874e98cef3f84d3d0eb6 Mon Sep 17 00:00:00 2001 From: Alexander Kukushkin Date: Fri, 14 Aug 2020 12:39:47 +0200 Subject: [PATCH] Make it possible to bypass kubernetes service (#1614) When running on K8s Patroni is communicating with API via the `kubernetes` service, which is address is exposed via the `KUBERNETES_SERVICE_HOST` environment variable. Like any other service, the `kubernetes` service is handled by `kube-proxy`, that depending on configuration is either relying on userspace program or `iptables` for traffic routing. During K8s upgrade, when master nodes are replaced, it is possible that `kube-proxy` doesn't update the service configuration in time and as a result Patroni fails to update the leader lock and demotes postgres. In order to improve the user experience and get more control on the problem we make it possible to bypass the `kubernetes` service and connect directly to API nodes. The strategy is very simple: 1. Resolve list IPs of API nodes from the kubernetes endpoint on every iteration of HA loop. 2. Stick to one of these IPs for API requests 3. Switch to a different IP if connected to IP is not from the list 4. If the request fails, switch to another IP and retry Such a strategy is already used for Etcd and proven to work quite well. In order to enable the feature, you need either to set to `true` `kubernetes.bypass_api_service` in the Patroni configuration file or `PATRONI_KUBERNETES_BYPASS_API_SERVICE` environment variable. If for some reason `GET /default/endpoints/kubernetes` isn't allowed Patroni will disable the feature. --- docs/ENVIRONMENT.rst | 1 + docs/SETTINGS.rst | 1 + features/environment.py | 1 + .../templates/template_patroni_ephemeral.yml | 30 +++ .../template_patroni_persistent.yaml | 30 +++ kubernetes/patroni_k8s.yaml | 36 +++ patroni/config.py | 5 +- patroni/dcs/kubernetes.py | 209 ++++++++++++++++-- tests/test_kubernetes.py | 45 +++- 9 files changed, 333 insertions(+), 25 deletions(-) diff --git a/docs/ENVIRONMENT.rst b/docs/ENVIRONMENT.rst index 3f125909..b27c7382 100644 --- a/docs/ENVIRONMENT.rst +++ b/docs/ENVIRONMENT.rst @@ -87,6 +87,7 @@ Exhibitor Kubernetes ---------- +- **PATRONI\_KUBERNETES\_BYPASS\_API\_SERVICE**: (optional) When communicating with the Kubernetes API, Patroni is usually relying on the `kubernetes` service, the address of which is exposed in the pods via the `KUBERNETES_SERVICE_HOST` environment variable. If `PATRONI_KUBERNETES_BYPASS_API_SERVICE` is set to ``true``, Patroni will resolve the list of API nodes behind the service and connect directly to them. - **PATRONI\_KUBERNETES\_NAMESPACE**: (optional) Kubernetes namespace where the Patroni pod is running. Default value is `default`. - **PATRONI\_KUBERNETES\_LABELS**: Labels in format ``{label1: value1, label2: value2}``. These labels will be used to find existing objects (Pods and either Endpoints or ConfigMaps) associated with the current cluster. Also Patroni will set them on every object (Endpoint or ConfigMap) it creates. - **PATRONI\_KUBERNETES\_SCOPE\_LABEL**: (optional) name of the label containing cluster name. Default value is `cluster-name`. diff --git a/docs/SETTINGS.rst b/docs/SETTINGS.rst index f8004bac..7014d55a 100644 --- a/docs/SETTINGS.rst +++ b/docs/SETTINGS.rst @@ -159,6 +159,7 @@ Exhibitor Kubernetes ---------- +- **bypass\_api\_service**: (optional) When communicating with the Kubernetes API, Patroni is usually relying on the `kubernetes` service, the address of which is exposed in the pods via the `KUBERNETES_SERVICE_HOST` environment variable. If `bypass_api_service` is set to ``true``, Patroni will resolve the list of API nodes behind the service and connect directly to them. - **namespace**: (optional) Kubernetes namespace where Patroni pod is running. Default value is `default`. - **labels**: Labels in format ``{label1: value1, label2: value2}``. These labels will be used to find existing objects (Pods and either Endpoints or ConfigMaps) associated with the current cluster. Also Patroni will set them on every object (Endpoint or ConfigMap) it creates. - **scope\_label**: (optional) name of the label containing cluster name. Default value is `cluster-name`. diff --git a/features/environment.py b/features/environment.py index 13f37aa2..e04399c6 100644 --- a/features/environment.py +++ b/features/environment.py @@ -454,6 +454,7 @@ class KubernetesController(AbstractDcsController): self._label_selector = ','.join('{0}={1}'.format(k, v) for k, v in self._labels.items()) os.environ['PATRONI_KUBERNETES_LABELS'] = json.dumps(self._labels) os.environ['PATRONI_KUBERNETES_USE_ENDPOINTS'] = 'true' + os.environ['PATRONI_KUBERNETES_BYPASS_API_SERVICE'] = 'true' from patroni.dcs.kubernetes import k8s_client, k8s_config k8s_config.load_kube_config(context='local') diff --git a/kubernetes/openshift-example/templates/template_patroni_ephemeral.yml b/kubernetes/openshift-example/templates/template_patroni_ephemeral.yml index 98c66c6d..37cdd753 100644 --- a/kubernetes/openshift-example/templates/template_patroni_ephemeral.yml +++ b/kubernetes/openshift-example/templates/template_patroni_ephemeral.yml @@ -118,6 +118,8 @@ objects: fieldRef: apiVersion: v1 fieldPath: metadata.namespace + - name: PATRONI_KUBERNETES_BYPASS_API_SERVICE + value: 'true' - name: PATRONI_KUBERNETES_LABELS value: '{application: ${APPLICATION_NAME}, cluster-name: ${PATRONI_CLUSTER_NAME}}' - name: PATRONI_SUPERUSER_USERNAME @@ -250,6 +252,34 @@ objects: subjects: - kind: ServiceAccount name: ${SERVICE_ACCOUNT} +# Following privileges are only required if deployed not in the "default" +# namespace and you want Patroni to bypass kubernetes service +# (PATRONI_KUBERNETES_BYPASS_API_SERVICE=true) +- apiVersion: rbac.authorization.k8s.io/v1 + kind: ClusterRole + metadata: + name: patroni-k8s-ep-access + rules: + - apiGroups: + - "" + resources: + - endpoints + resourceNames: + - kubernetes + verbs: + - get +- apiVersion: rbac.authorization.k8s.io/v1 + kind: ClusterRoleBinding + metadata: + name: ${NAMESPACE}-${SERVICE_ACCOUNT}-k8s-ep-access + roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: patroni-k8s-ep-access + subjects: + - kind: ServiceAccount + name: ${SERVICE_ACCOUNT} + namespace: ${NAMESPACE} parameters: - description: The name of the application for labelling all artifacts. displayName: Application Name diff --git a/kubernetes/openshift-example/templates/template_patroni_persistent.yaml b/kubernetes/openshift-example/templates/template_patroni_persistent.yaml index 5a2a61d2..d8b3342f 100644 --- a/kubernetes/openshift-example/templates/template_patroni_persistent.yaml +++ b/kubernetes/openshift-example/templates/template_patroni_persistent.yaml @@ -130,6 +130,8 @@ objects: fieldRef: apiVersion: v1 fieldPath: metadata.namespace + - name: PATRONI_KUBERNETES_BYPASS_API_SERVICE + value: 'true' - name: PATRONI_KUBERNETES_LABELS value: '{application: ${APPLICATION_NAME}, cluster-name: ${PATRONI_CLUSTER_NAME}}' - name: PATRONI_SUPERUSER_USERNAME @@ -274,6 +276,34 @@ objects: subjects: - kind: ServiceAccount name: ${SERVICE_ACCOUNT} +# Following privileges are only required if deployed not in the "default" +# namespace and you want Patroni to bypass kubernetes service +# (PATRONI_KUBERNETES_BYPASS_API_SERVICE=true) +- apiVersion: rbac.authorization.k8s.io/v1 + kind: ClusterRole + metadata: + name: patroni-k8s-ep-access + rules: + - apiGroups: + - "" + resources: + - endpoints + resourceNames: + - kubernetes + verbs: + - get +- apiVersion: rbac.authorization.k8s.io/v1 + kind: ClusterRoleBinding + metadata: + name: ${NAMESPACE}-${SERVICE_ACCOUNT}-k8s-ep-access + roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: patroni-k8s-ep-access + subjects: + - kind: ServiceAccount + name: ${SERVICE_ACCOUNT} + namespace: ${NAMESPACE} parameters: - description: The name of the application for labelling all artifacts. displayName: Application Name diff --git a/kubernetes/patroni_k8s.yaml b/kubernetes/patroni_k8s.yaml index c99b25b5..71d03c53 100644 --- a/kubernetes/patroni_k8s.yaml +++ b/kubernetes/patroni_k8s.yaml @@ -58,6 +58,8 @@ spec: valueFrom: fieldRef: fieldPath: metadata.namespace + - name: PATRONI_KUBERNETES_BYPASS_API_SERVICE + value: 'true' - name: PATRONI_KUBERNETES_USE_ENDPOINTS value: 'true' - name: PATRONI_KUBERNETES_LABELS @@ -239,3 +241,37 @@ roleRef: subjects: - kind: ServiceAccount name: patronidemo + +# Following privileges are only required if deployed not in the "default" +# namespace and you want Patroni to bypass kubernetes service +# (PATRONI_KUBERNETES_BYPASS_API_SERVICE=true) +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: patroni-k8s-ep-access +rules: +- apiGroups: + - "" + resources: + - endpoints + resourceNames: + - kubernetes + verbs: + - get + +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: patroni-k8s-ep-access +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: patroni-k8s-ep-access +subjects: +- kind: ServiceAccount + name: patronidemo +# The namespace must be specified explicitly. +# If deploying to the different namespace you have to change it. + namespace: default diff --git a/patroni/config.py b/patroni/config.py index 51f436b5..10a81f3c 100644 --- a/patroni/config.py +++ b/patroni/config.py @@ -305,7 +305,8 @@ class Config(object): if suffix in ('HOST', 'HOSTS', 'PORT', 'USE_PROXIES', 'PROTOCOL', 'SRV', 'URL', 'PROXY', 'CACERT', 'CERT', 'KEY', 'VERIFY', 'TOKEN', 'CHECKS', 'DC', 'CONSISTENCY', 'REGISTER_SERVICE', 'SERVICE_CHECK_INTERVAL', 'NAMESPACE', 'CONTEXT', - 'USE_ENDPOINTS', 'SCOPE_LABEL', 'ROLE_LABEL', 'POD_IP', 'PORTS', 'LABELS') and name: + 'USE_ENDPOINTS', 'SCOPE_LABEL', 'ROLE_LABEL', 'POD_IP', 'PORTS', 'LABELS', + 'BYPASS_API_SERVICE') and name: value = os.environ.pop(param) if suffix == 'PORT': value = value and parse_int(value) @@ -313,7 +314,7 @@ class Config(object): value = value and _parse_list(value) elif suffix == 'LABELS': value = _parse_dict(value) - elif suffix in ('USE_PROXIES', 'REGISTER_SERVICE'): + elif suffix in ('USE_PROXIES', 'REGISTER_SERVICE', 'BYPASS_API_SERVICE'): value = parse_bool(value) if value: ret[name.lower()][suffix.lower()] = value diff --git a/patroni/dcs/kubernetes.py b/patroni/dcs/kubernetes.py index 7a27376a..8a028eb2 100644 --- a/patroni/dcs/kubernetes.py +++ b/patroni/dcs/kubernetes.py @@ -3,6 +3,7 @@ import functools import json import logging import os +import random import socket import six import sys @@ -138,6 +139,14 @@ class K8sObject(object): return json.dumps(self, indent=4, default=lambda o: o.to_dict()) +class K8sException(Exception): + pass + + +class K8sConnectionFailed(K8sException): + pass + + class K8sClient(object): class rest(object): @@ -161,13 +170,31 @@ class K8sClient(object): _API_URL_PREFIX = '/api/v1/namespaces/' - def __init__(self): + def __init__(self, bypass_api_service=False): + self._bypass_api_service = bypass_api_service self.pool_manager = urllib3.PoolManager(**k8s_config.pool_config) + self._base_uri = k8s_config.server + self._api_servers_cache = [k8s_config.server] + self._api_servers_cache_updated = 0 + self.set_api_servers_cache_ttl(10) self.set_read_timeout(10) + try: + self._load_api_servers_cache() + except K8sException: + pass def set_read_timeout(self, timeout): self._read_timeout = timeout + def set_api_servers_cache_ttl(self, ttl): + self._api_servers_cache_ttl = ttl - 0.5 + + def set_base_uri(self, value): + logger.info('Selected new K8s API server endpoint %s', value) + # We will connect by IP of the master node which is not listed as alternative name + self.pool_manager.connection_pool_kw['assert_hostname'] = False + self._base_uri = value + @staticmethod def _handle_server_response(response, _preload_content): if response.status not in range(200, 206): @@ -180,26 +207,165 @@ class K8sClient(object): ret.update(headers or {}) return ret - def request(self, method, path, timeout=None, **kwargs): - retries = 0 if timeout else 1 + @property + def api_servers_cache(self): + base_uri, cache = self._base_uri, self._api_servers_cache + return ([base_uri] if base_uri in cache else []) + [machine for machine in cache if machine != base_uri] + + def _get_api_servers(self, api_servers_cache): + _, per_node_timeout, per_node_retries = self._calculate_timeouts(len(api_servers_cache)) + kwargs = {'headers': self._make_headers({}), 'preload_content': True, 'retries': per_node_retries, + 'timeout': urllib3.Timeout(connect=max(1, per_node_timeout/2.0), total=per_node_timeout)} + path = self._API_URL_PREFIX + 'default/endpoints/kubernetes' + for base_uri in api_servers_cache: + try: + response = self.pool_manager.request('GET', base_uri + path, **kwargs) + endpoint = self._handle_server_response(response, True) + for subset in endpoint.subsets: + for port in subset.ports: + if port.name == 'https' and port.protocol == 'TCP': + addresses = [uri('https', (a.ip, port.port)) for a in subset.addresses] + if addresses: + random.shuffle(addresses) + return addresses + except Exception as e: + if isinstance(e, k8s_client.rest.ApiException) and e.status == 403: + raise + self.pool_manager.clear() + logger.error('Failed to get "kubernetes" endpoint from %s: %r', base_uri, e) + raise K8sConnectionFailed('No more K8s API server nodes in the cluster') + + def _refresh_api_servers_cache(self, updating_cache=False): + if self._bypass_api_service: + try: + api_servers_cache = [k8s_config.server] if updating_cache else self.api_servers_cache + self._api_servers_cache = self._get_api_servers(api_servers_cache) + if updating_cache: + self.pool_manager.clear() + except k8s_client.rest.ApiException: # 403 Permission denied + logger.warning("Kubernetes RBAC doesn't allow GET access to the 'kubernetes' " + "endpoint in the 'default' namespace. Disabling 'bypass_api_service'.") + self._bypass_api_service = False + self._api_servers_cache = [k8s_config.server] + if not updating_cache: + self.pool_manager.clear() + except K8sConnectionFailed: + if updating_cache: + raise K8sException("Could not get the list of K8s API server nodes") + return + else: + self._api_servers_cache = [k8s_config.server] + + if self._base_uri not in self._api_servers_cache: + self.set_base_uri(self._api_servers_cache[0]) + self._api_servers_cache_updated = time.time() + + def refresh_api_servers_cache(self): + if self._bypass_api_service and time.time() - self._api_servers_cache_updated > self._api_servers_cache_ttl: + self._refresh_api_servers_cache() + + def _load_api_servers_cache(self): + self._update_api_servers_cache = True + self._refresh_api_servers_cache(True) + self._update_api_servers_cache = False + + def _calculate_timeouts(self, api_servers, timeout=None): + """Calculate a request timeout and number of retries per single K8s API server node. + In case if the timeout per node is too small (less than one second) we will reduce the number of nodes. + For the cluster with only one API server node we will try to do 1 retry. + No retries for clusters with 2 or more API server nodes. We better rely on switching to a different node.""" + + per_node_timeout = timeout = float(timeout or self._read_timeout) + + max_retries = 3 - min(api_servers, 2) + per_node_retries = 1 + min_timeout = 1.0 + + while api_servers > 0: + per_node_timeout = float(timeout) / api_servers + if per_node_timeout >= min_timeout: + # for small clusters we will try to do more than one try on every node + while per_node_retries < max_retries and per_node_timeout / (per_node_retries + 1) >= min_timeout: + per_node_retries += 1 + per_node_timeout /= per_node_retries + break + # if the timeout per one node is to small try to reduce number of nodes + api_servers -= 1 + max_retries = 1 + + return api_servers, per_node_timeout, per_node_retries - 1 + + def _do_http_request(self, retry, api_servers_cache, method, path, **kwargs): + some_request_failed = False + for i, base_uri in enumerate(api_servers_cache): + if i > 0: + logger.info('Retrying on %s', base_uri) + try: + response = self.pool_manager.request(method, base_uri + path, **kwargs) + if some_request_failed: + self.set_base_uri(base_uri) + self._refresh_api_servers_cache() + return response + except (HTTPError, HTTPException, socket.error, socket.timeout) as e: + self.pool_manager.clear() + if not retry: + # switch to the next node if request failed and retry is not allowed + if i + 1 < len(api_servers_cache): + self.set_base_uri(api_servers_cache[i + 1]) + raise K8sException('{0} {1} request failed'.format(method, path)) + logger.error('Request to server %s failed: %r', base_uri, e) + some_request_failed = True + + raise K8sConnectionFailed('No more API server nodes in the cluster') + + def request(self, retry, method, path, timeout=None, **kwargs): + if self._update_api_servers_cache: + self._load_api_servers_cache() + + api_servers_cache = self.api_servers_cache + api_servers = len(api_servers_cache) + if timeout: if isinstance(timeout, six.integer_types + (float,)): timeout = urllib3.Timeout(total=timeout) elif isinstance(timeout, tuple) and len(timeout) == 2: timeout = urllib3.Timeout(connect=timeout[0], read=timeout[1]) + retries = 0 else: - timeout = self._read_timeout / 2.0 + _, timeout, retries = self._calculate_timeouts(api_servers) timeout = urllib3.Timeout(connect=max(1, timeout/2.0), total=timeout) kwargs.update(retries=retries, timeout=timeout) - return self.pool_manager.request(method, k8s_config.server + path, **kwargs) - def call_api(self, method, path, headers=None, body=None, + while True: + try: + return self._do_http_request(retry, api_servers_cache, method, path, **kwargs) + except K8sConnectionFailed as ex: + try: + self._load_api_servers_cache() + api_servers_cache = self.api_servers_cache + api_servers = len(api_servers) + except Exception as e: + logger.debug('Failed to update list of K8s master nodes: %r', e) + + sleeptime = retry.sleeptime + remaining_time = retry.stoptime - sleeptime - time.time() + nodes, timeout, retries = self._calculate_timeouts(api_servers, remaining_time) + if nodes == 0: + self._update_api_servers_cache = True + raise ex + retry.sleep_func(sleeptime) + retry.update_delay() + # We still have some time left. Partially reduce `api_servers_cache` and retry request + kwargs.update(timeout=urllib3.Timeout(connect=max(1, timeout/2.0), total=timeout), retries=retries) + api_servers_cache = api_servers_cache[:nodes] + + def call_api(self, method, path, headers=None, body=None, _retry=None, _preload_content=True, _request_timeout=None, **kwargs): headers = self._make_headers(headers) fields = {to_camel_case(k): v for k, v in kwargs.items()} # resource_version => resourceVersion body = json.dumps(body, default=lambda o: o.to_dict()) if body is not None else None - response = self.request(method, self._API_URL_PREFIX + path, headers=headers, fields=fields, + response = self.request(_retry, method, self._API_URL_PREFIX + path, headers=headers, fields=fields, body=body, preload_content=_preload_content, timeout=_request_timeout) return self._handle_server_response(response, _preload_content) @@ -273,8 +439,8 @@ class KubernetesRetriableException(k8s_client.rest.ApiException): class CoreV1ApiProxy(object): - def __init__(self, use_endpoints=False): - self._api_client = k8s_client.ApiClient() + def __init__(self, use_endpoints=False, bypass_api_service=False): + self._api_client = k8s_client.ApiClient(bypass_api_service) self._core_v1_api = k8s_client.CoreV1Api(self._api_client) self._use_endpoints = bool(use_endpoints) @@ -286,6 +452,10 @@ class CoreV1ApiProxy(object): self._api_client.pool_manager.connection_pool_kw['socket_options'] = \ list(keepalive_socket_options(ttl, int(loop_wait + retry_timeout))) self._api_client.set_read_timeout(retry_timeout) + self._api_client.set_api_servers_cache_ttl(loop_wait) + + def refresh_api_servers_cache(self): + self._api_client.refresh_api_servers_cache() def __getattr__(self, func): if func.endswith('_kind'): @@ -315,7 +485,7 @@ def catch_kubernetes_errors(func): elif e.status != 409: # Object exists or conflict in resource_version logger.exception('Unexpected error from Kubernetes API') return False - except (RetryFailedError, HTTPException, HTTPError, socket.error, socket.timeout): + except (RetryFailedError, K8sException): return False return wrapper @@ -338,7 +508,7 @@ class ObjectCache(Thread): def _list(self): try: - return self._func(_request_timeout=(self._retry.deadline, Timeout.DEFAULT_TIMEOUT)) + return self._func(_retry=self._retry.copy()) except Exception: time.sleep(1) raise @@ -446,8 +616,7 @@ class Kubernetes(AbstractDCS): config['namespace'] = '' super(Kubernetes, self).__init__(config) self._retry = Retry(deadline=config['retry_timeout'], max_delay=1, max_tries=-1, - retry_exceptions=(KubernetesRetriableException, HTTPException, - HTTPError, socket.error, socket.timeout)) + retry_exceptions=KubernetesRetriableException) self._ttl = None try: k8s_config.load_incluster_config() @@ -462,7 +631,7 @@ class Kubernetes(AbstractDCS): port.update({n: p[n] for n in ('name', 'protocol') if p.get(n)}) self.__ports.append(k8s_client.V1EndpointPort(**port)) - self._api = CoreV1ApiProxy(config.get('use_endpoints')) + self._api = CoreV1ApiProxy(config.get('use_endpoints'), config.get('bypass_api_service')) self._should_create_config_service = self._api.use_endpoints self.reload_config(config) # leader_observed_record, leader_resource_version, and leader_observed_time are used only for leader race! @@ -482,7 +651,9 @@ class Kubernetes(AbstractDCS): self._kinds = ObjectCache(self, kinds_func, self._retry, self._condition, self._name) def retry(self, *args, **kwargs): - return self._retry.copy()(*args, **kwargs) + retry = self._retry.copy() + kwargs['_retry'] = retry + return retry(*args, **kwargs) def client_path(self, path): return super(Kubernetes, self).client_path(path)[1:].replace('/', '-') @@ -514,8 +685,7 @@ class Kubernetes(AbstractDCS): member.data['pod_labels'] = pod.metadata.labels return member - def _wait_caches(self): - stop_time = time.time() + self._retry.deadline + def _wait_caches(self, stop_time): while not (self._pods.is_ready() and self._kinds.is_ready()): timeout = stop_time - time.time() if timeout <= 0: @@ -523,9 +693,11 @@ class Kubernetes(AbstractDCS): self._condition.wait(timeout) def _load_cluster(self): + stop_time = time.time() + self._retry.deadline + self._api.refresh_api_servers_cache() try: with self._condition: - self._wait_caches() + self._wait_caches(stop_time) members = [self.member(pod) for pod in self._pods.copy().values()] nodes = self._kinds.copy() @@ -718,6 +890,7 @@ class Kubernetes(AbstractDCS): retry = self._retry.copy() def _retry(*args, **kwargs): + kwargs['_retry'] = retry return retry(*args, **kwargs) try: diff --git a/tests/test_kubernetes.py b/tests/test_kubernetes.py index 7e8f0d1b..cb9462e6 100644 --- a/tests/test_kubernetes.py +++ b/tests/test_kubernetes.py @@ -1,10 +1,12 @@ import json +import socket import time import unittest from mock import Mock, mock_open, patch -from patroni.dcs.kubernetes import Kubernetes, KubernetesError, K8sConfig, K8sObject, RetryFailedError,\ - k8s_client, k8s_config, SERVICE_HOST_ENV_NAME, SERVICE_PORT_ENV_NAME +from patroni.dcs.kubernetes import k8s_client, k8s_config, K8sConfig, K8sConnectionFailed,\ + K8sException, K8sObject, Kubernetes, KubernetesError, KubernetesRetriableException,\ + Retry, RetryFailedError, SERVICE_HOST_ENV_NAME, SERVICE_PORT_ENV_NAME from six.moves import builtins from threading import Thread from . import MockResponse, SleepException @@ -93,12 +95,43 @@ class TestK8sConfig(unittest.TestCase): self.assertEqual(k8s_config.headers.get('authorization'), 'Bearer token') +@patch('urllib3.PoolManager.request') +class TestApiClient(unittest.TestCase): + + @patch.object(K8sConfig, '_server', '', create=True) + @patch('urllib3.PoolManager.request', Mock()) + def setUp(self): + self.a = k8s_client.ApiClient(True) + self.mock_get_ep = MockResponse() + self.mock_get_ep.content = '{"subsets":[{"ports":[{"name":"https","protocol":"TCP","port":443}],' +\ + '"addresses":[{"ip":"127.0.0.1"},{"ip":"127.0.0.2"}]}]}' + + def test__do_http_request(self, mock_request): + mock_request.side_effect = [self.mock_get_ep] + [socket.timeout] + self.assertRaises(K8sException, self.a.call_api, 'GET', 'f') + + @patch('time.sleep', Mock()) + def test_request(self, mock_request): + retry = Retry(deadline=10, max_delay=1, max_tries=1, retry_exceptions=KubernetesRetriableException) + mock_request.side_effect = [self.mock_get_ep] + 3 * [socket.timeout] + [k8s_client.rest.ApiException(500, '')] + self.assertRaises(k8s_client.rest.ApiException, retry, self.a.call_api, 'GET', 'f', _retry=retry) + mock_request.side_effect = [self.mock_get_ep, socket.timeout, Mock(), self.mock_get_ep] + self.assertRaises(k8s_client.rest.ApiException, retry, self.a.call_api, 'GET', 'f', _retry=retry) + retry.deadline = 0.0001 + mock_request.side_effect = [socket.timeout, socket.timeout, self.mock_get_ep] + self.assertRaises(K8sConnectionFailed, retry, self.a.call_api, 'GET', 'f', _retry=retry) + + def test__refresh_api_servers_cache(self, mock_request): + mock_request.side_effect = k8s_client.rest.ApiException(403, '') + self.a.refresh_api_servers_cache() + + class TestCoreV1Api(unittest.TestCase): + @patch('urllib3.PoolManager.request', Mock()) @patch.object(K8sConfig, '_server', '', create=True) def setUp(self): self.a = k8s_client.CoreV1Api() - self.a._api_client.set_read_timeout(10) self.a._api_client.pool_manager.request = Mock(return_value=MockResponse()) def test_create_namespaced_service(self): @@ -126,6 +159,7 @@ class TestCoreV1Api(unittest.TestCase): class BaseTestKubernetes(unittest.TestCase): + @patch('urllib3.PoolManager.request', Mock()) @patch('socket.TCP_KEEPIDLE', 4, create=True) @patch('socket.TCP_KEEPINTVL', 5, create=True) @patch('socket.TCP_KEEPCNT', 6, create=True) @@ -136,7 +170,8 @@ class BaseTestKubernetes(unittest.TestCase): @patch.object(k8s_client.CoreV1Api, 'list_namespaced_config_map', mock_list_namespaced_config_map, create=True) def setUp(self, config=None): config = config or {} - config.update(ttl=30, scope='test', name='p-0', loop_wait=10, retry_timeout=10, labels={'f': 'b'}) + config.update(ttl=30, scope='test', name='p-0', loop_wait=10, + retry_timeout=10, labels={'f': 'b'}, bypass_api_service=True) self.k = Kubernetes(config) self.assertRaises(AttributeError, self.k._pods._build_cache) self.k._pods._is_ready = True @@ -152,7 +187,7 @@ class TestKubernetesConfigMaps(BaseTestKubernetes): def test__wait_caches(self): self.k._pods._is_ready = False with self.k._condition: - self.assertRaises(RetryFailedError, self.k._wait_caches) + self.assertRaises(RetryFailedError, self.k._wait_caches, time.time() + 10) @patch('time.time', Mock(return_value=time.time() + 100)) def test_get_cluster(self):