Configure keepalive for connections to K8s API (#1366)

If nothing has been received from the socket within TTL seconds, the connection should be considered dead.
This commit is contained in:
Alexander Kukushkin
2020-01-27 09:25:08 +01:00
committed by GitHub
parent 902411239f
commit 6aa3f809d4
2 changed files with 29 additions and 11 deletions

View File

@@ -41,8 +41,22 @@ class CoreV1ApiProxy(object):
self._request_timeout = None
self._use_endpoints = use_endpoints
def set_timeout(self, timeout):
# Store a two-element timeout tuple on the proxy: the first element is fixed at 1
# and the second is one third of the given timeout.
# NOTE(review): presumably a (connect, read) pair in seconds - confirm against the
# kubernetes client's handling of `_request_timeout`.
self._request_timeout = (1, timeout / 3.0)
def configure_timeouts(self, loop_wait, retry_timeout, ttl):
    """Configure TCP keepalive and the request timeout for K8s API connections.

    Normally we should receive something from the socket every ``loop_wait`` seconds.
    If nothing has been received after ``loop_wait + retry_timeout`` seconds it is time
    to start worrying and send keepalive probes. Finally, the connection should be
    considered dead if nothing was received from the socket within ``ttl`` seconds.

    :param loop_wait: expected interval, in seconds, between messages on the socket.
    :param retry_timeout: retry deadline in seconds; one third of it is stored as the
        second element of ``_request_timeout``.
    :param ttl: seconds of silence after which the connection is considered dead.
    """
    import sys  # local import: only needed to pick platform-specific socket options

    cnt = 3
    idle = int(loop_wait + retry_timeout)
    # Spread the time remaining between `idle` and `ttl` over `cnt` probes, but never
    # go below one second (also covers ttl <= idle).
    intvl = max(1, int(float(ttl - idle) / cnt))

    socket_options = [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)]

    # The TCP_KEEP* constants are not exported by the socket module on every platform;
    # referencing them unconditionally would raise AttributeError there.
    for name, value in (('TCP_KEEPIDLE', idle), ('TCP_KEEPINTVL', intvl), ('TCP_KEEPCNT', cnt)):
        optname = getattr(socket, name, None)
        if optname is not None:
            socket_options.append((socket.IPPROTO_TCP, optname, value))

    # Option number 18 is TCP_USER_TIMEOUT (milliseconds) on Linux only; the same number
    # must not be passed to setsockopt on other systems.
    if sys.platform.startswith('linux'):
        socket_options.append((socket.IPPROTO_TCP, 18, int(ttl * 1000)))  # TCP_USER_TIMEOUT

    self._api.api_client.rest_client.pool_manager.connection_pool_kw['socket_options'] = socket_options
    self._request_timeout = (1, retry_timeout / 3.0)
def __getattr__(self, func):
if func.endswith('_kind'):
@@ -210,8 +224,7 @@ class Kubernetes(AbstractDCS):
self.__subsets = [k8s_client.V1EndpointSubset(addresses=addresses, ports=ports)]
self._should_create_config_service = True
self._api = CoreV1ApiProxy(use_endpoints)
self.set_retry_timeout(config['retry_timeout'])
self.set_ttl(config.get('ttl') or 30)
self.reload_config(config)
self._leader_observed_record = {}
self._leader_observed_time = None
self._leader_resource_version = None
@@ -250,7 +263,10 @@ class Kubernetes(AbstractDCS):
def set_retry_timeout(self, retry_timeout):
# Use the given value as the deadline of the retry helper held in `self._retry`.
self._retry.deadline = retry_timeout
# Pass the same value on to the API proxy via its `set_timeout` method.
self._api.set_timeout(retry_timeout)
def reload_config(self, config):
    """Apply a new configuration and refresh the API proxy's timeouts.

    The parent implementation runs first; ``loop_wait``, the retry deadline and ``ttl``
    are read afterwards and handed to the proxy's ``configure_timeouts``.

    :param config: configuration object passed through unchanged to the parent class.
    """
    super(Kubernetes, self).reload_config(config)
    proxy = self._api
    proxy.configure_timeouts(self.loop_wait, self._retry.deadline, self.ttl)
@staticmethod
def member(pod):

View File

@@ -46,7 +46,8 @@ class TestKubernetes(unittest.TestCase):
@patch('kubernetes.client.api_client.ThreadPool', Mock(), create=True)
@patch.object(Thread, 'start', Mock())
def setUp(self):
self.k = Kubernetes({'ttl': 30, 'scope': 'test', 'name': 'p-0', 'retry_timeout': 10, 'labels': {'f': 'b'}})
self.k = Kubernetes({'ttl': 30, 'scope': 'test', 'name': 'p-0',
'loop_wait': 10, 'retry_timeout': 10, 'labels': {'f': 'b'}})
self.assertRaises(AttributeError, self.k._pods._build_cache)
self.k._pods._is_ready = True
self.assertRaises(AttributeError, self.k._kinds._build_cache)
@@ -71,14 +72,14 @@ class TestKubernetes(unittest.TestCase):
@patch('kubernetes.config.load_kube_config', Mock())
@patch.object(k8s_client.CoreV1Api, 'create_namespaced_endpoints', Mock())
def test_update_leader(self):
k = Kubernetes({'ttl': 30, 'scope': 'test', 'name': 'p-0', 'retry_timeout': 10,
k = Kubernetes({'ttl': 30, 'scope': 'test', 'name': 'p-0', 'loop_wait': 10, 'retry_timeout': 10,
'labels': {'f': 'b'}, 'use_endpoints': True, 'pod_ip': '10.0.0.0'})
self.assertIsNotNone(k.update_leader('123'))
@patch('kubernetes.config.load_kube_config', Mock())
@patch.object(k8s_client.CoreV1Api, 'create_namespaced_endpoints', Mock())
def test_update_leader_with_restricted_access(self):
k = Kubernetes({'ttl': 30, 'scope': 'test', 'name': 'p-0', 'retry_timeout': 10,
k = Kubernetes({'ttl': 30, 'scope': 'test', 'name': 'p-0', 'loop_wait': 10, 'retry_timeout': 10,
'labels': {'f': 'b'}, 'use_endpoints': True, 'pod_ip': '10.0.0.0'})
self.assertIsNotNone(k.update_leader('123', True))
@@ -120,7 +121,7 @@ class TestKubernetes(unittest.TestCase):
@patch.object(k8s_client.CoreV1Api, 'create_namespaced_endpoints',
Mock(side_effect=[k8s_client.rest.ApiException(502, ''), k8s_client.rest.ApiException(500, '')]))
def test_delete_sync_state(self):
k = Kubernetes({'ttl': 30, 'scope': 'test', 'name': 'p-0', 'retry_timeout': 10,
k = Kubernetes({'ttl': 30, 'scope': 'test', 'name': 'p-0', 'loop_wait': 10, 'retry_timeout': 10,
'labels': {'f': 'b'}, 'use_endpoints': True, 'pod_ip': '10.0.0.0'})
self.assertFalse(k.delete_sync_state())
@@ -139,7 +140,7 @@ class TestKubernetes(unittest.TestCase):
@patch.object(k8s_client.CoreV1Api, 'create_namespaced_service',
Mock(side_effect=[True, False, k8s_client.rest.ApiException(500, '')]))
def test__create_config_service(self):
k = Kubernetes({'ttl': 30, 'scope': 'test', 'name': 'p-0', 'retry_timeout': 10,
k = Kubernetes({'ttl': 30, 'scope': 'test', 'name': 'p-0', 'loop_wait': 10, 'retry_timeout': 10,
'labels': {'f': 'b'}, 'use_endpoints': True, 'pod_ip': '10.0.0.0'})
self.assertIsNotNone(k.patch_or_create_config({'foo': 'bar'}))
self.assertIsNotNone(k.patch_or_create_config({'foo': 'bar'}))
@@ -152,7 +153,8 @@ class TestCacheBuilder(unittest.TestCase):
@patch('kubernetes.client.api_client.ThreadPool', Mock(), create=True)
@patch.object(Thread, 'start', Mock())
def setUp(self):
self.k = Kubernetes({'ttl': 30, 'scope': 'test', 'name': 'p-0', 'retry_timeout': 10, 'labels': {'f': 'b'}})
self.k = Kubernetes({'ttl': 30, 'scope': 'test', 'name': 'p-0',
'loop_wait': 10, 'retry_timeout': 10, 'labels': {'f': 'b'}})
@patch.object(k8s_client.CoreV1Api, 'list_namespaced_config_map', mock_list_namespaced_config_map)
@patch('patroni.dcs.kubernetes.ObjectCache._watch')