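"""Tests for the Kubernetes implementation of the Patroni DCS (patroni.dcs.kubernetes).

All Kubernetes API interactions are mocked out, so no cluster is required to run them.
"""
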
import base64
import datetime
import json
import socket
import time
import unittest

from threading import Thread
from unittest import mock
from unittest.mock import Mock, mock_open, patch, PropertyMock

import urllib3

from patroni.dcs import get_dcs
from patroni.dcs.kubernetes import Cluster, k8s_client, k8s_config, K8sConfig, K8sConnectionFailed, \
    K8sException, K8sObject, Kubernetes, KubernetesError, KubernetesRetriableException, Retry, \
    RetryFailedError, SERVICE_HOST_ENV_NAME, SERVICE_PORT_ENV_NAME
from patroni.postgresql.mpp import get_mpp

from . import MockResponse, SleepException


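# The helpers below fabricate k8s_client model objects shaped like real Kubernetes
# API list/read responses. Some annotation values ('optime': '1234x', 'slots': '{')
# are deliberately malformed, presumably so that tolerant parsing of broken
# annotations is exercised as well.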
def mock_list_namespaced_config_map(*args, **kwargs):
    k8s_group_label = get_mpp({'citus': {'group': 0, 'database': 'postgres'}}).k8s_group_label
    metadata = {'resource_version': '1', 'labels': {'f': 'b'}, 'name': 'test-config',
                'annotations': {'initialize': '123', 'config': '{}'}}
    items = [k8s_client.V1ConfigMap(metadata=k8s_client.V1ObjectMeta(**metadata))]
    metadata.update({'name': 'test-leader',
                     'annotations': {'optime': '1234x', 'leader': 'p-0', 'ttl': '30s',
                                     'slots': '{', 'retain_slots': '{', 'failsafe': '{'}})
    items.append(k8s_client.V1ConfigMap(metadata=k8s_client.V1ObjectMeta(**metadata)))
    metadata.update({'name': 'test-failover', 'annotations': {'leader': 'p-0'}})
    items.append(k8s_client.V1ConfigMap(metadata=k8s_client.V1ObjectMeta(**metadata)))
    metadata.update({'name': 'test-sync', 'annotations': {'leader': 'p-0'}})
    items.append(k8s_client.V1ConfigMap(metadata=k8s_client.V1ObjectMeta(**metadata)))
    metadata.update({'name': 'test-0-leader', 'labels': {k8s_group_label: '0'},
                     'annotations': {'optime': '1234x', 'leader': 'p-0', 'ttl': '30s',
                                     'slots': '{', 'retain_slots': '{', 'failsafe': '{'}})
    items.append(k8s_client.V1ConfigMap(metadata=k8s_client.V1ObjectMeta(**metadata)))
    metadata.update({'name': 'test-0-config', 'labels': {k8s_group_label: '0'},
                     'annotations': {'initialize': '123', 'config': '{}'}})
    items.append(k8s_client.V1ConfigMap(metadata=k8s_client.V1ObjectMeta(**metadata)))
    metadata.update({'name': 'test-1-leader', 'labels': {k8s_group_label: '1'},
                     'annotations': {'leader': 'p-3', 'ttl': '30s'}})
    items.append(k8s_client.V1ConfigMap(metadata=k8s_client.V1ObjectMeta(**metadata)))
    metadata.update({'name': 'test-2-config', 'labels': {k8s_group_label: '2'}, 'annotations': {}})
    items.append(k8s_client.V1ConfigMap(metadata=k8s_client.V1ObjectMeta(**metadata)))

    metadata = k8s_client.V1ObjectMeta(resource_version='1')
    return k8s_client.V1ConfigMapList(metadata=metadata, items=items, kind='ConfigMapList')


def mock_read_namespaced_endpoints(*args, **kwargs):
    target_ref = k8s_client.V1ObjectReference(kind='Pod', resource_version='10', name='p-0',
                                              namespace='default', uid='964dfeae-e79b-4476-8a5a-1920b5c2a69d')
    address0 = k8s_client.V1EndpointAddress(ip='10.0.0.0', target_ref=target_ref)
    address1 = k8s_client.V1EndpointAddress(ip='10.0.0.1')
    port = k8s_client.V1EndpointPort(port=5432, name='postgresql', protocol='TCP')
    subset = k8s_client.V1EndpointSubset(addresses=[address1, address0], ports=[port])
    metadata = k8s_client.V1ObjectMeta(resource_version='1', labels={'f': 'b'}, name='test',
                                       annotations={'optime': '1234', 'leader': 'p-0', 'ttl': '30s'})
    return k8s_client.V1Endpoints(subsets=[subset], metadata=metadata)


def mock_list_namespaced_endpoints(*args, **kwargs):
    return k8s_client.V1EndpointsList(metadata=k8s_client.V1ObjectMeta(resource_version='1'),
                                      items=[mock_read_namespaced_endpoints()], kind='V1EndpointsList')


def mock_list_namespaced_pod(*args, **kwargs):
    k8s_group_label = get_mpp({'citus': {'group': 0, 'database': 'postgres'}}).k8s_group_label
    metadata = k8s_client.V1ObjectMeta(resource_version='1', labels={'f': 'b', k8s_group_label: '1'},
                                       name='p-0', annotations={'status': '{}'},
                                       uid='964dfeae-e79b-4476-8a5a-1920b5c2a69d')
    status = k8s_client.V1PodStatus(pod_ip='10.0.0.1')
    spec = k8s_client.V1PodSpec(hostname='p-0', node_name='kind-control-plane', containers=[])
    items = [k8s_client.V1Pod(metadata=metadata, status=status, spec=spec)]
    return k8s_client.V1PodList(items=items, kind='PodList')


def mock_namespaced_kind(*args, **kwargs):
    mock = Mock()
    mock.metadata.resource_version = '2'
    return mock


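# Stub for K8sConfig.load_kube_config: instead of parsing a real kubeconfig it
# simply points the client at a dummy server.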
def mock_load_k8s_config(self, *args, **kwargs):
    self._server = 'http://localhost'


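# Tests for K8sConfig itself: in-cluster (service-account) configuration,
# periodic token refresh, and kubeconfig file parsing.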
class TestK8sConfig(unittest.TestCase):

    def test_load_incluster_config(self):
        for env in ({}, {SERVICE_HOST_ENV_NAME: '', SERVICE_PORT_ENV_NAME: ''}):
            with patch('os.environ', env):
                self.assertRaises(k8s_config.ConfigException, k8s_config.load_incluster_config)

        with patch('os.environ', {SERVICE_HOST_ENV_NAME: 'a', SERVICE_PORT_ENV_NAME: '1'}), \
                patch('os.path.isfile', Mock(side_effect=[False, True, True, False, True, True, True, True])), \
                patch('builtins.open', Mock(side_effect=[
                    mock_open()(), mock_open(read_data='a')(), mock_open(read_data='a')(),
                    mock_open()(), mock_open(read_data='a')(), mock_open(read_data='a')()])):
            for _ in range(0, 4):
                self.assertRaises(k8s_config.ConfigException, k8s_config.load_incluster_config)
            k8s_config.load_incluster_config()
            self.assertEqual(k8s_config.server, 'https://a:1')
            self.assertEqual(k8s_config.headers.get('authorization'), 'Bearer a')

    def test_refresh_token(self):
        with patch('os.environ', {SERVICE_HOST_ENV_NAME: 'a', SERVICE_PORT_ENV_NAME: '1'}), \
                patch('os.path.isfile', Mock(side_effect=[True, True, False, True, True, True])), \
                patch('builtins.open', Mock(side_effect=[
                    mock_open(read_data='cert')(), mock_open(read_data='a')(),
                    mock_open()(), mock_open(read_data='b')(), mock_open(read_data='c')()])):
            k8s_config.load_incluster_config(token_refresh_interval=datetime.timedelta(milliseconds=100))
            self.assertEqual(k8s_config.headers.get('authorization'), 'Bearer a')
            time.sleep(0.1)
            # token file doesn't exist
            self.assertEqual(k8s_config.headers.get('authorization'), 'Bearer a')
            # token file is empty
            self.assertEqual(k8s_config.headers.get('authorization'), 'Bearer a')
            # token refreshed
            self.assertEqual(k8s_config.headers.get('authorization'), 'Bearer b')
            time.sleep(0.1)
            # token refreshed
            self.assertEqual(k8s_config.headers.get('authorization'), 'Bearer c')
            # no need to refresh token
            self.assertEqual(k8s_config.headers.get('authorization'), 'Bearer c')

    def test_load_kube_config(self):
        config = {
            "current-context": "local",
            "contexts": [{"name": "local", "context": {"user": "local", "cluster": "local"}}],
            "clusters": [{"name": "local", "cluster": {"server": "https://a:1/", "certificate-authority": "a"}}],
            "users": [{"name": "local", "user": {"username": "a", "password": "b", "client-certificate": "c"}}]
        }
        with patch('builtins.open', mock_open(read_data=json.dumps(config))):
            k8s_config.load_kube_config()
            self.assertEqual(k8s_config.server, 'https://a:1')
            self.assertEqual(k8s_config.pool_config, {'ca_certs': 'a', 'cert_file': 'c', 'cert_reqs': 'CERT_REQUIRED',
                                                      'maxsize': 10, 'num_pools': 10})

        config["users"][0]["user"]["token"] = "token"
        with patch('builtins.open', mock_open(read_data=json.dumps(config))):
            k8s_config.load_kube_config()
            self.assertEqual(k8s_config.headers.get('authorization'), 'Bearer token')

        config["users"][0]["user"]["client-key-data"] = base64.b64encode(b'foobar').decode('utf-8')
        config["clusters"][0]["cluster"]["certificate-authority-data"] = base64.b64encode(b'foobar').decode('utf-8')
        with patch('builtins.open', mock_open(read_data=json.dumps(config))), \
                patch('os.write', Mock()), patch('os.close', Mock()), \
                patch('os.remove') as mock_remove, \
                patch('atexit.register') as mock_atexit, \
                patch('tempfile.mkstemp') as mock_mkstemp:
            mock_mkstemp.side_effect = [(3, '1.tmp'), (4, '2.tmp')]
            k8s_config.load_kube_config()
            mock_atexit.assert_called_once()
            mock_remove.side_effect = OSError
            mock_atexit.call_args[0][0]()  # call _cleanup_temp_files
            mock_remove.assert_has_calls([mock.call('1.tmp'), mock.call('2.tmp')])


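# Tests for the minimal ApiClient. urllib3.PoolManager.request is patched at the
# class level, so no HTTP request ever leaves the process; self.mock_get_ep mimics
# the kubernetes endpoints object from which ApiClient discovers API servers.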
@patch('urllib3.PoolManager.request')
@patch.object(K8sConfig, '_server', '', create=True)
class TestApiClient(unittest.TestCase):

    @patch.object(K8sConfig, '_server', '', create=True)
    @patch('urllib3.PoolManager.request', Mock())
    def setUp(self):
        self.a = k8s_client.ApiClient(True)
        self.mock_get_ep = MockResponse()
        self.mock_get_ep.content = '{"subsets":[{"ports":[{"name":"https","protocol":"TCP","port":443}],' \
                                   '"addresses":[{"ip":"127.0.0.1"},{"ip":"127.0.0.2"}]}]}'

    def test__do_http_request(self, mock_request):
        mock_request.side_effect = [self.mock_get_ep] + [socket.timeout]
        self.assertRaises(K8sException, self.a.call_api, 'GET', 'f')

    @patch('time.sleep', Mock())
    def test_request(self, mock_request):
        retry = Retry(deadline=10, max_delay=1, max_tries=1, retry_exceptions=KubernetesRetriableException)
        mock_request.side_effect = [self.mock_get_ep] + 3 * [socket.timeout] + [k8s_client.rest.ApiException(500, '')]
        self.assertRaises(k8s_client.rest.ApiException, retry, self.a.call_api, 'GET', 'f', _retry=retry)
        mock_request.side_effect = [self.mock_get_ep, socket.timeout, Mock(), self.mock_get_ep]
        self.assertRaises(k8s_client.rest.ApiException, retry, self.a.call_api, 'GET', 'f', _retry=retry)
        retry.deadline = 0.0001
        mock_request.side_effect = [socket.timeout, socket.timeout, self.mock_get_ep]
        self.assertRaises(K8sConnectionFailed, retry, self.a.call_api, 'GET', 'f', _retry=retry)

    def test__refresh_api_servers_cache(self, mock_request):
        mock_request.side_effect = k8s_client.rest.ApiException(403, '')
        self.a.refresh_api_servers_cache()


class TestCoreV1Api(unittest.TestCase):

    @patch('urllib3.PoolManager.request', Mock())
    @patch.object(K8sConfig, '_server', '', create=True)
    def setUp(self):
        self.a = k8s_client.CoreV1Api()
        self.a._api_client.pool_manager.request = Mock(return_value=MockResponse())

    def test_create_namespaced_service(self):
        self.assertEqual(str(self.a.create_namespaced_service('default', {}, _request_timeout=2)), '{}')

    def test_list_namespaced_endpoints(self):
        self.a._api_client.pool_manager.request.return_value.content = '{"items": [1,2,3]}'
        self.assertIsInstance(self.a.list_namespaced_endpoints('default'), K8sObject)

    def test_patch_namespaced_config_map(self):
        self.assertEqual(str(self.a.patch_namespaced_config_map('foo', 'default', {}, _request_timeout=(1, 2))), '{}')

    def test_list_namespaced_pod(self):
        self.a._api_client.pool_manager.request.return_value.status_code = 409
        self.a._api_client.pool_manager.request.return_value.content = 'foo'
        try:
            self.a.list_namespaced_pod('default', label_selector='foo=bar')
            self.fail('ApiException not raised')
        except k8s_client.rest.ApiException as e:
            self.assertTrue('Reason: ' in str(e))

    def test_delete_namespaced_pod(self):
        self.assertEqual(str(self.a.delete_namespaced_pod('foo', 'default', _request_timeout=(1, 2), body={})), '{}')


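# Common fixture for the suites below: builds a Kubernetes DCS instance through
# get_dcs() with every CoreV1Api call it needs replaced by the mock helpers above.
# Subclasses pass extra settings (e.g. use_endpoints) into setUp().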
class BaseTestKubernetes(unittest.TestCase):

    @patch('urllib3.PoolManager.request', Mock())
    @patch('socket.TCP_KEEPIDLE', 4, create=True)
    @patch('socket.TCP_KEEPINTVL', 5, create=True)
    @patch('socket.TCP_KEEPCNT', 6, create=True)
    @patch.object(Thread, 'start', Mock())
    @patch.object(K8sConfig, 'load_kube_config', mock_load_k8s_config)
    @patch.object(K8sConfig, 'load_incluster_config', Mock(side_effect=k8s_config.ConfigException))
    @patch.object(k8s_client.CoreV1Api, 'list_namespaced_pod', mock_list_namespaced_pod, create=True)
    @patch.object(k8s_client.CoreV1Api, 'list_namespaced_config_map', mock_list_namespaced_config_map, create=True)
    def setUp(self, config=None):
        config = {'ttl': 30, 'scope': 'test', 'name': 'p-0', 'loop_wait': 10, 'retry_timeout': 10,
                  'kubernetes': {'labels': {'f': 'b'}, 'bypass_api_service': True, **(config or {}),
                                 'bootstrap_labels': {'foo': 'bar'}},
                  'citus': {'group': 0, 'database': 'postgres'}}
        self.k = get_dcs(config)
        self.assertIsInstance(self.k, Kubernetes)
        self.k._mpp = get_mpp({})
        self.assertRaises(AttributeError, self.k._pods._build_cache)
        self.k._pods._is_ready = True
        self.assertRaises(TypeError, self.k._kinds._build_cache)
        self.k._kinds._is_ready = True
        self.k.get_cluster()


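# Default (config-maps) mode: cluster state such as the leader, config, failover
# and sync keys is kept in ConfigMap annotations, with patch_namespaced_config_map
# mocked out.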
@patch('urllib3.PoolManager.request', Mock())
@patch.object(k8s_client.CoreV1Api, 'patch_namespaced_config_map', mock_namespaced_kind, create=True)
class TestKubernetesConfigMaps(BaseTestKubernetes):

    @patch('time.time', Mock(side_effect=[1, 10.9, 100]))
    def test__wait_caches(self):
        self.k._pods._is_ready = False
        with self.k._condition:
            self.assertRaises(RetryFailedError, self.k._wait_caches, time.time() + 10)

    @patch('time.time', Mock(return_value=time.time() + 100))
    def test_get_cluster(self):
        self.k.get_cluster()

        with patch.object(Kubernetes, '_wait_caches', Mock(side_effect=Exception)):
            self.assertRaises(KubernetesError, self.k.get_cluster)

    def test__get_citus_cluster(self):
        self.k._mpp = get_mpp({'citus': {'group': 0, 'database': 'postgres'}})
        cluster = self.k.get_cluster()
        self.assertIsInstance(cluster, Cluster)
        self.assertIsInstance(cluster.workers[1], Cluster)

    @patch('patroni.dcs.kubernetes.logger.error')
    def test_get_mpp_coordinator(self, mock_logger):
        self.assertIsInstance(self.k.get_mpp_coordinator(), Cluster)
        with patch.object(Kubernetes, '_postgresql_cluster_loader', Mock(side_effect=Exception)):
            self.assertIsNone(self.k.get_mpp_coordinator())
        mock_logger.assert_called()
        self.assertEqual(mock_logger.call_args[0][0], 'Failed to load %s coordinator cluster from Kubernetes: %r')
        self.assertEqual(mock_logger.call_args[0][1], 'Null')
        self.assertIsInstance(mock_logger.call_args[0][2], KubernetesError)

    @patch('patroni.dcs.kubernetes.logger.error')
    def test_get_citus_coordinator(self, mock_logger):
        self.k._mpp = get_mpp({'citus': {'group': 0, 'database': 'postgres'}})
        self.assertIsInstance(self.k.get_mpp_coordinator(), Cluster)
        with patch.object(Kubernetes, '_postgresql_cluster_loader', Mock(side_effect=Exception)):
            self.assertIsNone(self.k.get_mpp_coordinator())
        mock_logger.assert_called()
        self.assertEqual(mock_logger.call_args[0][0], 'Failed to load %s coordinator cluster from Kubernetes: %r')
        self.assertEqual(mock_logger.call_args[0][1], 'Citus')
        self.assertIsInstance(mock_logger.call_args[0][2], KubernetesError)

    def test_attempt_to_acquire_leader(self):
        with patch.object(k8s_client.CoreV1Api, 'patch_namespaced_config_map', create=True) as mock_patch:
            mock_patch.side_effect = K8sException
            self.assertRaises(KubernetesError, self.k.attempt_to_acquire_leader)
            mock_patch.side_effect = k8s_client.rest.ApiException(409, '')
            self.assertFalse(self.k.attempt_to_acquire_leader())

    def test_take_leader(self):
        self.k.take_leader()
        self.k._leader_observed_record['leader'] = 'test'
        self.k.patch_or_create = Mock(return_value=False)
        self.k.take_leader()

    def test_manual_failover(self):
        with patch.object(k8s_client.CoreV1Api, 'patch_namespaced_config_map',
                          Mock(side_effect=RetryFailedError('')), create=True):
            self.k.manual_failover('foo', 'bar')

    def test_set_config_value(self):
        with patch.object(k8s_client.CoreV1Api, 'patch_namespaced_config_map',
                          Mock(side_effect=k8s_client.rest.ApiException(409, '')), create=True):
            self.k.set_config_value('{}', 1)

    @patch.object(k8s_client.CoreV1Api, 'patch_namespaced_pod', create=True)
    def test_touch_member(self, mock_patch_namespaced_pod):
        mock_patch_namespaced_pod.return_value.metadata.resource_version = '10'

        self.k._name = 'p-1'
        self.k.touch_member({'role': 'replica', 'state': 'initializing new cluster'})
        self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['foo'], 'bar')

        self.k.touch_member({'state': 'running', 'role': 'replica'})
        self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['foo'], None)

        self.k.touch_member({'role': 'replica', 'state': 'running custom bootstrap script'})
        self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['foo'], 'bar')

        self.k.touch_member({'role': 'replica', 'state': 'starting after custom bootstrap'})
        self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['foo'], 'bar')

        self.k.touch_member({'state': 'stopped', 'role': 'primary'})
        self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['foo'], None)

        self.k._role_label = 'isMaster'
        self.k._leader_label_value = 'true'
        self.k._follower_label_value = 'false'
        self.k._standby_leader_label_value = 'false'
        self.k._tmp_role_label = 'tmp_role'

        self.k.touch_member({'state': 'creating replica', 'role': 'replica'})
        self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['foo'], 'bar')

        self.k.touch_member({'state': 'running', 'role': 'replica'})
        self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['foo'], None)
        self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['isMaster'], 'false')
        self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['tmp_role'], 'replica')
        mock_patch_namespaced_pod.reset_mock()

        self.k._name = 'p-0'
        self.k.touch_member({'state': 'running', 'role': 'standby_leader'})
        mock_patch_namespaced_pod.assert_called()
        self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['isMaster'], 'false')
        self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['tmp_role'], 'primary')
        mock_patch_namespaced_pod.reset_mock()

        self.k.touch_member({'state': 'running', 'role': 'primary'})
        mock_patch_namespaced_pod.assert_called()
        self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['isMaster'], 'true')
        self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['tmp_role'], 'primary')

    def test_initialize(self):
        self.k.initialize()

    def test_delete_leader(self):
        self.k.delete_leader(self.k.get_cluster().leader, 1)

    def test_cancel_initialization(self):
        self.k.cancel_initialization()

    @patch.object(k8s_client.CoreV1Api, 'delete_collection_namespaced_config_map',
                  Mock(side_effect=k8s_client.rest.ApiException(403, '')), create=True)
    def test_delete_cluster(self):
        self.k.delete_cluster()

    def test_watch(self):
        self.k.set_ttl(10)
        self.k.watch(None, 0)
        self.k.watch('5', 0)

    def test_set_history_value(self):
        self.k.set_history_value('{}')

    @patch('patroni.dcs.kubernetes.logger.warning')
    def test_reload_config(self, mock_warning):
        self.k.reload_config({'loop_wait': 10, 'ttl': 30, 'retry_timeout': 10, 'retriable_http_codes': '401, 403 '})
        self.assertEqual(self.k._api._retriable_http_codes, self.k._api._DEFAULT_RETRIABLE_HTTP_CODES | {401, 403})
        self.k.reload_config({'loop_wait': 10, 'ttl': 30, 'retry_timeout': 10, 'retriable_http_codes': 402})
        self.assertEqual(self.k._api._retriable_http_codes, self.k._api._DEFAULT_RETRIABLE_HTTP_CODES | {402})
        self.k.reload_config({'loop_wait': 10, 'ttl': 30, 'retry_timeout': 10, 'retriable_http_codes': [405, 406]})
        self.assertEqual(self.k._api._retriable_http_codes, self.k._api._DEFAULT_RETRIABLE_HTTP_CODES | {405, 406})
        self.k.reload_config({'loop_wait': 10, 'ttl': 30, 'retry_timeout': 10, 'retriable_http_codes': True})
        mock_warning.assert_called_once()


@patch('urllib3.PoolManager.request', Mock())
class TestKubernetesEndpointsNoPodIP(BaseTestKubernetes):

    @patch.object(k8s_client.CoreV1Api, 'list_namespaced_endpoints', mock_list_namespaced_endpoints, create=True)
    def setUp(self, config=None):
        super(TestKubernetesEndpointsNoPodIP, self).setUp({'use_endpoints': True})

    @patch.object(k8s_client.CoreV1Api, 'patch_namespaced_endpoints', create=True)
    def test_update_leader(self, mock_patch_namespaced_endpoints):
        self.assertIsNotNone(self.k.update_leader(self.k.get_cluster(), '123', failsafe={'foo': 'bar'}))
        args = mock_patch_namespaced_endpoints.call_args[0]
        self.assertEqual(args[2].subsets[0].addresses[0].target_ref.resource_version, '1')
        self.assertEqual(args[2].subsets[0].addresses[0].ip, '10.0.0.1')


@patch('urllib3.PoolManager.request', Mock())
class TestKubernetesEndpoints(BaseTestKubernetes):

    @patch.object(k8s_client.CoreV1Api, 'list_namespaced_endpoints', mock_list_namespaced_endpoints, create=True)
    def setUp(self, config=None):
        super(TestKubernetesEndpoints, self).setUp({'use_endpoints': True, 'pod_ip': '10.0.0.0'})

    @patch.object(k8s_client.CoreV1Api, 'patch_namespaced_endpoints', create=True)
    def test_update_leader(self, mock_patch_namespaced_endpoints):
        cluster = self.k.get_cluster()
        self.assertIsNotNone(self.k.update_leader(cluster, '123', failsafe={'foo': 'bar'}))
        args = mock_patch_namespaced_endpoints.call_args[0]
        self.assertEqual(args[2].subsets[0].addresses[0].target_ref.resource_version, '10')
        self.assertEqual(args[2].subsets[0].addresses[0].ip, '10.0.0.0')
        self.k._kinds._object_cache['test'].subsets[:] = []
        self.assertIsNotNone(self.k.update_leader(cluster, '123'))
        self.k._kinds._object_cache['test'].metadata.annotations['leader'] = 'p-1'
        self.assertFalse(self.k.update_leader(cluster, '123'))

    @patch.object(k8s_client.CoreV1Api, 'read_namespaced_endpoints', create=True)
    @patch.object(k8s_client.CoreV1Api, 'patch_namespaced_endpoints', create=True)
    def test__update_leader_with_retry(self, mock_patch, mock_read):
        cluster = self.k.get_cluster()
        mock_read.return_value = mock_read_namespaced_endpoints()
        mock_patch.side_effect = k8s_client.rest.ApiException(502, '')
        self.assertFalse(self.k.update_leader(cluster, '123'))
        mock_patch.side_effect = RetryFailedError('')
        self.assertRaises(KubernetesError, self.k.update_leader, cluster, '123')
        mock_patch.side_effect = [k8s_client.rest.ApiException(409, ''),
                                  k8s_client.rest.ApiException(409, ''), mock_namespaced_kind()]
        mock_read.return_value.metadata.resource_version = '2'
        mock_time = Mock(side_effect=[0, 0, 100, 200, 0, 0, 0, 0, 0, 100, 200])
        with patch('time.time', mock_time), patch('time.time_ns', mock_time, create=True):
            self.assertFalse(self.k.update_leader(cluster, '123'))
            self.assertFalse(self.k.update_leader(cluster, '123'))
        mock_patch.side_effect = k8s_client.rest.ApiException(409, '')
        self.assertFalse(self.k.update_leader(cluster, '123'))
        mock_patch.side_effect = [k8s_client.rest.ApiException(409, ''), mock_namespaced_kind()]
        self.assertTrue(self.k._update_leader_with_retry({}, '1', []))
        mock_patch.side_effect = [k8s_client.rest.ApiException(409, ''), mock_namespaced_kind()]
        self.assertIsNotNone(self.k._update_leader_with_retry({'foo': 'bar'}, '1', []))
        mock_patch.side_effect = k8s_client.rest.ApiException(409, '')
        mock_read.side_effect = RetryFailedError('')
        self.assertRaises(KubernetesError, self.k.update_leader, cluster, '123')
        mock_read.side_effect = Exception
        self.assertFalse(self.k.update_leader(cluster, '123'))

    @patch.object(k8s_client.CoreV1Api, 'patch_namespaced_endpoints',
                  Mock(side_effect=[k8s_client.rest.ApiException(500, ''),
                                    k8s_client.rest.ApiException(502, '')]), create=True)
    def test_delete_sync_state(self):
        self.assertFalse(self.k.delete_sync_state(1))

    @patch.object(k8s_client.CoreV1Api, 'patch_namespaced_endpoints', mock_namespaced_kind, create=True)
    def test_write_sync_state(self):
        self.assertIsNotNone(self.k.write_sync_state('a', ['b'], 0, 1))

    @patch.object(k8s_client.CoreV1Api, 'patch_namespaced_pod', mock_namespaced_kind, create=True)
    @patch.object(k8s_client.CoreV1Api, 'create_namespaced_endpoints', mock_namespaced_kind, create=True)
    @patch.object(k8s_client.CoreV1Api, 'create_namespaced_service',
                  Mock(side_effect=[True, False,
                                    k8s_client.rest.ApiException(409, ''),
                                    k8s_client.rest.ApiException(403, ''),
                                    k8s_client.rest.ApiException(500, ''),
                                    Exception("Unexpected")]), create=True)
    @patch('patroni.dcs.kubernetes.logger.exception')
    def test__create_config_service(self, mock_logger_exception):
        self.assertIsNotNone(self.k.patch_or_create_config({'foo': 'bar'}))
        self.assertIsNotNone(self.k.patch_or_create_config({'foo': 'bar'}))

        self.k.patch_or_create_config({'foo': 'bar'})
        mock_logger_exception.assert_not_called()

        self.k.patch_or_create_config({'foo': 'bar'})
        mock_logger_exception.assert_not_called()

        self.k.patch_or_create_config({'foo': 'bar'})
        mock_logger_exception.assert_called_once()
        self.assertEqual(('create_config_service failed',), mock_logger_exception.call_args[0])
        mock_logger_exception.reset_mock()

        self.k.touch_member({'state': 'running', 'role': 'replica'})
        mock_logger_exception.assert_called_once()
        self.assertEqual(('create_config_service failed',), mock_logger_exception.call_args[0])

    @patch.object(k8s_client.CoreV1Api, 'patch_namespaced_endpoints', mock_namespaced_kind, create=True)
    def test_write_leader_optime(self):
        self.k.write_leader_optime(12345)


def mock_watch(*args):
    return urllib3.HTTPResponse()


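# Tests for ObjectCache: feeds synthetic watch-stream chunks through _build_cache()
# and exercises the failure paths of run(), _list(), _do_watch() and kill_stream().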
@patch('urllib3.PoolManager.request', Mock())
class TestCacheBuilder(BaseTestKubernetes):

    @patch.object(k8s_client.CoreV1Api, 'list_namespaced_config_map', mock_list_namespaced_config_map, create=True)
    @patch('patroni.dcs.kubernetes.ObjectCache._watch', mock_watch)
    @patch.object(urllib3.HTTPResponse, 'read_chunked')
    def test__build_cache(self, mock_read_chunked):
        self.k._mpp = get_mpp({'citus': {'group': 0, 'database': 'postgres'}})
        mock_read_chunked.return_value = [json.dumps(
            {'type': 'MODIFIED', 'object': {'metadata': {
                'name': self.k.config_path, 'resourceVersion': '2', 'annotations': {self.k._CONFIG: 'foo'}}}}
        ).encode('utf-8'), ('\n' + json.dumps(
            {'type': 'DELETED', 'object': {'metadata': {
                'name': self.k.config_path, 'resourceVersion': '3'}}}
        ) + '\n' + json.dumps(
            # 'MDIFIED' is a deliberately unknown event type
            {'type': 'MDIFIED', 'object': {'metadata': {'name': self.k.config_path}}}
        ) + '\n').encode('utf-8'),
            # an error object (code 410) split across two stream chunks
            b'{"object":{', b'"code":410}}\n']
        self.k._kinds._build_cache()

    @patch('patroni.dcs.kubernetes.logger.error', Mock(side_effect=SleepException))
    @patch('patroni.dcs.kubernetes.ObjectCache._build_cache', Mock(side_effect=Exception))
    def test_run(self):
        self.assertRaises(SleepException, self.k._pods.run)

    @patch('time.sleep', Mock())
    def test__list(self):
        self.k._pods._func = Mock(side_effect=Exception)
        self.assertRaises(Exception, self.k._pods._list)

    @patch('patroni.dcs.kubernetes.ObjectCache._watch', Mock(return_value=None))
    def test__do_watch(self):
        self.assertRaises(AttributeError, self.k._kinds._do_watch, '1')

    @patch.object(k8s_client.CoreV1Api, 'list_namespaced_config_map', mock_list_namespaced_config_map, create=True)
    @patch('patroni.dcs.kubernetes.ObjectCache._watch', mock_watch)
    @patch.object(urllib3.HTTPResponse, 'read_chunked', Mock(return_value=[]))
    def test_kill_stream(self):
        self.k._kinds.kill_stream()
        with patch.object(urllib3.HTTPResponse, 'connection') as mock_connection:
            mock_connection.sock.close.side_effect = Exception
            self.k._kinds._do_watch('1')
            self.k._kinds.kill_stream()
        with patch.object(urllib3.HTTPResponse, 'connection', PropertyMock(side_effect=Exception)):
            self.k._kinds.kill_stream()