Don't swallow silently all errors from k8s API (#611)

Output exception trace to the logs when http status code == 403, something is wrong with permissions.

When http status code == 409 -- such error could be ignored, because object probably was created or updated by another process.

For all other http status codes it will also produce stack traces.

I hope it will help to debug issues similar to the https://github.com/zalando/patroni/issues/606
This commit is contained in:
Alexander Kukushkin
2018-01-26 09:57:17 +01:00
committed by GitHub
parent f8b6b21297
commit a0c8491abb
2 changed files with 21 additions and 16 deletions

View File

@@ -29,7 +29,7 @@ class KubernetesRetriableException(k8s_client.rest.ApiException):
self.headers = orig.headers
class CoreV1Api(object):
class CoreV1ApiProxy(object):
def __init__(self, use_endpoints=False):
self._api = k8s_client.CoreV1Api()
@@ -55,6 +55,21 @@ class CoreV1Api(object):
return wrapper
def catch_kubernetes_errors(func):
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except k8s_client.rest.ApiException as e:
if e.status == 403:
logger.exception('Permission denied')
elif e.status != 409: # Object exists or conflict in resource_version
logger.exception('Unexpected error from Kubernetes API')
return False
except (RetryFailedError, HTTPException, HTTPError, socket.error, socket.timeout):
return False
return wrapper
class Kubernetes(AbstractDCS):
def __init__(self, config):
@@ -84,7 +99,7 @@ class Kubernetes(AbstractDCS):
port.update({n: p[n] for n in ('name', 'protocol') if p.get(n)})
ports.append(k8s_client.V1EndpointPort(**port))
self.__subsets = [k8s_client.V1EndpointSubset(addresses=addresses, ports=ports)]
self._api = CoreV1Api(use_endpoints)
self._api = CoreV1ApiProxy(use_endpoints)
self.set_retry_timeout(config['retry_timeout'])
self.set_ttl(config.get('ttl') or 30)
self._leader_observed_record = {}
@@ -96,16 +111,6 @@ class Kubernetes(AbstractDCS):
def retry(self, *args, **kwargs):
return self._retry.copy()(*args, **kwargs)
def catch_kubernetes_errors(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except (RetryFailedError, k8s_client.rest.ApiException,
HTTPException, HTTPError, socket.error, socket.timeout):
return False
return wrapper
def client_path(self, path):
return super(Kubernetes, self).client_path(path)[1:].replace('/', '-')
@@ -362,7 +367,6 @@ class Kubernetes(AbstractDCS):
def delete_cluster(self):
self.retry(self._api.delete_collection_namespaced_kind, self._namespace, label_selector=self._label_selector)
@catch_kubernetes_errors
def set_history_value(self, value):
patch = bool(self.cluster and self.cluster.config and self.cluster.config.index)
return self.patch_or_create(self.config_path, {self._HISTORY: value}, None, patch, False)

View File

@@ -1,7 +1,7 @@
import unittest
from mock import Mock, patch
from patroni.dcs.kubernetes import Kubernetes, KubernetesError, k8s_client, k8s_watch
from patroni.dcs.kubernetes import Kubernetes, KubernetesError, k8s_client, k8s_watch, RetryFailedError
def mock_list_namespaced_config_map(self, *args, **kwargs):
@@ -57,7 +57,8 @@ class TestKubernetes(unittest.TestCase):
self.k.take_leader()
def test_manual_failover(self):
self.k.manual_failover('foo', 'bar')
with patch.object(k8s_client.CoreV1Api, 'patch_namespaced_config_map', Mock(side_effect=RetryFailedError(''))):
self.k.manual_failover('foo', 'bar')
def test_set_config_value(self):
self.k.set_config_value('{}')
@@ -79,7 +80,7 @@ class TestKubernetes(unittest.TestCase):
self.k.cancel_initialization()
@patch.object(k8s_client.CoreV1Api, 'delete_collection_namespaced_config_map',
Mock(side_effect=k8s_client.rest.ApiException(500, '')))
Mock(side_effect=k8s_client.rest.ApiException(403, '')))
def test_delete_cluster(self):
self.k.delete_cluster()