From 6aa3f809d4d8915ff3265a71b2fd4e898db65400 Mon Sep 17 00:00:00 2001
From: Alexander Kukushkin
Date: Mon, 27 Jan 2020 09:25:08 +0100
Subject: [PATCH] Configure keepalive for connections to K8s API (#1366)

If we get nothing from the socket for ttl seconds, the connection
should be considered dead.
---
 patroni/dcs/kubernetes.py | 26 +++++++++++++++++++++-----
 tests/test_kubernetes.py  | 14 ++++++++------
 2 files changed, 29 insertions(+), 11 deletions(-)

diff --git a/patroni/dcs/kubernetes.py b/patroni/dcs/kubernetes.py
index 1853105a..d84f02c4 100644
--- a/patroni/dcs/kubernetes.py
+++ b/patroni/dcs/kubernetes.py
@@ -41,8 +41,22 @@ class CoreV1ApiProxy(object):
         self._request_timeout = None
         self._use_endpoints = use_endpoints
 
-    def set_timeout(self, timeout):
-        self._request_timeout = (1, timeout / 3.0)
+    def configure_timeouts(self, loop_wait, retry_timeout, ttl):
+        # Normally we should receive something from the socket every loop_wait seconds.
+        # If we haven't received anything after loop_wait + retry_timeout seconds, it is
+        # time to start worrying (send keepalive messages). Finally, the connection should
+        # be considered dead if we received nothing from the socket for ttl seconds.
+        cnt = 3
+        idle = int(loop_wait + retry_timeout)
+        intvl = max(1, int(float(ttl - idle) / cnt))
+        self._api.api_client.rest_client.pool_manager.connection_pool_kw['socket_options'] = [
+            (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
+            (socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, idle),
+            (socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, intvl),
+            (socket.IPPROTO_TCP, socket.TCP_KEEPCNT, cnt),
+            (socket.IPPROTO_TCP, 18, int(ttl * 1000))  # TCP_USER_TIMEOUT
+        ]
+        self._request_timeout = (1, retry_timeout / 3.0)
 
     def __getattr__(self, func):
         if func.endswith('_kind'):
@@ -210,8 +224,7 @@ class Kubernetes(AbstractDCS):
             self.__subsets = [k8s_client.V1EndpointSubset(addresses=addresses, ports=ports)]
         self._should_create_config_service = True
         self._api = CoreV1ApiProxy(use_endpoints)
-        self.set_retry_timeout(config['retry_timeout'])
-        self.set_ttl(config.get('ttl') or 30)
+        self.reload_config(config)
         self._leader_observed_record = {}
         self._leader_observed_time = None
         self._leader_resource_version = None
@@ -250,7 +263,10 @@ class Kubernetes(AbstractDCS):
 
     def set_retry_timeout(self, retry_timeout):
         self._retry.deadline = retry_timeout
-        self._api.set_timeout(retry_timeout)
+
+    def reload_config(self, config):
+        super(Kubernetes, self).reload_config(config)
+        self._api.configure_timeouts(self.loop_wait, self._retry.deadline, self.ttl)
 
     @staticmethod
     def member(pod):
diff --git a/tests/test_kubernetes.py b/tests/test_kubernetes.py
index 39f4eab1..03b7df40 100644
--- a/tests/test_kubernetes.py
+++ b/tests/test_kubernetes.py
@@ -46,7 +46,8 @@ class TestKubernetes(unittest.TestCase):
     @patch('kubernetes.client.api_client.ThreadPool', Mock(), create=True)
     @patch.object(Thread, 'start', Mock())
     def setUp(self):
-        self.k = Kubernetes({'ttl': 30, 'scope': 'test', 'name': 'p-0', 'retry_timeout': 10, 'labels': {'f': 'b'}})
+        self.k = Kubernetes({'ttl': 30, 'scope': 'test', 'name': 'p-0',
+                             'loop_wait': 10, 'retry_timeout': 10, 'labels': {'f': 'b'}})
         self.assertRaises(AttributeError, self.k._pods._build_cache)
         self.k._pods._is_ready = True
         self.assertRaises(AttributeError, self.k._kinds._build_cache)
@@ -71,14 +72,14 @@ class TestKubernetes(unittest.TestCase):
     @patch('kubernetes.config.load_kube_config', Mock())
     @patch.object(k8s_client.CoreV1Api, 'create_namespaced_endpoints', Mock())
     def test_update_leader(self):
-        k = Kubernetes({'ttl': 30, 'scope': 'test', 'name': 'p-0', 'retry_timeout': 10,
+        k = Kubernetes({'ttl': 30, 'scope': 'test', 'name': 'p-0', 'loop_wait': 10, 'retry_timeout': 10,
                        'labels': {'f': 'b'}, 'use_endpoints': True, 'pod_ip': '10.0.0.0'})
         self.assertIsNotNone(k.update_leader('123'))
 
     @patch('kubernetes.config.load_kube_config', Mock())
     @patch.object(k8s_client.CoreV1Api, 'create_namespaced_endpoints', Mock())
     def test_update_leader_with_restricted_access(self):
-        k = Kubernetes({'ttl': 30, 'scope': 'test', 'name': 'p-0', 'retry_timeout': 10,
+        k = Kubernetes({'ttl': 30, 'scope': 'test', 'name': 'p-0', 'loop_wait': 10, 'retry_timeout': 10,
                        'labels': {'f': 'b'}, 'use_endpoints': True, 'pod_ip': '10.0.0.0'})
         self.assertIsNotNone(k.update_leader('123', True))
 
@@ -120,7 +121,7 @@ class TestKubernetes(unittest.TestCase):
     @patch.object(k8s_client.CoreV1Api, 'create_namespaced_endpoints',
                   Mock(side_effect=[k8s_client.rest.ApiException(502, ''), k8s_client.rest.ApiException(500, '')]))
     def test_delete_sync_state(self):
-        k = Kubernetes({'ttl': 30, 'scope': 'test', 'name': 'p-0', 'retry_timeout': 10,
+        k = Kubernetes({'ttl': 30, 'scope': 'test', 'name': 'p-0', 'loop_wait': 10, 'retry_timeout': 10,
                        'labels': {'f': 'b'}, 'use_endpoints': True, 'pod_ip': '10.0.0.0'})
         self.assertFalse(k.delete_sync_state())
 
@@ -139,7 +140,7 @@ class TestKubernetes(unittest.TestCase):
     @patch.object(k8s_client.CoreV1Api, 'create_namespaced_service',
                   Mock(side_effect=[True, False, k8s_client.rest.ApiException(500, '')]))
     def test__create_config_service(self):
-        k = Kubernetes({'ttl': 30, 'scope': 'test', 'name': 'p-0', 'retry_timeout': 10,
+        k = Kubernetes({'ttl': 30, 'scope': 'test', 'name': 'p-0', 'loop_wait': 10, 'retry_timeout': 10,
                        'labels': {'f': 'b'}, 'use_endpoints': True, 'pod_ip': '10.0.0.0'})
         self.assertIsNotNone(k.patch_or_create_config({'foo': 'bar'}))
         self.assertIsNotNone(k.patch_or_create_config({'foo': 'bar'}))
@@ -152,7 +153,8 @@ class TestCacheBuilder(unittest.TestCase):
     @patch('kubernetes.client.api_client.ThreadPool', Mock(), create=True)
     @patch.object(Thread, 'start', Mock())
     def setUp(self):
-        self.k = Kubernetes({'ttl': 30, 'scope': 'test', 'name': 'p-0', 'retry_timeout': 10, 'labels': {'f': 'b'}})
+        self.k = Kubernetes({'ttl': 30, 'scope': 'test', 'name': 'p-0',
+                             'loop_wait': 10, 'retry_timeout': 10, 'labels': {'f': 'b'}})
 
     @patch.object(k8s_client.CoreV1Api, 'list_namespaced_config_map', mock_list_namespaced_config_map)
     @patch('patroni.dcs.kubernetes.ObjectCache._watch')
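
For illustration, a minimal standalone sketch (not part of the patch) of the keepalive arithmetic introduced in configure_timeouts(), evaluated with the default values used in the tests above (loop_wait=10, retry_timeout=10, ttl=30). It assumes a Linux host: TCP_KEEPIDLE, TCP_KEEPINTVL and TCP_KEEPCNT are Linux socket options, and 18 is the raw value of TCP_USER_TIMEOUT, which older Python versions do not expose as socket.TCP_USER_TIMEOUT.

import socket

# Defaults taken from the tests above; illustrative values only.
loop_wait, retry_timeout, ttl = 10, 10, 30

cnt = 3                                       # unanswered probes before the peer is declared dead
idle = int(loop_wait + retry_timeout)         # 20: seconds of silence before the first keepalive probe
intvl = max(1, int(float(ttl - idle) / cnt))  # 3: seconds between probes, so all probes fit into the ttl window

# The same option list the patch stores in connection_pool_kw['socket_options'];
# urllib3 applies each (level, option, value) tuple with setsockopt() on every new connection.
socket_options = [
    (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
    (socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, idle),
    (socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, intvl),
    (socket.IPPROTO_TCP, socket.TCP_KEEPCNT, cnt),
    (socket.IPPROTO_TCP, 18, int(ttl * 1000)),  # TCP_USER_TIMEOUT, in milliseconds
]

print(idle, intvl, cnt)  # -> 20 3 3

With these defaults the kernel starts probing after 20 seconds of silence and TCP_USER_TIMEOUT abandons the connection after 30 seconds without acknowledged data, matching the ttl behaviour described in the commit message.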