mirror of
https://github.com/outbackdingo/patroni.git
synced 2026-01-27 10:20:10 +00:00
High CPU load on Etcd nodes and K8s API servers created a very strange situation. A few clusters were running without a leader, and the pod that was ahead of the others was failing to take the leader lock because updates were failing with HTTP response code `409` (`resource_version` mismatch). Effectively that means that TCP connections to K8s master nodes were alive (otherwise TCP keepalives would have resolved it), but no `UPDATE` events were arriving via these connections, resulting in a stale in-memory cache of the cluster. The only good way to prevent this situation is to intercept 409 HTTP responses and terminate the existing TCP connections used for watches. Now a few words about implementation. Unfortunately, watch threads are waiting in the read() call most of the time and there is no good way to interrupt them. But `socket.shutdown()` seems to do this job. We already used this trick in the Etcd3 implementation. This approach will help to mitigate the issue of not having a leader, but at the same time replicas might still end up with a stale cached cluster state and in the worst case will not stream from the leader. Non-streaming replicas are less dangerous and can be covered by monitoring and partially mitigated by a correctly configured `archive_command` and `restore_command`.
365 lines
18 KiB
Python
365 lines
18 KiB
Python
import datetime
|
|
import json
|
|
import socket
|
|
import time
|
|
import unittest
|
|
|
|
from mock import Mock, PropertyMock, mock_open, patch
|
|
from patroni.dcs.kubernetes import k8s_client, k8s_config, K8sConfig, K8sConnectionFailed,\
|
|
K8sException, K8sObject, Kubernetes, KubernetesError, KubernetesRetriableException,\
|
|
Retry, RetryFailedError, SERVICE_HOST_ENV_NAME, SERVICE_PORT_ENV_NAME
|
|
from six.moves import builtins
|
|
from threading import Thread
|
|
from . import MockResponse, SleepException
|
|
|
|
|
|
def mock_list_namespaced_config_map(*args, **kwargs):
    """Return a fake V1ConfigMapList for scope 'test'.

    The list contains four ConfigMaps (config, leader, failover, sync), all
    sharing resource_version '1' and the {'f': 'b'} labels; annotations are
    updated cumulatively between them, mirroring what Patroni stores in K8s.
    """
    attrs = {'resource_version': '1', 'labels': {'f': 'b'}, 'name': 'test-config',
             'annotations': {'initialize': '123', 'config': '{}'}}
    config_maps = []
    # Each entry below overrides name/annotations on top of the previous state.
    for overrides in (
            {},
            {'name': 'test-leader',
             'annotations': {'optime': '1234x', 'leader': 'p-0', 'ttl': '30s', 'slots': '{'}},
            {'name': 'test-failover', 'annotations': {'leader': 'p-0'}},
            {'name': 'test-sync', 'annotations': {'leader': 'p-0'}}):
        attrs.update(overrides)
        config_maps.append(k8s_client.V1ConfigMap(metadata=k8s_client.V1ObjectMeta(**attrs)))
    list_meta = k8s_client.V1ObjectMeta(resource_version='1')
    return k8s_client.V1ConfigMapList(metadata=list_meta, items=config_maps, kind='ConfigMapList')
|
|
|
|
|
|
def mock_read_namespaced_endpoints(*args, **kwargs):
    """Return a fake V1Endpoints object for the 'test' scope.

    It has a single subset with the postgresql port and two addresses: one
    with a target_ref pointing to pod 'p-0' and one without any target_ref.
    """
    pod_ref = k8s_client.V1ObjectReference(kind='Pod', resource_version='10', name='p-0',
                                           namespace='default', uid='964dfeae-e79b-4476-8a5a-1920b5c2a69d')
    addr_with_ref = k8s_client.V1EndpointAddress(ip='10.0.0.0', target_ref=pod_ref)
    addr_without_ref = k8s_client.V1EndpointAddress(ip='10.0.0.1')
    pg_port = k8s_client.V1EndpointPort(port=5432, name='postgresql', protocol='TCP')
    # Note: the address without target_ref deliberately comes first.
    subsets = [k8s_client.V1EndpointSubset(addresses=[addr_without_ref, addr_with_ref], ports=[pg_port])]
    meta = k8s_client.V1ObjectMeta(resource_version='1', labels={'f': 'b'}, name='test',
                                   annotations={'optime': '1234', 'leader': 'p-0', 'ttl': '30s'})
    return k8s_client.V1Endpoints(subsets=subsets, metadata=meta)
|
|
|
|
|
|
def mock_list_namespaced_endpoints(*args, **kwargs):
    """Wrap the single mocked Endpoints object into a fake V1EndpointsList."""
    list_meta = k8s_client.V1ObjectMeta(resource_version='1')
    return k8s_client.V1EndpointsList(metadata=list_meta,
                                      items=[mock_read_namespaced_endpoints()],
                                      kind='V1EndpointsList')
|
|
|
|
|
|
def mock_list_namespaced_pod(*args, **kwargs):
    """Return a fake V1PodList containing the single running pod 'p-0'."""
    pod = k8s_client.V1Pod(
        metadata=k8s_client.V1ObjectMeta(resource_version='1', name='p-0', annotations={'status': '{}'},
                                         uid='964dfeae-e79b-4476-8a5a-1920b5c2a69d'),
        status=k8s_client.V1PodStatus(pod_ip='10.0.0.0'),
        spec=k8s_client.V1PodSpec(hostname='p-0', node_name='kind-control-plane', containers=[]))
    return k8s_client.V1PodList(items=[pod], kind='PodList')
|
|
|
|
|
|
def mock_namespaced_kind(*args, **kwargs):
    """Return a Mock imitating a successful patch/create API response.

    The only attribute tests rely on is metadata.resource_version == '2'.
    """
    response = Mock()
    response.metadata.resource_version = '2'
    return response
|
|
|
|
|
|
def mock_load_k8s_config(self, *args, **kwargs):
    """Stand-in for K8sConfig.load_kube_config: just record an empty server address."""
    self._server = ''
|
|
|
|
|
|
class TestK8sConfig(unittest.TestCase):
    """Tests for K8sConfig: in-cluster config loading, token refresh and kubeconfig parsing."""

    def test_load_incluster_config(self):
        """load_incluster_config() must raise ConfigException until env and files are usable."""
        # Missing or empty KUBERNETES_SERVICE_{HOST,PORT} -> ConfigException.
        for env in ({}, {SERVICE_HOST_ENV_NAME: '', SERVICE_PORT_ENV_NAME: ''}):
            with patch('os.environ', env):
                self.assertRaises(k8s_config.ConfigException, k8s_config.load_incluster_config)

        # The isfile/open side_effect sequences simulate four failing combinations
        # (presumably missing/empty token and CA files -- confirm against
        # load_incluster_config's checks), followed by one successful load.
        with patch('os.environ', {SERVICE_HOST_ENV_NAME: 'a', SERVICE_PORT_ENV_NAME: '1'}),\
                patch('os.path.isfile', Mock(side_effect=[False, True, True, False, True, True, True, True])),\
                patch.object(builtins, 'open', Mock(side_effect=[
                    mock_open()(), mock_open(read_data='a')(), mock_open(read_data='a')(),
                    mock_open()(), mock_open(read_data='a')(), mock_open(read_data='a')()])):
            for _ in range(0, 4):
                self.assertRaises(k8s_config.ConfigException, k8s_config.load_incluster_config)
            k8s_config.load_incluster_config()
            self.assertEqual(k8s_config.server, 'https://a:1')
            self.assertEqual(k8s_config.headers.get('authorization'), 'Bearer a')

    def test_refresh_token(self):
        """The bearer token is re-read from disk after token_refresh_interval elapses."""
        with patch('os.environ', {SERVICE_HOST_ENV_NAME: 'a', SERVICE_PORT_ENV_NAME: '1'}),\
                patch('os.path.isfile', Mock(side_effect=[True, True, False, True, True, True])),\
                patch.object(builtins, 'open', Mock(side_effect=[
                    mock_open(read_data='cert')(), mock_open(read_data='a')(),
                    mock_open()(), mock_open(read_data='b')(), mock_open(read_data='c')()])):
            k8s_config.load_incluster_config(token_refresh_interval=datetime.timedelta(milliseconds=100))
            self.assertEqual(k8s_config.headers.get('authorization'), 'Bearer a')
            time.sleep(0.1)
            # token file doesn't exist
            self.assertEqual(k8s_config.headers.get('authorization'), 'Bearer a')
            # token file is empty
            self.assertEqual(k8s_config.headers.get('authorization'), 'Bearer a')
            # token refreshed
            self.assertEqual(k8s_config.headers.get('authorization'), 'Bearer b')
            time.sleep(0.1)
            # token refreshed
            self.assertEqual(k8s_config.headers.get('authorization'), 'Bearer c')
            # no need to refresh token
            self.assertEqual(k8s_config.headers.get('authorization'), 'Bearer c')

    def test_load_kube_config(self):
        """load_kube_config() extracts server, pool/TLS config and bearer token from kubeconfig."""
        config = {
            "current-context": "local",
            "contexts": [{"name": "local", "context": {"user": "local", "cluster": "local"}}],
            "clusters": [{"name": "local", "cluster": {"server": "https://a:1/", "certificate-authority": "a"}}],
            "users": [{"name": "local", "user": {"username": "a", "password": "b", "client-certificate": "c"}}]
        }
        with patch.object(builtins, 'open', mock_open(read_data=json.dumps(config))):
            k8s_config.load_kube_config()
            # The trailing slash of the server URL is stripped.
            self.assertEqual(k8s_config.server, 'https://a:1')
            self.assertEqual(k8s_config.pool_config, {'ca_certs': 'a', 'cert_file': 'c', 'cert_reqs': 'CERT_REQUIRED',
                                                      'maxsize': 10, 'num_pools': 10})

        # When the user entry carries a token it ends up in the authorization header.
        config["users"][0]["user"]["token"] = "token"
        with patch.object(builtins, 'open', mock_open(read_data=json.dumps(config))):
            k8s_config.load_kube_config()
            self.assertEqual(k8s_config.headers.get('authorization'), 'Bearer token')
|
|
|
|
|
|
@patch('urllib3.PoolManager.request')
class TestApiClient(unittest.TestCase):
    """Tests for the request/retry behavior of Patroni's built-in K8s ApiClient."""

    @patch.object(K8sConfig, '_server', '', create=True)
    @patch('urllib3.PoolManager.request', Mock())
    def setUp(self):
        self.a = k8s_client.ApiClient(True)
        self.mock_get_ep = MockResponse()
        # Payload mimicking the "kubernetes" Endpoints object: one https port
        # and two API server addresses (127.0.0.1 and 127.0.0.2).
        self.mock_get_ep.content = '{"subsets":[{"ports":[{"name":"https","protocol":"TCP","port":443}],' +\
            '"addresses":[{"ip":"127.0.0.1"},{"ip":"127.0.0.2"}]}]}'

    def test__do_http_request(self, mock_request):
        """A socket.timeout from the pool manager is surfaced as K8sException."""
        mock_request.side_effect = [self.mock_get_ep] + [socket.timeout]
        self.assertRaises(K8sException, self.a.call_api, 'GET', 'f')

    @patch('time.sleep', Mock())
    def test_request(self, mock_request):
        """call_api with a Retry object retries until deadline, then raises."""
        retry = Retry(deadline=10, max_delay=1, max_tries=1, retry_exceptions=KubernetesRetriableException)
        # Timeouts followed by a 500 response -> the ApiException escapes the retry.
        mock_request.side_effect = [self.mock_get_ep] + 3 * [socket.timeout] + [k8s_client.rest.ApiException(500, '')]
        self.assertRaises(k8s_client.rest.ApiException, retry, self.a.call_api, 'GET', 'f', _retry=retry)
        mock_request.side_effect = [self.mock_get_ep, socket.timeout, Mock(), self.mock_get_ep]
        self.assertRaises(k8s_client.rest.ApiException, retry, self.a.call_api, 'GET', 'f', _retry=retry)
        # With a (nearly) expired deadline the client gives up with K8sConnectionFailed.
        retry.deadline = 0.0001
        mock_request.side_effect = [socket.timeout, socket.timeout, self.mock_get_ep]
        self.assertRaises(K8sConnectionFailed, retry, self.a.call_api, 'GET', 'f', _retry=retry)

    def test__refresh_api_servers_cache(self, mock_request):
        """A 403 from the API server must not break the api-servers cache refresh."""
        mock_request.side_effect = k8s_client.rest.ApiException(403, '')
        self.a.refresh_api_servers_cache()
|
|
|
|
|
|
class TestCoreV1Api(unittest.TestCase):
    """Tests for the dynamically generated CoreV1Api methods of the built-in K8s client."""

    @patch('urllib3.PoolManager.request', Mock())
    @patch.object(K8sConfig, '_server', '', create=True)
    def setUp(self):
        self.a = k8s_client.CoreV1Api()
        # Every HTTP call returns an empty, successful MockResponse by default.
        self.a._api_client.pool_manager.request = Mock(return_value=MockResponse())

    def test_create_namespaced_service(self):
        self.assertEqual(str(self.a.create_namespaced_service('default', {}, _request_timeout=2)), '{}')

    def test_list_namespaced_endpoints(self):
        self.a._api_client.pool_manager.request.return_value.content = '{"items": [1,2,3]}'
        self.assertIsInstance(self.a.list_namespaced_endpoints('default'), K8sObject)

    def test_patch_namespaced_config_map(self):
        self.assertEqual(str(self.a.patch_namespaced_config_map('foo', 'default', {}, _request_timeout=(1, 2))), '{}')

    def test_list_namespaced_pod(self):
        """An error HTTP status (409) is converted into rest.ApiException with a 'Reason:' text."""
        self.a._api_client.pool_manager.request.return_value.status_code = 409
        self.a._api_client.pool_manager.request.return_value.content = 'foo'
        try:
            self.a.list_namespaced_pod('default', label_selector='foo=bar')
            # BUG FIX: was self.assertFail(), which doesn't exist on TestCase and
            # would raise AttributeError instead of failing the test properly.
            self.fail('rest.ApiException was expected')
        except k8s_client.rest.ApiException as e:
            self.assertTrue('Reason: ' in str(e))

    def test_delete_namespaced_pod(self):
        self.assertEqual(str(self.a.delete_namespaced_pod('foo', 'default', _request_timeout=(1, 2), body={})), '{}')
|
|
|
|
|
|
class BaseTestKubernetes(unittest.TestCase):
    """Shared fixture: builds a Kubernetes DCS object with all K8s API calls mocked out."""

    @patch('urllib3.PoolManager.request', Mock())
    @patch('socket.TCP_KEEPIDLE', 4, create=True)
    @patch('socket.TCP_KEEPINTVL', 5, create=True)
    @patch('socket.TCP_KEEPCNT', 6, create=True)
    @patch.object(Thread, 'start', Mock())
    @patch.object(K8sConfig, 'load_kube_config', mock_load_k8s_config)
    @patch.object(K8sConfig, 'load_incluster_config', Mock(side_effect=k8s_config.ConfigException))
    @patch.object(k8s_client.CoreV1Api, 'list_namespaced_pod', mock_list_namespaced_pod, create=True)
    @patch.object(k8s_client.CoreV1Api, 'list_namespaced_config_map', mock_list_namespaced_config_map, create=True)
    def setUp(self, config=None):
        """Create self.k (Kubernetes DCS) for scope 'test' / member 'p-0' and prime its caches.

        Subclasses may pass extra config (e.g. use_endpoints) via the `config` dict.
        """
        config = config or {}
        config.update(ttl=30, scope='test', name='p-0', loop_wait=10,
                      retry_timeout=10, labels={'f': 'b'}, bypass_api_service=True)
        self.k = Kubernetes(config)
        # First _build_cache attempts fail (AttributeError/TypeError) because
        # the watch machinery is mocked out; mark both caches ready by hand.
        self.assertRaises(AttributeError, self.k._pods._build_cache)
        self.k._pods._is_ready = True
        self.assertRaises(TypeError, self.k._kinds._build_cache)
        self.k._kinds._is_ready = True
        self.k.get_cluster()
|
|
|
|
|
|
@patch.object(k8s_client.CoreV1Api, 'patch_namespaced_config_map', mock_namespaced_kind, create=True)
class TestKubernetesConfigMaps(BaseTestKubernetes):
    """Tests of the Kubernetes DCS when leader/config state is stored in ConfigMaps."""

    @patch('time.time', Mock(side_effect=[1, 10.9, 100]))
    def test__wait_caches(self):
        """_wait_caches raises RetryFailedError when the caches don't become ready in time."""
        self.k._pods._is_ready = False
        with self.k._condition:
            self.assertRaises(RetryFailedError, self.k._wait_caches, time.time() + 10)

    @patch('time.time', Mock(return_value=time.time() + 100))
    def test_get_cluster(self):
        self.k.get_cluster()

        # Any failure while waiting for caches surfaces as KubernetesError.
        with patch.object(Kubernetes, '_wait_caches', Mock(side_effect=Exception)):
            self.assertRaises(KubernetesError, self.k.get_cluster)

    def test_take_leader(self):
        self.k.take_leader()
        # Simulate another member holding the lock and a failing patch_or_create.
        self.k._leader_observed_record['leader'] = 'test'
        self.k.patch_or_create = Mock(return_value=False)
        self.k.take_leader()

    def test_manual_failover(self):
        """A RetryFailedError while writing the failover key is swallowed."""
        with patch.object(k8s_client.CoreV1Api, 'patch_namespaced_config_map',
                          Mock(side_effect=RetryFailedError('')), create=True):
            self.k.manual_failover('foo', 'bar')

    def test_set_config_value(self):
        """A 409 conflict while patching the config key is handled gracefully."""
        with patch.object(k8s_client.CoreV1Api, 'patch_namespaced_config_map',
                          Mock(side_effect=k8s_client.rest.ApiException(409, '')), create=True):
            self.k.set_config_value('{}', 1)

    @patch.object(k8s_client.CoreV1Api, 'patch_namespaced_pod', create=True)
    def test_touch_member(self, mock_patch_namespaced_pod):
        """touch_member patches the member pod's annotations for various states/roles."""
        mock_patch_namespaced_pod.return_value.metadata.resource_version = '10'
        self.k.touch_member({'role': 'replica'})
        self.k._name = 'p-1'
        self.k.touch_member({'state': 'running', 'role': 'replica'})
        self.k.touch_member({'state': 'stopped', 'role': 'master'})

    def test_initialize(self):
        self.k.initialize()

    def test_delete_leader(self):
        self.k.delete_leader(1)

    def test_cancel_initialization(self):
        self.k.cancel_initialization()

    @patch.object(k8s_client.CoreV1Api, 'delete_collection_namespaced_config_map',
                  Mock(side_effect=k8s_client.rest.ApiException(403, '')), create=True)
    def test_delete_cluster(self):
        """A 403 while deleting the cluster's ConfigMaps does not propagate."""
        self.k.delete_cluster()

    def test_watch(self):
        self.k.set_ttl(10)
        self.k.watch(None, 0)
        self.k.watch(None, 0)

    def test_set_history_value(self):
        self.k.set_history_value('{}')
|
|
|
|
|
|
class TestKubernetesEndpoints(BaseTestKubernetes):
    """Tests of the Kubernetes DCS when leader state is stored in Endpoints (use_endpoints)."""

    @patch.object(k8s_client.CoreV1Api, 'list_namespaced_endpoints', mock_list_namespaced_endpoints, create=True)
    def setUp(self, config=None):
        super(TestKubernetesEndpoints, self).setUp({'use_endpoints': True, 'pod_ip': '10.0.0.0'})

    @patch.object(k8s_client.CoreV1Api, 'patch_namespaced_endpoints', create=True)
    def test_update_leader(self, mock_patch_namespaced_endpoints):
        self.assertIsNotNone(self.k.update_leader('123'))
        # The patched Endpoints must carry the pod target_ref with its resource_version.
        args = mock_patch_namespaced_endpoints.call_args[0]
        self.assertEqual(args[2].subsets[0].addresses[0].target_ref.resource_version, '10')
        # Empty subsets in the cached object: the update still succeeds.
        self.k._kinds._object_cache['test'].subsets[:] = []
        self.assertIsNotNone(self.k.update_leader('123'))
        # A different leader in the cached annotations: the update is refused.
        self.k._kinds._object_cache['test'].metadata.annotations['leader'] = 'p-1'
        self.assertFalse(self.k.update_leader('123'))

    @patch.object(k8s_client.CoreV1Api, 'read_namespaced_endpoints', create=True)
    @patch.object(k8s_client.CoreV1Api, 'patch_namespaced_endpoints', create=True)
    def test__update_leader_with_retry(self, mock_patch, mock_read):
        """Exercise the retry path of the leader update on 502/409 and read failures."""
        mock_read.return_value = mock_read_namespaced_endpoints()
        mock_patch.side_effect = k8s_client.rest.ApiException(502, '')
        self.assertFalse(self.k.update_leader('123'))
        mock_patch.side_effect = RetryFailedError('')
        self.assertFalse(self.k.update_leader('123'))
        # 409 conflicts with various simulated clock progressions (time.time values
        # drive the retry deadline logic).
        mock_patch.side_effect = k8s_client.rest.ApiException(409, '')
        with patch('time.time', Mock(side_effect=[0, 100, 200, 0, 0, 0, 0, 100, 200])):
            self.assertFalse(self.k.update_leader('123'))
            self.assertFalse(self.k.update_leader('123'))
            self.assertFalse(self.k.update_leader('123'))
        # A 409 followed by a successful patch after re-reading the object succeeds.
        mock_patch.side_effect = [k8s_client.rest.ApiException(409, ''), mock_namespaced_kind()]
        mock_read.return_value.metadata.resource_version = '2'
        self.assertIsNotNone(self.k._update_leader_with_retry({}, '1', []))
        # A 409 where the re-read itself fails gives up.
        mock_patch.side_effect = k8s_client.rest.ApiException(409, '')
        mock_read.side_effect = Exception
        self.assertFalse(self.k.update_leader('123'))

    @patch.object(k8s_client.CoreV1Api, 'create_namespaced_endpoints',
                  Mock(side_effect=[k8s_client.rest.ApiException(500, ''),
                                    k8s_client.rest.ApiException(502, '')]), create=True)
    def test_delete_sync_state(self):
        self.assertFalse(self.k.delete_sync_state())

    @patch.object(k8s_client.CoreV1Api, 'patch_namespaced_pod', mock_namespaced_kind, create=True)
    @patch.object(k8s_client.CoreV1Api, 'create_namespaced_endpoints', mock_namespaced_kind, create=True)
    @patch.object(k8s_client.CoreV1Api, 'create_namespaced_service',
                  Mock(side_effect=[True, False, k8s_client.rest.ApiException(500, '')]), create=True)
    def test__create_config_service(self):
        """patch_or_create_config succeeds regardless of create_namespaced_service outcome."""
        self.assertIsNotNone(self.k.patch_or_create_config({'foo': 'bar'}))
        self.assertIsNotNone(self.k.patch_or_create_config({'foo': 'bar'}))
        self.k.touch_member({'state': 'running', 'role': 'replica'})
|
|
|
|
|
|
class TestCacheBuilder(BaseTestKubernetes):
    """Tests for ObjectCache: building the cache from watch streams and killing stale streams."""

    @patch.object(k8s_client.CoreV1Api, 'list_namespaced_config_map', mock_list_namespaced_config_map, create=True)
    @patch('patroni.dcs.kubernetes.ObjectCache._watch')
    def test__build_cache(self, mock_response):
        """Feed the watch stream MODIFIED/DELETED events, an unknown event type
        ('MDIFIED' typo is intentional), and a 410 Gone split across two chunks."""
        mock_response.return_value.read_chunked.return_value = [json.dumps(
            {'type': 'MODIFIED', 'object': {'metadata': {
                'name': self.k.config_path, 'resourceVersion': '2', 'annotations': {self.k._CONFIG: 'foo'}}}}
        ).encode('utf-8'), ('\n' + json.dumps(
            {'type': 'DELETED', 'object': {'metadata': {
                'name': self.k.config_path, 'resourceVersion': '3'}}}
        ) + '\n' + json.dumps(
            {'type': 'MDIFIED', 'object': {'metadata': {'name': self.k.config_path}}}
        ) + '\n').encode('utf-8'), b'{"object":{', b'"code":410}}\n']
        self.k._kinds._build_cache()

    @patch('patroni.dcs.kubernetes.logger.error', Mock(side_effect=SleepException))
    @patch('patroni.dcs.kubernetes.ObjectCache._build_cache', Mock(side_effect=Exception))
    def test_run(self):
        """A failing _build_cache is logged; SleepException breaks the run loop for the test."""
        self.assertRaises(SleepException, self.k._pods.run)

    @patch('time.sleep', Mock())
    def test__list(self):
        """_list propagates exceptions from the underlying list function."""
        self.k._pods._func = Mock(side_effect=Exception)
        self.assertRaises(Exception, self.k._pods._list)

    @patch('patroni.dcs.kubernetes.ObjectCache._watch', Mock(return_value=None))
    def test__do_watch(self):
        """_do_watch fails with AttributeError when _watch yields no response."""
        self.assertRaises(AttributeError, self.k._kinds._do_watch, '1')

    @patch.object(k8s_client.CoreV1Api, 'list_namespaced_config_map', mock_list_namespaced_config_map, create=True)
    @patch('patroni.dcs.kubernetes.ObjectCache._watch')
    def test_kill_stream(self, mock_watch):
        """kill_stream must survive close() failures and a broken connection attribute."""
        self.k._kinds.kill_stream()
        mock_watch.return_value.read_chunked.return_value = []
        # Closing the underlying socket raises -- kill_stream must swallow it.
        mock_watch.return_value.connection.sock.close.side_effect = Exception
        self.k._kinds._do_watch('1')
        self.k._kinds.kill_stream()
        # Accessing .connection itself raises -- still no propagation.
        type(mock_watch.return_value).connection = PropertyMock(side_effect=Exception)
        self.k._kinds.kill_stream()
|