diff --git a/patroni/dcs/kubernetes.py b/patroni/dcs/kubernetes.py index 335e284b..b04f5717 100644 --- a/patroni/dcs/kubernetes.py +++ b/patroni/dcs/kubernetes.py @@ -29,7 +29,7 @@ class KubernetesRetriableException(k8s_client.rest.ApiException): self.headers = orig.headers -class CoreV1Api(object): +class CoreV1ApiProxy(object): def __init__(self, use_endpoints=False): self._api = k8s_client.CoreV1Api() @@ -55,6 +55,21 @@ class CoreV1Api(object): return wrapper +def catch_kubernetes_errors(func): + def wrapper(*args, **kwargs): + try: + return func(*args, **kwargs) + except k8s_client.rest.ApiException as e: + if e.status == 403: + logger.exception('Permission denied') + elif e.status != 409: # Object exists or conflict in resource_version + logger.exception('Unexpected error from Kubernetes API') + return False + except (RetryFailedError, HTTPException, HTTPError, socket.error, socket.timeout): + return False + return wrapper + + class Kubernetes(AbstractDCS): def __init__(self, config): @@ -84,7 +99,7 @@ class Kubernetes(AbstractDCS): port.update({n: p[n] for n in ('name', 'protocol') if p.get(n)}) ports.append(k8s_client.V1EndpointPort(**port)) self.__subsets = [k8s_client.V1EndpointSubset(addresses=addresses, ports=ports)] - self._api = CoreV1Api(use_endpoints) + self._api = CoreV1ApiProxy(use_endpoints) self.set_retry_timeout(config['retry_timeout']) self.set_ttl(config.get('ttl') or 30) self._leader_observed_record = {} @@ -96,16 +111,6 @@ class Kubernetes(AbstractDCS): def retry(self, *args, **kwargs): return self._retry.copy()(*args, **kwargs) - def catch_kubernetes_errors(func): - @functools.wraps(func) - def wrapper(*args, **kwargs): - try: - return func(*args, **kwargs) - except (RetryFailedError, k8s_client.rest.ApiException, - HTTPException, HTTPError, socket.error, socket.timeout): - return False - return wrapper - def client_path(self, path): return super(Kubernetes, self).client_path(path)[1:].replace('/', '-') @@ -362,7 +367,6 @@ class Kubernetes(AbstractDCS): def delete_cluster(self): self.retry(self._api.delete_collection_namespaced_kind, self._namespace, label_selector=self._label_selector) - @catch_kubernetes_errors def set_history_value(self, value): patch = bool(self.cluster and self.cluster.config and self.cluster.config.index) return self.patch_or_create(self.config_path, {self._HISTORY: value}, None, patch, False) diff --git a/tests/test_kubernetes.py b/tests/test_kubernetes.py index 41b8e82b..afb8019b 100644 --- a/tests/test_kubernetes.py +++ b/tests/test_kubernetes.py @@ -1,7 +1,7 @@ import unittest from mock import Mock, patch -from patroni.dcs.kubernetes import Kubernetes, KubernetesError, k8s_client, k8s_watch +from patroni.dcs.kubernetes import Kubernetes, KubernetesError, k8s_client, k8s_watch, RetryFailedError def mock_list_namespaced_config_map(self, *args, **kwargs): @@ -57,7 +57,8 @@ class TestKubernetes(unittest.TestCase): self.k.take_leader() def test_manual_failover(self): - self.k.manual_failover('foo', 'bar') + with patch.object(k8s_client.CoreV1Api, 'patch_namespaced_config_map', Mock(side_effect=RetryFailedError(''))): + self.k.manual_failover('foo', 'bar') def test_set_config_value(self): self.k.set_config_value('{}') @@ -79,7 +80,7 @@ class TestKubernetes(unittest.TestCase): self.k.cancel_initialization() @patch.object(k8s_client.CoreV1Api, 'delete_collection_namespaced_config_map', - Mock(side_effect=k8s_client.rest.ApiException(500, ''))) + Mock(side_effect=k8s_client.rest.ApiException(403, ''))) def test_delete_cluster(self): self.k.delete_cluster()