mirror of
https://github.com/outbackdingo/patroni.git
synced 2026-01-27 10:20:10 +00:00
Take IP from the pod if kubernetes.pod_ip is missing (#2895)
It used to work before #2652 Besides that fix a couple of more problems: - make sure `_patch_or_create()` method isn't instantiating the `k8s_client.V1ConfigMap` object instead of `k8s_client.V1Endpoints` for non leader endpoints. The only reason it worked is that the JSON serialization for both object types is the same and doesn't include the object type name. - `attempt_to_acquire_leader()` should immediately put the IP address of the primary to the leader endpoint. It didn't happen because of the oversight in the https://github.com/zalando/patroni/pull/1820.
This commit is contained in:
committed by
GitHub
parent
28a604983b
commit
e19a8730ea
@@ -771,8 +771,7 @@ class Kubernetes(AbstractDCS):
|
||||
except k8s_config.ConfigException:
|
||||
k8s_config.load_kube_config(context=config.get('context', 'kind-kind'))
|
||||
|
||||
pod_ip = config.get('pod_ip')
|
||||
self.__ips: List[str] = [] if self._ctl or not isinstance(pod_ip, str) else [pod_ip]
|
||||
self.__ips: List[str] = [] if self._ctl else [config.get('pod_ip', '')]
|
||||
self.__ports: List[K8sObject] = []
|
||||
ports: List[Dict[str, Any]] = config.get('ports', [{}])
|
||||
for p in ports:
|
||||
@@ -1059,6 +1058,27 @@ class Kubernetes(AbstractDCS):
|
||||
def _patch_or_create(self, name: str, annotations: Dict[str, Any],
|
||||
resource_version: Optional[str] = None, patch: bool = False,
|
||||
retry: Optional[Callable[..., Any]] = None, ips: Optional[List[str]] = None) -> K8sObject:
|
||||
"""Patch or create K8s object, Endpoint or ConfigMap.
|
||||
|
||||
:param name: the name of the object.
|
||||
:param annotations: mapping of annotations that we want to create/update.
|
||||
:param resource_version: object should be updated only if the ``resource_version`` matches provided value.
|
||||
:param patch: ``True`` if we know in advance that the object already exists and we should patch it.
|
||||
:param retry: a callable that will take care of retries
|
||||
:param ips: IP address that we want to put to the subsets of the endpoint. Could have following values:
|
||||
|
||||
* ``None`` - when we don't need to touch subset;
|
||||
* ``[]`` - to set subsets to the empty list, when :meth:`delete_leader` method is called;
|
||||
|
||||
* ``['ip.add.re.ss']`` - when we want to make sure that the subsets of the leader endpoint
|
||||
contains the IP address of the leader, that we get from the ``kubernetes.pod_ip``;
|
||||
|
||||
* ``['']`` - when we want to make sure that the subsets of the leader endpoint contains the IP
|
||||
address of the leader, but ``kubernetes.pod_ip`` configuration is missing. In this case we will
|
||||
try to take the IP address of the Pod which name matches ``name`` from the config file.
|
||||
|
||||
:returns: the new :class:`V1Endpoints` or :class:`V1ConfigMap` object, that was created or updated.
|
||||
"""
|
||||
metadata = {'namespace': self._namespace, 'name': name, 'labels': self._labels, 'annotations': annotations}
|
||||
if patch or resource_version:
|
||||
if resource_version is not None:
|
||||
@@ -1071,9 +1091,10 @@ class Kubernetes(AbstractDCS):
|
||||
metadata['annotations'] = {k: v for k, v in annotations.items() if v is not None}
|
||||
|
||||
metadata = k8s_client.V1ObjectMeta(**metadata)
|
||||
if ips is not None and self._api.use_endpoints:
|
||||
if self._api.use_endpoints:
|
||||
endpoints = {'metadata': metadata}
|
||||
self._map_subsets(endpoints, ips)
|
||||
if ips is not None:
|
||||
self._map_subsets(endpoints, ips)
|
||||
body = k8s_client.V1Endpoints(**endpoints)
|
||||
else:
|
||||
body = k8s_client.V1ConfigMap(metadata=metadata)
|
||||
@@ -1222,11 +1243,10 @@ class Kubernetes(AbstractDCS):
|
||||
else:
|
||||
annotations['acquireTime'] = self._leader_observed_record.get('acquireTime') or now
|
||||
annotations['transitions'] = str(transitions)
|
||||
ips: Optional[List[str]] = [] if self._api.use_endpoints else None
|
||||
|
||||
try:
|
||||
ret = bool(self._patch_or_create(self.leader_path, annotations,
|
||||
self._leader_resource_version, retry=self.retry, ips=ips))
|
||||
self._leader_resource_version, retry=self.retry, ips=self.__ips))
|
||||
except k8s_client.rest.ApiException as e:
|
||||
if e.status == 409 and self._leader_resource_version: # Conflict in resource_version
|
||||
# Terminate watchers, it could be a sign that K8s API is in a failed state
|
||||
|
||||
@@ -63,7 +63,7 @@ def mock_list_namespaced_pod(*args, **kwargs):
|
||||
metadata = k8s_client.V1ObjectMeta(resource_version='1', labels={'f': 'b', Kubernetes._CITUS_LABEL: '1'},
|
||||
name='p-0', annotations={'status': '{}'},
|
||||
uid='964dfeae-e79b-4476-8a5a-1920b5c2a69d')
|
||||
status = k8s_client.V1PodStatus(pod_ip='10.0.0.0')
|
||||
status = k8s_client.V1PodStatus(pod_ip='10.0.0.1')
|
||||
spec = k8s_client.V1PodSpec(hostname='p-0', node_name='kind-control-plane', containers=[])
|
||||
items = [k8s_client.V1Pod(metadata=metadata, status=status, spec=spec)]
|
||||
return k8s_client.V1PodList(items=items, kind='PodList')
|
||||
@@ -356,6 +356,20 @@ class TestKubernetesConfigMaps(BaseTestKubernetes):
|
||||
mock_warning.assert_called_once()
|
||||
|
||||
|
||||
class TestKubernetesEndpointsNoPodIP(BaseTestKubernetes):
|
||||
@patch.object(k8s_client.CoreV1Api, 'list_namespaced_endpoints', mock_list_namespaced_endpoints, create=True)
|
||||
def setUp(self, config=None):
|
||||
super(TestKubernetesEndpointsNoPodIP, self).setUp({'use_endpoints': True})
|
||||
|
||||
@patch.object(k8s_client.CoreV1Api, 'patch_namespaced_endpoints', create=True)
|
||||
def test_update_leader(self, mock_patch_namespaced_endpoints):
|
||||
leader = self.k.get_cluster().leader
|
||||
self.assertIsNotNone(self.k.update_leader(leader, '123', failsafe={'foo': 'bar'}))
|
||||
args = mock_patch_namespaced_endpoints.call_args[0]
|
||||
self.assertEqual(args[2].subsets[0].addresses[0].target_ref.resource_version, '1')
|
||||
self.assertEqual(args[2].subsets[0].addresses[0].ip, '10.0.0.1')
|
||||
|
||||
|
||||
class TestKubernetesEndpoints(BaseTestKubernetes):
|
||||
|
||||
@patch.object(k8s_client.CoreV1Api, 'list_namespaced_endpoints', mock_list_namespaced_endpoints, create=True)
|
||||
@@ -368,6 +382,7 @@ class TestKubernetesEndpoints(BaseTestKubernetes):
|
||||
self.assertIsNotNone(self.k.update_leader(leader, '123', failsafe={'foo': 'bar'}))
|
||||
args = mock_patch_namespaced_endpoints.call_args[0]
|
||||
self.assertEqual(args[2].subsets[0].addresses[0].target_ref.resource_version, '10')
|
||||
self.assertEqual(args[2].subsets[0].addresses[0].ip, '10.0.0.0')
|
||||
self.k._kinds._object_cache['test'].subsets[:] = []
|
||||
self.assertIsNotNone(self.k.update_leader(leader, '123'))
|
||||
self.k._kinds._object_cache['test'].metadata.annotations['leader'] = 'p-1'
|
||||
|
||||
Reference in New Issue
Block a user