Fix Etcd v2 with Citus (#2943)

When deploying a new Citus cluster with Etcd v2, Patroni was failing to start with the following exception:
```python
2023-11-09 10:51:41,246 INFO: Selected new etcd server http://localhost:2379
Traceback (most recent call last):
  File "/home/akukushkin/git/patroni/./patroni.py", line 6, in <module>
    main()
  File "/home/akukushkin/git/patroni/patroni/__main__.py", line 343, in main
    return patroni_main(args.configfile)
  File "/home/akukushkin/git/patroni/patroni/__main__.py", line 237, in patroni_main
    abstract_main(Patroni, configfile)
  File "/home/akukushkin/git/patroni/patroni/daemon.py", line 172, in abstract_main
    controller = cls(config)
  File "/home/akukushkin/git/patroni/patroni/__main__.py", line 66, in __init__
    self.ensure_unique_name()
  File "/home/akukushkin/git/patroni/patroni/__main__.py", line 112, in ensure_unique_name
    cluster = self.dcs.get_cluster()
  File "/home/akukushkin/git/patroni/patroni/dcs/__init__.py", line 1654, in get_cluster
    cluster = self._get_citus_cluster() if self.is_citus_coordinator() else self.__get_patroni_cluster()
  File "/home/akukushkin/git/patroni/patroni/dcs/__init__.py", line 1638, in _get_citus_cluster
    cluster = groups.pop(CITUS_COORDINATOR_GROUP_ID, Cluster.empty())
AttributeError: 'Cluster' object has no attribute 'pop'
```

It has been broken since #2909.

In addition to that, fix the `_citus_cluster_loader()` interface by allowing it to return only a dict object.
This commit is contained in:
Alexander Kukushkin
2023-11-09 11:09:38 +01:00
parent df0fd91614
commit f2a129f209
4 changed files with 13 additions and 8 deletions

View File

@@ -1564,7 +1564,7 @@ class AbstractDCS(abc.ABC):
"""
@abc.abstractmethod
def _citus_cluster_loader(self, path: Any) -> Union[Cluster, Dict[int, Cluster]]:
def _citus_cluster_loader(self, path: Any) -> Dict[int, Cluster]:
"""Load and build all Patroni clusters from a single Citus cluster.
:param path: the path in DCS where to load Cluster(s) from.

View File

@@ -422,7 +422,7 @@ class Consul(AbstractDCS):
def _cluster_loader(self, path: str) -> Cluster:
_, results = self.retry(self._client.kv.get, path, recurse=True, consistency=self._consistency)
if results is None:
raise NotFound
return Cluster.empty()
nodes = {}
for node in results:
node['Value'] = (node['Value'] or b'').decode('utf-8')
@@ -445,8 +445,6 @@ class Consul(AbstractDCS):
) -> Union[Cluster, Dict[int, Cluster]]:
try:
return loader(path)
except NotFound:
return Cluster.empty()
except Exception:
logger.exception('get_cluster')
raise ConsulError('Consul is not responding properly')

View File

@@ -710,13 +710,20 @@ class Etcd(AbstractEtcd):
return Cluster(initialize, config, leader, status, members, failover, sync, history, failsafe)
def _cluster_loader(self, path: str) -> Cluster:
result = self.retry(self._client.read, path, recursive=True, quorum=self._ctl)
try:
result = self.retry(self._client.read, path, recursive=True, quorum=self._ctl)
except etcd.EtcdKeyNotFound:
return Cluster.empty()
nodes = {node.key[len(result.key):].lstrip('/'): node for node in result.leaves}
return self._cluster_from_nodes(result.etcd_index, nodes)
def _citus_cluster_loader(self, path: str) -> Dict[int, Cluster]:
try:
result = self.retry(self._client.read, path, recursive=True, quorum=self._ctl)
except etcd.EtcdKeyNotFound:
return {}
clusters: Dict[int, Dict[str, etcd.EtcdResult]] = defaultdict(dict)
result = self.retry(self._client.read, path, recursive=True, quorum=self._ctl)
for node in result.leaves:
key = node.key[len(result.key):].lstrip('/').split('/', 1)
if len(key) == 2 and citus_group_re.match(key[0]):
@@ -729,8 +736,6 @@ class Etcd(AbstractEtcd):
cluster = None
try:
cluster = loader(path)
except etcd.EtcdKeyNotFound:
cluster = Cluster.empty()
except Exception as e:
self._handle_exception(e, 'get_cluster', raise_ex=EtcdError('Etcd is not responding properly'))
self._has_failed = False

View File

@@ -274,6 +274,8 @@ class TestEtcd(unittest.TestCase):
cluster = self.etcd.get_cluster()
self.assertIsInstance(cluster, Cluster)
self.assertIsInstance(cluster.workers[1], Cluster)
self.etcd._base_path = '/service/nocluster'
self.assertTrue(self.etcd.get_cluster().is_empty())
def test_touch_member(self):
self.assertFalse(self.etcd.touch_member(''))