mirror of
https://github.com/outbackdingo/patroni.git
synced 2026-01-27 18:20:05 +00:00
`dcs.cluster` and `dcs.get_cluster()` are using the same lock resource and therefore when get_cluster call is slow due to the slowness of DCS it was also affecting the `dcs.cluster` call, which in return was making health-check requests slow.
129 lines
6.4 KiB
Python
129 lines
6.4 KiB
Python
import time
|
|
import unittest
|
|
|
|
from mock import Mock, patch
|
|
from patroni.dcs.kubernetes import Kubernetes, KubernetesError, k8s_client, k8s_watch, RetryFailedError
|
|
|
|
|
|
def mock_list_namespaced_config_map(self, *args, **kwargs):
|
|
metadata = {'resource_version': '1', 'labels': {'f': 'b'}, 'name': 'test-config',
|
|
'annotations': {'initialize': '123', 'config': '{}'}}
|
|
items = [k8s_client.V1ConfigMap(metadata=k8s_client.V1ObjectMeta(**metadata))]
|
|
metadata.update({'name': 'test-leader', 'annotations': {'optime': '1234', 'leader': 'p-0', 'ttl': '30s'}})
|
|
items.append(k8s_client.V1ConfigMap(metadata=k8s_client.V1ObjectMeta(**metadata)))
|
|
metadata.update({'name': 'test-failover', 'annotations': {'leader': 'p-0'}})
|
|
items.append(k8s_client.V1ConfigMap(metadata=k8s_client.V1ObjectMeta(**metadata)))
|
|
metadata.update({'name': 'test-sync', 'annotations': {'leader': 'p-0'}})
|
|
items.append(k8s_client.V1ConfigMap(metadata=k8s_client.V1ObjectMeta(**metadata)))
|
|
metadata = k8s_client.V1ObjectMeta(resource_version='1')
|
|
return k8s_client.V1ConfigMapList(metadata=metadata, items=items)
|
|
|
|
|
|
def mock_list_namespaced_pod(self, *args, **kwargs):
|
|
metadata = k8s_client.V1ObjectMeta(resource_version='1', name='p-0', annotations={'status': '{}'})
|
|
items = [k8s_client.V1Pod(metadata=metadata)]
|
|
return k8s_client.V1PodList(items=items)
|
|
|
|
|
|
@patch.object(k8s_client.CoreV1Api, 'patch_namespaced_config_map', Mock())
|
|
@patch.object(k8s_client.CoreV1Api, 'create_namespaced_config_map', Mock())
|
|
class TestKubernetes(unittest.TestCase):
|
|
|
|
@patch('kubernetes.config.load_kube_config', Mock())
|
|
@patch.object(k8s_client.CoreV1Api, 'list_namespaced_config_map', mock_list_namespaced_config_map)
|
|
@patch.object(k8s_client.CoreV1Api, 'list_namespaced_pod', mock_list_namespaced_pod)
|
|
def setUp(self):
|
|
self.k = Kubernetes({'ttl': 30, 'scope': 'test', 'name': 'p-0', 'retry_timeout': 10, 'labels': {'f': 'b'}})
|
|
self.k.get_cluster()
|
|
|
|
def test_get_cluster(self):
|
|
with patch.object(k8s_client.CoreV1Api, 'list_namespaced_config_map', mock_list_namespaced_config_map), \
|
|
patch.object(k8s_client.CoreV1Api, 'list_namespaced_pod', mock_list_namespaced_pod), \
|
|
patch('time.time', Mock(return_value=time.time() + 31)):
|
|
self.k.get_cluster()
|
|
|
|
with patch.object(k8s_client.CoreV1Api, 'list_namespaced_pod', Mock(side_effect=Exception)):
|
|
self.assertRaises(KubernetesError, self.k.get_cluster)
|
|
|
|
@patch('kubernetes.config.load_kube_config', Mock())
|
|
@patch.object(k8s_client.CoreV1Api, 'create_namespaced_endpoints', Mock())
|
|
def test_update_leader(self):
|
|
k = Kubernetes({'ttl': 30, 'scope': 'test', 'name': 'p-0', 'retry_timeout': 10,
|
|
'labels': {'f': 'b'}, 'use_endpoints': True, 'pod_ip': '10.0.0.0'})
|
|
self.assertIsNotNone(k.update_leader('123'))
|
|
|
|
@patch('kubernetes.config.load_kube_config', Mock())
|
|
@patch.object(k8s_client.CoreV1Api, 'create_namespaced_endpoints', Mock())
|
|
def test_update_leader_with_restricted_access(self):
|
|
k = Kubernetes({'ttl': 30, 'scope': 'test', 'name': 'p-0', 'retry_timeout': 10,
|
|
'labels': {'f': 'b'}, 'use_endpoints': True, 'pod_ip': '10.0.0.0'})
|
|
self.assertIsNotNone(k.update_leader('123', True))
|
|
|
|
def test_take_leader(self):
|
|
self.k.take_leader()
|
|
self.k._leader_observed_record['leader'] = 'test'
|
|
self.k.patch_or_create = Mock(return_value=False)
|
|
self.k.take_leader()
|
|
|
|
def test_manual_failover(self):
|
|
with patch.object(k8s_client.CoreV1Api, 'patch_namespaced_config_map', Mock(side_effect=RetryFailedError(''))):
|
|
self.k.manual_failover('foo', 'bar')
|
|
|
|
def test_set_config_value(self):
|
|
self.k.set_config_value('{}')
|
|
|
|
@patch.object(k8s_client.CoreV1Api, 'patch_namespaced_pod', Mock(return_value=True))
|
|
def test_touch_member(self):
|
|
self.k.touch_member({'role': 'replica'})
|
|
self.k._name = 'p-1'
|
|
self.k.touch_member({'state': 'running', 'role': 'replica'})
|
|
self.k.touch_member({'state': 'stopped', 'role': 'master'})
|
|
|
|
def test_initialize(self):
|
|
self.k.initialize()
|
|
|
|
def test_delete_leader(self):
|
|
self.k.delete_leader()
|
|
|
|
def test_cancel_initialization(self):
|
|
self.k.cancel_initialization()
|
|
|
|
@patch.object(k8s_client.CoreV1Api, 'delete_collection_namespaced_config_map',
|
|
Mock(side_effect=k8s_client.rest.ApiException(403, '')))
|
|
def test_delete_cluster(self):
|
|
self.k.delete_cluster()
|
|
|
|
@patch('kubernetes.config.load_kube_config', Mock())
|
|
@patch.object(k8s_client.CoreV1Api, 'create_namespaced_endpoints',
|
|
Mock(side_effect=[k8s_client.rest.ApiException(502, ''), k8s_client.rest.ApiException(500, '')]))
|
|
def test_delete_sync_state(self):
|
|
k = Kubernetes({'ttl': 30, 'scope': 'test', 'name': 'p-0', 'retry_timeout': 10,
|
|
'labels': {'f': 'b'}, 'use_endpoints': True, 'pod_ip': '10.0.0.0'})
|
|
self.assertFalse(k.delete_sync_state())
|
|
|
|
def test_watch(self):
|
|
self.k.set_ttl(10)
|
|
self.k.watch(None, 0)
|
|
self.k.watch(None, 0)
|
|
with patch.object(k8s_watch.Watch, 'stream',
|
|
Mock(side_effect=[Exception, [], KeyboardInterrupt,
|
|
[{'raw_object': {'metadata': {'resourceVersion': '2'}}}]])):
|
|
self.assertFalse(self.k.watch('1', 2))
|
|
self.assertRaises(KeyboardInterrupt, self.k.watch, '1', 2)
|
|
self.assertTrue(self.k.watch('1', 2))
|
|
|
|
def test_set_history_value(self):
|
|
self.k.set_history_value('{}')
|
|
|
|
@patch('kubernetes.config.load_kube_config', Mock())
|
|
@patch.object(k8s_client.CoreV1Api, 'patch_namespaced_pod', Mock(return_value=True))
|
|
@patch.object(k8s_client.CoreV1Api, 'create_namespaced_endpoints', Mock())
|
|
@patch.object(k8s_client.CoreV1Api, 'create_namespaced_service',
|
|
Mock(side_effect=[True, False, k8s_client.rest.ApiException(500, '')]))
|
|
def test__create_config_service(self):
|
|
k = Kubernetes({'ttl': 30, 'scope': 'test', 'name': 'p-0', 'retry_timeout': 10,
|
|
'labels': {'f': 'b'}, 'use_endpoints': True, 'pod_ip': '10.0.0.0'})
|
|
self.assertIsNotNone(k.patch_or_create_config({'foo': 'bar'}))
|
|
self.assertIsNotNone(k.patch_or_create_config({'foo': 'bar'}))
|
|
k.touch_member({'state': 'running', 'role': 'replica'})
|