mirror of
https://github.com/outbackdingo/patroni.git
synced 2026-01-27 10:20:10 +00:00
Make it possible to bypass kubernetes service (#1614)
When running on K8s Patroni is communicating with API via the `kubernetes` service, which is address is exposed via the `KUBERNETES_SERVICE_HOST` environment variable. Like any other service, the `kubernetes` service is handled by `kube-proxy`, that depending on configuration is either relying on userspace program or `iptables` for traffic routing. During K8s upgrade, when master nodes are replaced, it is possible that `kube-proxy` doesn't update the service configuration in time and as a result Patroni fails to update the leader lock and demotes postgres. In order to improve the user experience and get more control on the problem we make it possible to bypass the `kubernetes` service and connect directly to API nodes. The strategy is very simple: 1. Resolve list IPs of API nodes from the kubernetes endpoint on every iteration of HA loop. 2. Stick to one of these IPs for API requests 3. Switch to a different IP if connected to IP is not from the list 4. If the request fails, switch to another IP and retry Such a strategy is already used for Etcd and proven to work quite well. In order to enable the feature, you need either to set to `true` `kubernetes.bypass_api_service` in the Patroni configuration file or `PATRONI_KUBERNETES_BYPASS_API_SERVICE` environment variable. If for some reason `GET /default/endpoints/kubernetes` isn't allowed Patroni will disable the feature.
This commit is contained in:
committed by
GitHub
parent
1ab709c5f0
commit
23dcfaab49
@@ -87,6 +87,7 @@ Exhibitor
|
||||
|
||||
Kubernetes
|
||||
----------
|
||||
- **PATRONI\_KUBERNETES\_BYPASS\_API\_SERVICE**: (optional) When communicating with the Kubernetes API, Patroni is usually relying on the `kubernetes` service, the address of which is exposed in the pods via the `KUBERNETES_SERVICE_HOST` environment variable. If `PATRONI_KUBERNETES_BYPASS_API_SERVICE` is set to ``true``, Patroni will resolve the list of API nodes behind the service and connect directly to them.
|
||||
- **PATRONI\_KUBERNETES\_NAMESPACE**: (optional) Kubernetes namespace where the Patroni pod is running. Default value is `default`.
|
||||
- **PATRONI\_KUBERNETES\_LABELS**: Labels in format ``{label1: value1, label2: value2}``. These labels will be used to find existing objects (Pods and either Endpoints or ConfigMaps) associated with the current cluster. Also Patroni will set them on every object (Endpoint or ConfigMap) it creates.
|
||||
- **PATRONI\_KUBERNETES\_SCOPE\_LABEL**: (optional) name of the label containing cluster name. Default value is `cluster-name`.
|
||||
|
||||
@@ -159,6 +159,7 @@ Exhibitor
|
||||
|
||||
Kubernetes
|
||||
----------
|
||||
- **bypass\_api\_service**: (optional) When communicating with the Kubernetes API, Patroni is usually relying on the `kubernetes` service, the address of which is exposed in the pods via the `KUBERNETES_SERVICE_HOST` environment variable. If `bypass_api_service` is set to ``true``, Patroni will resolve the list of API nodes behind the service and connect directly to them.
|
||||
- **namespace**: (optional) Kubernetes namespace where Patroni pod is running. Default value is `default`.
|
||||
- **labels**: Labels in format ``{label1: value1, label2: value2}``. These labels will be used to find existing objects (Pods and either Endpoints or ConfigMaps) associated with the current cluster. Also Patroni will set them on every object (Endpoint or ConfigMap) it creates.
|
||||
- **scope\_label**: (optional) name of the label containing cluster name. Default value is `cluster-name`.
|
||||
|
||||
@@ -454,6 +454,7 @@ class KubernetesController(AbstractDcsController):
|
||||
self._label_selector = ','.join('{0}={1}'.format(k, v) for k, v in self._labels.items())
|
||||
os.environ['PATRONI_KUBERNETES_LABELS'] = json.dumps(self._labels)
|
||||
os.environ['PATRONI_KUBERNETES_USE_ENDPOINTS'] = 'true'
|
||||
os.environ['PATRONI_KUBERNETES_BYPASS_API_SERVICE'] = 'true'
|
||||
|
||||
from patroni.dcs.kubernetes import k8s_client, k8s_config
|
||||
k8s_config.load_kube_config(context='local')
|
||||
|
||||
@@ -118,6 +118,8 @@ objects:
|
||||
fieldRef:
|
||||
apiVersion: v1
|
||||
fieldPath: metadata.namespace
|
||||
- name: PATRONI_KUBERNETES_BYPASS_API_SERVICE
|
||||
value: 'true'
|
||||
- name: PATRONI_KUBERNETES_LABELS
|
||||
value: '{application: ${APPLICATION_NAME}, cluster-name: ${PATRONI_CLUSTER_NAME}}'
|
||||
- name: PATRONI_SUPERUSER_USERNAME
|
||||
@@ -250,6 +252,34 @@ objects:
|
||||
subjects:
|
||||
- kind: ServiceAccount
|
||||
name: ${SERVICE_ACCOUNT}
|
||||
# Following privileges are only required if deployed not in the "default"
|
||||
# namespace and you want Patroni to bypass kubernetes service
|
||||
# (PATRONI_KUBERNETES_BYPASS_API_SERVICE=true)
|
||||
- apiVersion: rbac.authorization.k8s.io/v1
|
||||
kind: ClusterRole
|
||||
metadata:
|
||||
name: patroni-k8s-ep-access
|
||||
rules:
|
||||
- apiGroups:
|
||||
- ""
|
||||
resources:
|
||||
- endpoints
|
||||
resourceNames:
|
||||
- kubernetes
|
||||
verbs:
|
||||
- get
|
||||
- apiVersion: rbac.authorization.k8s.io/v1
|
||||
kind: ClusterRoleBinding
|
||||
metadata:
|
||||
name: ${NAMESPACE}-${SERVICE_ACCOUNT}-k8s-ep-access
|
||||
roleRef:
|
||||
apiGroup: rbac.authorization.k8s.io
|
||||
kind: ClusterRole
|
||||
name: patroni-k8s-ep-access
|
||||
subjects:
|
||||
- kind: ServiceAccount
|
||||
name: ${SERVICE_ACCOUNT}
|
||||
namespace: ${NAMESPACE}
|
||||
parameters:
|
||||
- description: The name of the application for labelling all artifacts.
|
||||
displayName: Application Name
|
||||
|
||||
@@ -130,6 +130,8 @@ objects:
|
||||
fieldRef:
|
||||
apiVersion: v1
|
||||
fieldPath: metadata.namespace
|
||||
- name: PATRONI_KUBERNETES_BYPASS_API_SERVICE
|
||||
value: 'true'
|
||||
- name: PATRONI_KUBERNETES_LABELS
|
||||
value: '{application: ${APPLICATION_NAME}, cluster-name: ${PATRONI_CLUSTER_NAME}}'
|
||||
- name: PATRONI_SUPERUSER_USERNAME
|
||||
@@ -274,6 +276,34 @@ objects:
|
||||
subjects:
|
||||
- kind: ServiceAccount
|
||||
name: ${SERVICE_ACCOUNT}
|
||||
# Following privileges are only required if deployed not in the "default"
|
||||
# namespace and you want Patroni to bypass kubernetes service
|
||||
# (PATRONI_KUBERNETES_BYPASS_API_SERVICE=true)
|
||||
- apiVersion: rbac.authorization.k8s.io/v1
|
||||
kind: ClusterRole
|
||||
metadata:
|
||||
name: patroni-k8s-ep-access
|
||||
rules:
|
||||
- apiGroups:
|
||||
- ""
|
||||
resources:
|
||||
- endpoints
|
||||
resourceNames:
|
||||
- kubernetes
|
||||
verbs:
|
||||
- get
|
||||
- apiVersion: rbac.authorization.k8s.io/v1
|
||||
kind: ClusterRoleBinding
|
||||
metadata:
|
||||
name: ${NAMESPACE}-${SERVICE_ACCOUNT}-k8s-ep-access
|
||||
roleRef:
|
||||
apiGroup: rbac.authorization.k8s.io
|
||||
kind: ClusterRole
|
||||
name: patroni-k8s-ep-access
|
||||
subjects:
|
||||
- kind: ServiceAccount
|
||||
name: ${SERVICE_ACCOUNT}
|
||||
namespace: ${NAMESPACE}
|
||||
parameters:
|
||||
- description: The name of the application for labelling all artifacts.
|
||||
displayName: Application Name
|
||||
|
||||
@@ -58,6 +58,8 @@ spec:
|
||||
valueFrom:
|
||||
fieldRef:
|
||||
fieldPath: metadata.namespace
|
||||
- name: PATRONI_KUBERNETES_BYPASS_API_SERVICE
|
||||
value: 'true'
|
||||
- name: PATRONI_KUBERNETES_USE_ENDPOINTS
|
||||
value: 'true'
|
||||
- name: PATRONI_KUBERNETES_LABELS
|
||||
@@ -239,3 +241,37 @@ roleRef:
|
||||
subjects:
|
||||
- kind: ServiceAccount
|
||||
name: patronidemo
|
||||
|
||||
# Following privileges are only required if deployed not in the "default"
|
||||
# namespace and you want Patroni to bypass kubernetes service
|
||||
# (PATRONI_KUBERNETES_BYPASS_API_SERVICE=true)
|
||||
---
|
||||
apiVersion: rbac.authorization.k8s.io/v1
|
||||
kind: ClusterRole
|
||||
metadata:
|
||||
name: patroni-k8s-ep-access
|
||||
rules:
|
||||
- apiGroups:
|
||||
- ""
|
||||
resources:
|
||||
- endpoints
|
||||
resourceNames:
|
||||
- kubernetes
|
||||
verbs:
|
||||
- get
|
||||
|
||||
---
|
||||
apiVersion: rbac.authorization.k8s.io/v1
|
||||
kind: ClusterRoleBinding
|
||||
metadata:
|
||||
name: patroni-k8s-ep-access
|
||||
roleRef:
|
||||
apiGroup: rbac.authorization.k8s.io
|
||||
kind: ClusterRole
|
||||
name: patroni-k8s-ep-access
|
||||
subjects:
|
||||
- kind: ServiceAccount
|
||||
name: patronidemo
|
||||
# The namespace must be specified explicitly.
|
||||
# If deploying to the different namespace you have to change it.
|
||||
namespace: default
|
||||
|
||||
@@ -305,7 +305,8 @@ class Config(object):
|
||||
if suffix in ('HOST', 'HOSTS', 'PORT', 'USE_PROXIES', 'PROTOCOL', 'SRV', 'URL', 'PROXY',
|
||||
'CACERT', 'CERT', 'KEY', 'VERIFY', 'TOKEN', 'CHECKS', 'DC', 'CONSISTENCY',
|
||||
'REGISTER_SERVICE', 'SERVICE_CHECK_INTERVAL', 'NAMESPACE', 'CONTEXT',
|
||||
'USE_ENDPOINTS', 'SCOPE_LABEL', 'ROLE_LABEL', 'POD_IP', 'PORTS', 'LABELS') and name:
|
||||
'USE_ENDPOINTS', 'SCOPE_LABEL', 'ROLE_LABEL', 'POD_IP', 'PORTS', 'LABELS',
|
||||
'BYPASS_API_SERVICE') and name:
|
||||
value = os.environ.pop(param)
|
||||
if suffix == 'PORT':
|
||||
value = value and parse_int(value)
|
||||
@@ -313,7 +314,7 @@ class Config(object):
|
||||
value = value and _parse_list(value)
|
||||
elif suffix == 'LABELS':
|
||||
value = _parse_dict(value)
|
||||
elif suffix in ('USE_PROXIES', 'REGISTER_SERVICE'):
|
||||
elif suffix in ('USE_PROXIES', 'REGISTER_SERVICE', 'BYPASS_API_SERVICE'):
|
||||
value = parse_bool(value)
|
||||
if value:
|
||||
ret[name.lower()][suffix.lower()] = value
|
||||
|
||||
@@ -3,6 +3,7 @@ import functools
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
import socket
|
||||
import six
|
||||
import sys
|
||||
@@ -138,6 +139,14 @@ class K8sObject(object):
|
||||
return json.dumps(self, indent=4, default=lambda o: o.to_dict())
|
||||
|
||||
|
||||
class K8sException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class K8sConnectionFailed(K8sException):
|
||||
pass
|
||||
|
||||
|
||||
class K8sClient(object):
|
||||
|
||||
class rest(object):
|
||||
@@ -161,13 +170,31 @@ class K8sClient(object):
|
||||
|
||||
_API_URL_PREFIX = '/api/v1/namespaces/'
|
||||
|
||||
def __init__(self):
|
||||
def __init__(self, bypass_api_service=False):
|
||||
self._bypass_api_service = bypass_api_service
|
||||
self.pool_manager = urllib3.PoolManager(**k8s_config.pool_config)
|
||||
self._base_uri = k8s_config.server
|
||||
self._api_servers_cache = [k8s_config.server]
|
||||
self._api_servers_cache_updated = 0
|
||||
self.set_api_servers_cache_ttl(10)
|
||||
self.set_read_timeout(10)
|
||||
try:
|
||||
self._load_api_servers_cache()
|
||||
except K8sException:
|
||||
pass
|
||||
|
||||
def set_read_timeout(self, timeout):
|
||||
self._read_timeout = timeout
|
||||
|
||||
def set_api_servers_cache_ttl(self, ttl):
|
||||
self._api_servers_cache_ttl = ttl - 0.5
|
||||
|
||||
def set_base_uri(self, value):
|
||||
logger.info('Selected new K8s API server endpoint %s', value)
|
||||
# We will connect by IP of the master node which is not listed as alternative name
|
||||
self.pool_manager.connection_pool_kw['assert_hostname'] = False
|
||||
self._base_uri = value
|
||||
|
||||
@staticmethod
|
||||
def _handle_server_response(response, _preload_content):
|
||||
if response.status not in range(200, 206):
|
||||
@@ -180,26 +207,165 @@ class K8sClient(object):
|
||||
ret.update(headers or {})
|
||||
return ret
|
||||
|
||||
def request(self, method, path, timeout=None, **kwargs):
|
||||
retries = 0 if timeout else 1
|
||||
@property
|
||||
def api_servers_cache(self):
|
||||
base_uri, cache = self._base_uri, self._api_servers_cache
|
||||
return ([base_uri] if base_uri in cache else []) + [machine for machine in cache if machine != base_uri]
|
||||
|
||||
def _get_api_servers(self, api_servers_cache):
|
||||
_, per_node_timeout, per_node_retries = self._calculate_timeouts(len(api_servers_cache))
|
||||
kwargs = {'headers': self._make_headers({}), 'preload_content': True, 'retries': per_node_retries,
|
||||
'timeout': urllib3.Timeout(connect=max(1, per_node_timeout/2.0), total=per_node_timeout)}
|
||||
path = self._API_URL_PREFIX + 'default/endpoints/kubernetes'
|
||||
for base_uri in api_servers_cache:
|
||||
try:
|
||||
response = self.pool_manager.request('GET', base_uri + path, **kwargs)
|
||||
endpoint = self._handle_server_response(response, True)
|
||||
for subset in endpoint.subsets:
|
||||
for port in subset.ports:
|
||||
if port.name == 'https' and port.protocol == 'TCP':
|
||||
addresses = [uri('https', (a.ip, port.port)) for a in subset.addresses]
|
||||
if addresses:
|
||||
random.shuffle(addresses)
|
||||
return addresses
|
||||
except Exception as e:
|
||||
if isinstance(e, k8s_client.rest.ApiException) and e.status == 403:
|
||||
raise
|
||||
self.pool_manager.clear()
|
||||
logger.error('Failed to get "kubernetes" endpoint from %s: %r', base_uri, e)
|
||||
raise K8sConnectionFailed('No more K8s API server nodes in the cluster')
|
||||
|
||||
def _refresh_api_servers_cache(self, updating_cache=False):
|
||||
if self._bypass_api_service:
|
||||
try:
|
||||
api_servers_cache = [k8s_config.server] if updating_cache else self.api_servers_cache
|
||||
self._api_servers_cache = self._get_api_servers(api_servers_cache)
|
||||
if updating_cache:
|
||||
self.pool_manager.clear()
|
||||
except k8s_client.rest.ApiException: # 403 Permission denied
|
||||
logger.warning("Kubernetes RBAC doesn't allow GET access to the 'kubernetes' "
|
||||
"endpoint in the 'default' namespace. Disabling 'bypass_api_service'.")
|
||||
self._bypass_api_service = False
|
||||
self._api_servers_cache = [k8s_config.server]
|
||||
if not updating_cache:
|
||||
self.pool_manager.clear()
|
||||
except K8sConnectionFailed:
|
||||
if updating_cache:
|
||||
raise K8sException("Could not get the list of K8s API server nodes")
|
||||
return
|
||||
else:
|
||||
self._api_servers_cache = [k8s_config.server]
|
||||
|
||||
if self._base_uri not in self._api_servers_cache:
|
||||
self.set_base_uri(self._api_servers_cache[0])
|
||||
self._api_servers_cache_updated = time.time()
|
||||
|
||||
def refresh_api_servers_cache(self):
|
||||
if self._bypass_api_service and time.time() - self._api_servers_cache_updated > self._api_servers_cache_ttl:
|
||||
self._refresh_api_servers_cache()
|
||||
|
||||
def _load_api_servers_cache(self):
|
||||
self._update_api_servers_cache = True
|
||||
self._refresh_api_servers_cache(True)
|
||||
self._update_api_servers_cache = False
|
||||
|
||||
def _calculate_timeouts(self, api_servers, timeout=None):
|
||||
"""Calculate a request timeout and number of retries per single K8s API server node.
|
||||
In case if the timeout per node is too small (less than one second) we will reduce the number of nodes.
|
||||
For the cluster with only one API server node we will try to do 1 retry.
|
||||
No retries for clusters with 2 or more API server nodes. We better rely on switching to a different node."""
|
||||
|
||||
per_node_timeout = timeout = float(timeout or self._read_timeout)
|
||||
|
||||
max_retries = 3 - min(api_servers, 2)
|
||||
per_node_retries = 1
|
||||
min_timeout = 1.0
|
||||
|
||||
while api_servers > 0:
|
||||
per_node_timeout = float(timeout) / api_servers
|
||||
if per_node_timeout >= min_timeout:
|
||||
# for small clusters we will try to do more than one try on every node
|
||||
while per_node_retries < max_retries and per_node_timeout / (per_node_retries + 1) >= min_timeout:
|
||||
per_node_retries += 1
|
||||
per_node_timeout /= per_node_retries
|
||||
break
|
||||
# if the timeout per one node is to small try to reduce number of nodes
|
||||
api_servers -= 1
|
||||
max_retries = 1
|
||||
|
||||
return api_servers, per_node_timeout, per_node_retries - 1
|
||||
|
||||
def _do_http_request(self, retry, api_servers_cache, method, path, **kwargs):
|
||||
some_request_failed = False
|
||||
for i, base_uri in enumerate(api_servers_cache):
|
||||
if i > 0:
|
||||
logger.info('Retrying on %s', base_uri)
|
||||
try:
|
||||
response = self.pool_manager.request(method, base_uri + path, **kwargs)
|
||||
if some_request_failed:
|
||||
self.set_base_uri(base_uri)
|
||||
self._refresh_api_servers_cache()
|
||||
return response
|
||||
except (HTTPError, HTTPException, socket.error, socket.timeout) as e:
|
||||
self.pool_manager.clear()
|
||||
if not retry:
|
||||
# switch to the next node if request failed and retry is not allowed
|
||||
if i + 1 < len(api_servers_cache):
|
||||
self.set_base_uri(api_servers_cache[i + 1])
|
||||
raise K8sException('{0} {1} request failed'.format(method, path))
|
||||
logger.error('Request to server %s failed: %r', base_uri, e)
|
||||
some_request_failed = True
|
||||
|
||||
raise K8sConnectionFailed('No more API server nodes in the cluster')
|
||||
|
||||
def request(self, retry, method, path, timeout=None, **kwargs):
|
||||
if self._update_api_servers_cache:
|
||||
self._load_api_servers_cache()
|
||||
|
||||
api_servers_cache = self.api_servers_cache
|
||||
api_servers = len(api_servers_cache)
|
||||
|
||||
if timeout:
|
||||
if isinstance(timeout, six.integer_types + (float,)):
|
||||
timeout = urllib3.Timeout(total=timeout)
|
||||
elif isinstance(timeout, tuple) and len(timeout) == 2:
|
||||
timeout = urllib3.Timeout(connect=timeout[0], read=timeout[1])
|
||||
retries = 0
|
||||
else:
|
||||
timeout = self._read_timeout / 2.0
|
||||
_, timeout, retries = self._calculate_timeouts(api_servers)
|
||||
timeout = urllib3.Timeout(connect=max(1, timeout/2.0), total=timeout)
|
||||
kwargs.update(retries=retries, timeout=timeout)
|
||||
return self.pool_manager.request(method, k8s_config.server + path, **kwargs)
|
||||
|
||||
def call_api(self, method, path, headers=None, body=None,
|
||||
while True:
|
||||
try:
|
||||
return self._do_http_request(retry, api_servers_cache, method, path, **kwargs)
|
||||
except K8sConnectionFailed as ex:
|
||||
try:
|
||||
self._load_api_servers_cache()
|
||||
api_servers_cache = self.api_servers_cache
|
||||
api_servers = len(api_servers)
|
||||
except Exception as e:
|
||||
logger.debug('Failed to update list of K8s master nodes: %r', e)
|
||||
|
||||
sleeptime = retry.sleeptime
|
||||
remaining_time = retry.stoptime - sleeptime - time.time()
|
||||
nodes, timeout, retries = self._calculate_timeouts(api_servers, remaining_time)
|
||||
if nodes == 0:
|
||||
self._update_api_servers_cache = True
|
||||
raise ex
|
||||
retry.sleep_func(sleeptime)
|
||||
retry.update_delay()
|
||||
# We still have some time left. Partially reduce `api_servers_cache` and retry request
|
||||
kwargs.update(timeout=urllib3.Timeout(connect=max(1, timeout/2.0), total=timeout), retries=retries)
|
||||
api_servers_cache = api_servers_cache[:nodes]
|
||||
|
||||
def call_api(self, method, path, headers=None, body=None, _retry=None,
|
||||
_preload_content=True, _request_timeout=None, **kwargs):
|
||||
headers = self._make_headers(headers)
|
||||
fields = {to_camel_case(k): v for k, v in kwargs.items()} # resource_version => resourceVersion
|
||||
body = json.dumps(body, default=lambda o: o.to_dict()) if body is not None else None
|
||||
|
||||
response = self.request(method, self._API_URL_PREFIX + path, headers=headers, fields=fields,
|
||||
response = self.request(_retry, method, self._API_URL_PREFIX + path, headers=headers, fields=fields,
|
||||
body=body, preload_content=_preload_content, timeout=_request_timeout)
|
||||
|
||||
return self._handle_server_response(response, _preload_content)
|
||||
@@ -273,8 +439,8 @@ class KubernetesRetriableException(k8s_client.rest.ApiException):
|
||||
|
||||
class CoreV1ApiProxy(object):
|
||||
|
||||
def __init__(self, use_endpoints=False):
|
||||
self._api_client = k8s_client.ApiClient()
|
||||
def __init__(self, use_endpoints=False, bypass_api_service=False):
|
||||
self._api_client = k8s_client.ApiClient(bypass_api_service)
|
||||
self._core_v1_api = k8s_client.CoreV1Api(self._api_client)
|
||||
self._use_endpoints = bool(use_endpoints)
|
||||
|
||||
@@ -286,6 +452,10 @@ class CoreV1ApiProxy(object):
|
||||
self._api_client.pool_manager.connection_pool_kw['socket_options'] = \
|
||||
list(keepalive_socket_options(ttl, int(loop_wait + retry_timeout)))
|
||||
self._api_client.set_read_timeout(retry_timeout)
|
||||
self._api_client.set_api_servers_cache_ttl(loop_wait)
|
||||
|
||||
def refresh_api_servers_cache(self):
|
||||
self._api_client.refresh_api_servers_cache()
|
||||
|
||||
def __getattr__(self, func):
|
||||
if func.endswith('_kind'):
|
||||
@@ -315,7 +485,7 @@ def catch_kubernetes_errors(func):
|
||||
elif e.status != 409: # Object exists or conflict in resource_version
|
||||
logger.exception('Unexpected error from Kubernetes API')
|
||||
return False
|
||||
except (RetryFailedError, HTTPException, HTTPError, socket.error, socket.timeout):
|
||||
except (RetryFailedError, K8sException):
|
||||
return False
|
||||
return wrapper
|
||||
|
||||
@@ -338,7 +508,7 @@ class ObjectCache(Thread):
|
||||
|
||||
def _list(self):
|
||||
try:
|
||||
return self._func(_request_timeout=(self._retry.deadline, Timeout.DEFAULT_TIMEOUT))
|
||||
return self._func(_retry=self._retry.copy())
|
||||
except Exception:
|
||||
time.sleep(1)
|
||||
raise
|
||||
@@ -446,8 +616,7 @@ class Kubernetes(AbstractDCS):
|
||||
config['namespace'] = ''
|
||||
super(Kubernetes, self).__init__(config)
|
||||
self._retry = Retry(deadline=config['retry_timeout'], max_delay=1, max_tries=-1,
|
||||
retry_exceptions=(KubernetesRetriableException, HTTPException,
|
||||
HTTPError, socket.error, socket.timeout))
|
||||
retry_exceptions=KubernetesRetriableException)
|
||||
self._ttl = None
|
||||
try:
|
||||
k8s_config.load_incluster_config()
|
||||
@@ -462,7 +631,7 @@ class Kubernetes(AbstractDCS):
|
||||
port.update({n: p[n] for n in ('name', 'protocol') if p.get(n)})
|
||||
self.__ports.append(k8s_client.V1EndpointPort(**port))
|
||||
|
||||
self._api = CoreV1ApiProxy(config.get('use_endpoints'))
|
||||
self._api = CoreV1ApiProxy(config.get('use_endpoints'), config.get('bypass_api_service'))
|
||||
self._should_create_config_service = self._api.use_endpoints
|
||||
self.reload_config(config)
|
||||
# leader_observed_record, leader_resource_version, and leader_observed_time are used only for leader race!
|
||||
@@ -482,7 +651,9 @@ class Kubernetes(AbstractDCS):
|
||||
self._kinds = ObjectCache(self, kinds_func, self._retry, self._condition, self._name)
|
||||
|
||||
def retry(self, *args, **kwargs):
|
||||
return self._retry.copy()(*args, **kwargs)
|
||||
retry = self._retry.copy()
|
||||
kwargs['_retry'] = retry
|
||||
return retry(*args, **kwargs)
|
||||
|
||||
def client_path(self, path):
|
||||
return super(Kubernetes, self).client_path(path)[1:].replace('/', '-')
|
||||
@@ -514,8 +685,7 @@ class Kubernetes(AbstractDCS):
|
||||
member.data['pod_labels'] = pod.metadata.labels
|
||||
return member
|
||||
|
||||
def _wait_caches(self):
|
||||
stop_time = time.time() + self._retry.deadline
|
||||
def _wait_caches(self, stop_time):
|
||||
while not (self._pods.is_ready() and self._kinds.is_ready()):
|
||||
timeout = stop_time - time.time()
|
||||
if timeout <= 0:
|
||||
@@ -523,9 +693,11 @@ class Kubernetes(AbstractDCS):
|
||||
self._condition.wait(timeout)
|
||||
|
||||
def _load_cluster(self):
|
||||
stop_time = time.time() + self._retry.deadline
|
||||
self._api.refresh_api_servers_cache()
|
||||
try:
|
||||
with self._condition:
|
||||
self._wait_caches()
|
||||
self._wait_caches(stop_time)
|
||||
|
||||
members = [self.member(pod) for pod in self._pods.copy().values()]
|
||||
nodes = self._kinds.copy()
|
||||
@@ -718,6 +890,7 @@ class Kubernetes(AbstractDCS):
|
||||
retry = self._retry.copy()
|
||||
|
||||
def _retry(*args, **kwargs):
|
||||
kwargs['_retry'] = retry
|
||||
return retry(*args, **kwargs)
|
||||
|
||||
try:
|
||||
|
||||
@@ -1,10 +1,12 @@
|
||||
import json
|
||||
import socket
|
||||
import time
|
||||
import unittest
|
||||
|
||||
from mock import Mock, mock_open, patch
|
||||
from patroni.dcs.kubernetes import Kubernetes, KubernetesError, K8sConfig, K8sObject, RetryFailedError,\
|
||||
k8s_client, k8s_config, SERVICE_HOST_ENV_NAME, SERVICE_PORT_ENV_NAME
|
||||
from patroni.dcs.kubernetes import k8s_client, k8s_config, K8sConfig, K8sConnectionFailed,\
|
||||
K8sException, K8sObject, Kubernetes, KubernetesError, KubernetesRetriableException,\
|
||||
Retry, RetryFailedError, SERVICE_HOST_ENV_NAME, SERVICE_PORT_ENV_NAME
|
||||
from six.moves import builtins
|
||||
from threading import Thread
|
||||
from . import MockResponse, SleepException
|
||||
@@ -93,12 +95,43 @@ class TestK8sConfig(unittest.TestCase):
|
||||
self.assertEqual(k8s_config.headers.get('authorization'), 'Bearer token')
|
||||
|
||||
|
||||
@patch('urllib3.PoolManager.request')
|
||||
class TestApiClient(unittest.TestCase):
|
||||
|
||||
@patch.object(K8sConfig, '_server', '', create=True)
|
||||
@patch('urllib3.PoolManager.request', Mock())
|
||||
def setUp(self):
|
||||
self.a = k8s_client.ApiClient(True)
|
||||
self.mock_get_ep = MockResponse()
|
||||
self.mock_get_ep.content = '{"subsets":[{"ports":[{"name":"https","protocol":"TCP","port":443}],' +\
|
||||
'"addresses":[{"ip":"127.0.0.1"},{"ip":"127.0.0.2"}]}]}'
|
||||
|
||||
def test__do_http_request(self, mock_request):
|
||||
mock_request.side_effect = [self.mock_get_ep] + [socket.timeout]
|
||||
self.assertRaises(K8sException, self.a.call_api, 'GET', 'f')
|
||||
|
||||
@patch('time.sleep', Mock())
|
||||
def test_request(self, mock_request):
|
||||
retry = Retry(deadline=10, max_delay=1, max_tries=1, retry_exceptions=KubernetesRetriableException)
|
||||
mock_request.side_effect = [self.mock_get_ep] + 3 * [socket.timeout] + [k8s_client.rest.ApiException(500, '')]
|
||||
self.assertRaises(k8s_client.rest.ApiException, retry, self.a.call_api, 'GET', 'f', _retry=retry)
|
||||
mock_request.side_effect = [self.mock_get_ep, socket.timeout, Mock(), self.mock_get_ep]
|
||||
self.assertRaises(k8s_client.rest.ApiException, retry, self.a.call_api, 'GET', 'f', _retry=retry)
|
||||
retry.deadline = 0.0001
|
||||
mock_request.side_effect = [socket.timeout, socket.timeout, self.mock_get_ep]
|
||||
self.assertRaises(K8sConnectionFailed, retry, self.a.call_api, 'GET', 'f', _retry=retry)
|
||||
|
||||
def test__refresh_api_servers_cache(self, mock_request):
|
||||
mock_request.side_effect = k8s_client.rest.ApiException(403, '')
|
||||
self.a.refresh_api_servers_cache()
|
||||
|
||||
|
||||
class TestCoreV1Api(unittest.TestCase):
|
||||
|
||||
@patch('urllib3.PoolManager.request', Mock())
|
||||
@patch.object(K8sConfig, '_server', '', create=True)
|
||||
def setUp(self):
|
||||
self.a = k8s_client.CoreV1Api()
|
||||
self.a._api_client.set_read_timeout(10)
|
||||
self.a._api_client.pool_manager.request = Mock(return_value=MockResponse())
|
||||
|
||||
def test_create_namespaced_service(self):
|
||||
@@ -126,6 +159,7 @@ class TestCoreV1Api(unittest.TestCase):
|
||||
|
||||
class BaseTestKubernetes(unittest.TestCase):
|
||||
|
||||
@patch('urllib3.PoolManager.request', Mock())
|
||||
@patch('socket.TCP_KEEPIDLE', 4, create=True)
|
||||
@patch('socket.TCP_KEEPINTVL', 5, create=True)
|
||||
@patch('socket.TCP_KEEPCNT', 6, create=True)
|
||||
@@ -136,7 +170,8 @@ class BaseTestKubernetes(unittest.TestCase):
|
||||
@patch.object(k8s_client.CoreV1Api, 'list_namespaced_config_map', mock_list_namespaced_config_map, create=True)
|
||||
def setUp(self, config=None):
|
||||
config = config or {}
|
||||
config.update(ttl=30, scope='test', name='p-0', loop_wait=10, retry_timeout=10, labels={'f': 'b'})
|
||||
config.update(ttl=30, scope='test', name='p-0', loop_wait=10,
|
||||
retry_timeout=10, labels={'f': 'b'}, bypass_api_service=True)
|
||||
self.k = Kubernetes(config)
|
||||
self.assertRaises(AttributeError, self.k._pods._build_cache)
|
||||
self.k._pods._is_ready = True
|
||||
@@ -152,7 +187,7 @@ class TestKubernetesConfigMaps(BaseTestKubernetes):
|
||||
def test__wait_caches(self):
|
||||
self.k._pods._is_ready = False
|
||||
with self.k._condition:
|
||||
self.assertRaises(RetryFailedError, self.k._wait_caches)
|
||||
self.assertRaises(RetryFailedError, self.k._wait_caches, time.time() + 10)
|
||||
|
||||
@patch('time.time', Mock(return_value=time.time() + 100))
|
||||
def test_get_cluster(self):
|
||||
|
||||
Reference in New Issue
Block a user