diff --git a/patroni/dcs/__init__.py b/patroni/dcs/__init__.py index 3c2c4429..4ff0e4e1 100644 --- a/patroni/dcs/__init__.py +++ b/patroni/dcs/__init__.py @@ -284,6 +284,7 @@ class AbstractDCS(object): self._ctl = bool(config.get('patronictl', False)) self._cluster = None self._cluster_thread_lock = Lock() + self._last_leader_operation = '' self.event = Event() def client_path(self, path): @@ -366,9 +367,14 @@ class AbstractDCS(object): self._cluster = None @abc.abstractmethod - def write_leader_optime(self, last_operation): + def _write_leader_optime(self, last_operation): """write current xlog location into `/optime/leader` key in DCS - :param last_operation: absolute xlog location in bytes""" + :param last_operation: absolute xlog location in bytes + :returns: `!True` on success.""" + + def write_leader_optime(self, last_operation): + if self._last_leader_operation != last_operation and self._write_leader_optime(last_operation): + self._last_leader_operation = last_operation @abc.abstractmethod def update_leader(self): diff --git a/patroni/dcs/consul.py b/patroni/dcs/consul.py index 84efa88a..5a33de12 100644 --- a/patroni/dcs/consul.py +++ b/patroni/dcs/consul.py @@ -221,7 +221,7 @@ class Consul(AbstractDCS): return self._client.kv.put(self.config_path, value, cas=index) @catch_consul_errors - def write_leader_optime(self, last_operation): + def _write_leader_optime(self, last_operation): return self._client.kv.put(self.leader_optime_path, last_operation) @staticmethod diff --git a/patroni/dcs/etcd.py b/patroni/dcs/etcd.py index c58a2ddb..9104fdab 100644 --- a/patroni/dcs/etcd.py +++ b/patroni/dcs/etcd.py @@ -337,7 +337,7 @@ class Etcd(AbstractDCS): return self._client.write(self.config_path, value, prevIndex=index or 0) @catch_etcd_errors - def write_leader_optime(self, last_operation): + def _write_leader_optime(self, last_operation): return self._client.set(self.leader_optime_path, last_operation) @catch_etcd_errors diff --git a/patroni/dcs/zookeeper.py b/patroni/dcs/zookeeper.py index 0db58ddd..b8d79672 100644 --- a/patroni/dcs/zookeeper.py +++ b/patroni/dcs/zookeeper.py @@ -57,7 +57,6 @@ class ZooKeeper(AbstractDCS): self._my_member_data = None self._fetch_cluster = True - self._last_leader_operation = 0 self._orig_kazoo_connect = self._client._connection._connect self._client._connection._connect = self._kazoo_connect @@ -156,6 +155,10 @@ class ZooKeeper(AbstractDCS): config = self.get_node(self.config_path, watch=self.cluster_watcher) if self._CONFIG in nodes else None config = config and ClusterConfig.from_node(config[1].version, config[0], config[1].mzxid) + # get last leader operation + last_leader_operation = self._OPTIME in nodes and self._fetch_cluster and self.get_node(self.leader_optime_path) + last_leader_operation = last_leader_operation and int(last_leader_operation[0]) or 0 + # get list of members members = self.load_members() if self._MEMBERS[:-1] in nodes else [] @@ -179,10 +182,7 @@ class ZooKeeper(AbstractDCS): failover = self.get_node(self.failover_path, watch=self.cluster_watcher) if self._FAILOVER in nodes else None failover = failover and Failover.from_node(failover[1].version, failover[0]) - # get last leader operation - optime = self.get_node(self.leader_optime_path) if self._OPTIME in nodes and self._fetch_cluster else None - self._last_leader_operation = 0 if optime is None else int(optime[0]) - self._cluster = Cluster(initialize, config, leader, self._last_leader_operation, members, failover) + self._cluster = Cluster(initialize, config, leader, last_leader_operation, members, failover) def _load_cluster(self): if self._fetch_cluster or self._cluster is None: @@ -267,20 +267,20 @@ class ZooKeeper(AbstractDCS): def take_leader(self): return self.attempt_to_acquire_leader() - def write_leader_optime(self, last_operation): + def _write_leader_optime(self, last_operation): last_operation = last_operation.encode('utf-8') - if last_operation != self._last_leader_operation: + try: + self._client.set_async(self.leader_optime_path, last_operation).get(timeout=1) + return True + except NoNodeError: try: - self._client.set_async(self.leader_optime_path, last_operation).get(timeout=1) - self._last_leader_operation = last_operation - except NoNodeError: - try: - self._client.create_async(self.leader_optime_path, last_operation, makepath=True).get(timeout=1) - self._last_leader_operation = last_operation - except: - logger.exception('Failed to create %s', self.leader_optime_path) + self._client.create_async(self.leader_optime_path, last_operation, makepath=True).get(timeout=1) + return True except: - logger.exception('Failed to update %s', self.leader_optime_path) + logger.exception('Failed to create %s', self.leader_optime_path) + except: + logger.exception('Failed to update %s', self.leader_optime_path) + return False def update_leader(self): return True diff --git a/tests/test_consul.py b/tests/test_consul.py index 32e27462..c9acc551 100644 --- a/tests/test_consul.py +++ b/tests/test_consul.py @@ -103,7 +103,7 @@ class TestConsul(unittest.TestCase): @patch.object(consul.Consul.KV, 'put', Mock(side_effect=ConsulException)) def test_write_leader_optime(self): - self.c.write_leader_optime('') + self.c.write_leader_optime('1') def test_update_leader(self): self.c.update_leader()