diff --git a/patroni/dcs/kubernetes.py b/patroni/dcs/kubernetes.py index d4158368..aee87bd3 100644 --- a/patroni/dcs/kubernetes.py +++ b/patroni/dcs/kubernetes.py @@ -771,8 +771,7 @@ class Kubernetes(AbstractDCS): except k8s_config.ConfigException: k8s_config.load_kube_config(context=config.get('context', 'kind-kind')) - pod_ip = config.get('pod_ip') - self.__ips: List[str] = [] if self._ctl or not isinstance(pod_ip, str) else [pod_ip] + self.__ips: List[str] = [] if self._ctl else [config.get('pod_ip', '')] self.__ports: List[K8sObject] = [] ports: List[Dict[str, Any]] = config.get('ports', [{}]) for p in ports: @@ -1059,6 +1058,27 @@ class Kubernetes(AbstractDCS): def _patch_or_create(self, name: str, annotations: Dict[str, Any], resource_version: Optional[str] = None, patch: bool = False, retry: Optional[Callable[..., Any]] = None, ips: Optional[List[str]] = None) -> K8sObject: + """Patch or create K8s object, Endpoint or ConfigMap. + + :param name: the name of the object. + :param annotations: mapping of annotations that we want to create/update. + :param resource_version: object should be updated only if the ``resource_version`` matches provided value. + :param patch: ``True`` if we know in advance that the object already exists and we should patch it. + :param retry: a callable that will take care of retries + :param ips: IP address that we want to put to the subsets of the endpoint. Could have following values: + + * ``None`` - when we don't need to touch subset; + * ``[]`` - to set subsets to the empty list, when :meth:`delete_leader` method is called; + + * ``['ip.add.re.ss']`` - when we want to make sure that the subsets of the leader endpoint + contains the IP address of the leader, that we get from the ``kubernetes.pod_ip``; + + * ``['']`` - when we want to make sure that the subsets of the leader endpoint contains the IP + address of the leader, but ``kubernetes.pod_ip`` configuration is missing. In this case we will + try to take the IP address of the Pod which name matches ``name`` from the config file. + + :returns: the new :class:`V1Endpoints` or :class:`V1ConfigMap` object, that was created or updated. + """ metadata = {'namespace': self._namespace, 'name': name, 'labels': self._labels, 'annotations': annotations} if patch or resource_version: if resource_version is not None: @@ -1071,9 +1091,10 @@ class Kubernetes(AbstractDCS): metadata['annotations'] = {k: v for k, v in annotations.items() if v is not None} metadata = k8s_client.V1ObjectMeta(**metadata) - if ips is not None and self._api.use_endpoints: + if self._api.use_endpoints: endpoints = {'metadata': metadata} - self._map_subsets(endpoints, ips) + if ips is not None: + self._map_subsets(endpoints, ips) body = k8s_client.V1Endpoints(**endpoints) else: body = k8s_client.V1ConfigMap(metadata=metadata) @@ -1222,11 +1243,10 @@ class Kubernetes(AbstractDCS): else: annotations['acquireTime'] = self._leader_observed_record.get('acquireTime') or now annotations['transitions'] = str(transitions) - ips: Optional[List[str]] = [] if self._api.use_endpoints else None try: ret = bool(self._patch_or_create(self.leader_path, annotations, - self._leader_resource_version, retry=self.retry, ips=ips)) + self._leader_resource_version, retry=self.retry, ips=self.__ips)) except k8s_client.rest.ApiException as e: if e.status == 409 and self._leader_resource_version: # Conflict in resource_version # Terminate watchers, it could be a sign that K8s API is in a failed state diff --git a/tests/test_kubernetes.py b/tests/test_kubernetes.py index 4f9f418c..b6db7fb3 100644 --- a/tests/test_kubernetes.py +++ b/tests/test_kubernetes.py @@ -63,7 +63,7 @@ def mock_list_namespaced_pod(*args, **kwargs): metadata = k8s_client.V1ObjectMeta(resource_version='1', labels={'f': 'b', Kubernetes._CITUS_LABEL: '1'}, name='p-0', annotations={'status': '{}'}, uid='964dfeae-e79b-4476-8a5a-1920b5c2a69d') - status = k8s_client.V1PodStatus(pod_ip='10.0.0.0') + status = k8s_client.V1PodStatus(pod_ip='10.0.0.1') spec = k8s_client.V1PodSpec(hostname='p-0', node_name='kind-control-plane', containers=[]) items = [k8s_client.V1Pod(metadata=metadata, status=status, spec=spec)] return k8s_client.V1PodList(items=items, kind='PodList') @@ -356,6 +356,20 @@ class TestKubernetesConfigMaps(BaseTestKubernetes): mock_warning.assert_called_once() +class TestKubernetesEndpointsNoPodIP(BaseTestKubernetes): + @patch.object(k8s_client.CoreV1Api, 'list_namespaced_endpoints', mock_list_namespaced_endpoints, create=True) + def setUp(self, config=None): + super(TestKubernetesEndpointsNoPodIP, self).setUp({'use_endpoints': True}) + + @patch.object(k8s_client.CoreV1Api, 'patch_namespaced_endpoints', create=True) + def test_update_leader(self, mock_patch_namespaced_endpoints): + leader = self.k.get_cluster().leader + self.assertIsNotNone(self.k.update_leader(leader, '123', failsafe={'foo': 'bar'})) + args = mock_patch_namespaced_endpoints.call_args[0] + self.assertEqual(args[2].subsets[0].addresses[0].target_ref.resource_version, '1') + self.assertEqual(args[2].subsets[0].addresses[0].ip, '10.0.0.1') + + class TestKubernetesEndpoints(BaseTestKubernetes): @patch.object(k8s_client.CoreV1Api, 'list_namespaced_endpoints', mock_list_namespaced_endpoints, create=True) @@ -368,6 +382,7 @@ class TestKubernetesEndpoints(BaseTestKubernetes): self.assertIsNotNone(self.k.update_leader(leader, '123', failsafe={'foo': 'bar'})) args = mock_patch_namespaced_endpoints.call_args[0] self.assertEqual(args[2].subsets[0].addresses[0].target_ref.resource_version, '10') + self.assertEqual(args[2].subsets[0].addresses[0].ip, '10.0.0.0') self.k._kinds._object_cache['test'].subsets[:] = [] self.assertIsNotNone(self.k.update_leader(leader, '123')) self.k._kinds._object_cache['test'].metadata.annotations['leader'] = 'p-1'