Don't write leader optime into DCS if it didn't changed (#319)

This commit is contained in:
Alexander Kukushkin
2016-09-21 14:22:22 +02:00
committed by GitHub
parent 10c7fa41f3
commit 5265e71fc2
5 changed files with 27 additions and 21 deletions

View File

@@ -284,6 +284,7 @@ class AbstractDCS(object):
self._ctl = bool(config.get('patronictl', False))
self._cluster = None
self._cluster_thread_lock = Lock()
self._last_leader_operation = ''
self.event = Event()
def client_path(self, path):
@@ -366,9 +367,14 @@ class AbstractDCS(object):
self._cluster = None
@abc.abstractmethod
def write_leader_optime(self, last_operation):
def _write_leader_optime(self, last_operation):
"""write current xlog location into `/optime/leader` key in DCS
:param last_operation: absolute xlog location in bytes"""
:param last_operation: absolute xlog location in bytes
:returns: `!True` on success."""
def write_leader_optime(self, last_operation):
if self._last_leader_operation != last_operation and self._write_leader_optime(last_operation):
self._last_leader_operation = last_operation
@abc.abstractmethod
def update_leader(self):

View File

@@ -221,7 +221,7 @@ class Consul(AbstractDCS):
return self._client.kv.put(self.config_path, value, cas=index)
@catch_consul_errors
def write_leader_optime(self, last_operation):
def _write_leader_optime(self, last_operation):
return self._client.kv.put(self.leader_optime_path, last_operation)
@staticmethod

View File

@@ -337,7 +337,7 @@ class Etcd(AbstractDCS):
return self._client.write(self.config_path, value, prevIndex=index or 0)
@catch_etcd_errors
def write_leader_optime(self, last_operation):
def _write_leader_optime(self, last_operation):
return self._client.set(self.leader_optime_path, last_operation)
@catch_etcd_errors

View File

@@ -57,7 +57,6 @@ class ZooKeeper(AbstractDCS):
self._my_member_data = None
self._fetch_cluster = True
self._last_leader_operation = 0
self._orig_kazoo_connect = self._client._connection._connect
self._client._connection._connect = self._kazoo_connect
@@ -156,6 +155,10 @@ class ZooKeeper(AbstractDCS):
config = self.get_node(self.config_path, watch=self.cluster_watcher) if self._CONFIG in nodes else None
config = config and ClusterConfig.from_node(config[1].version, config[0], config[1].mzxid)
# get last leader operation
last_leader_operation = self._OPTIME in nodes and self._fetch_cluster and self.get_node(self.leader_optime_path)
last_leader_operation = last_leader_operation and int(last_leader_operation[0]) or 0
# get list of members
members = self.load_members() if self._MEMBERS[:-1] in nodes else []
@@ -179,10 +182,7 @@ class ZooKeeper(AbstractDCS):
failover = self.get_node(self.failover_path, watch=self.cluster_watcher) if self._FAILOVER in nodes else None
failover = failover and Failover.from_node(failover[1].version, failover[0])
# get last leader operation
optime = self.get_node(self.leader_optime_path) if self._OPTIME in nodes and self._fetch_cluster else None
self._last_leader_operation = 0 if optime is None else int(optime[0])
self._cluster = Cluster(initialize, config, leader, self._last_leader_operation, members, failover)
self._cluster = Cluster(initialize, config, leader, last_leader_operation, members, failover)
def _load_cluster(self):
if self._fetch_cluster or self._cluster is None:
@@ -267,20 +267,20 @@ class ZooKeeper(AbstractDCS):
def take_leader(self):
return self.attempt_to_acquire_leader()
def write_leader_optime(self, last_operation):
def _write_leader_optime(self, last_operation):
last_operation = last_operation.encode('utf-8')
if last_operation != self._last_leader_operation:
try:
self._client.set_async(self.leader_optime_path, last_operation).get(timeout=1)
return True
except NoNodeError:
try:
self._client.set_async(self.leader_optime_path, last_operation).get(timeout=1)
self._last_leader_operation = last_operation
except NoNodeError:
try:
self._client.create_async(self.leader_optime_path, last_operation, makepath=True).get(timeout=1)
self._last_leader_operation = last_operation
except:
logger.exception('Failed to create %s', self.leader_optime_path)
self._client.create_async(self.leader_optime_path, last_operation, makepath=True).get(timeout=1)
return True
except:
logger.exception('Failed to update %s', self.leader_optime_path)
logger.exception('Failed to create %s', self.leader_optime_path)
except:
logger.exception('Failed to update %s', self.leader_optime_path)
return False
def update_leader(self):
return True

View File

@@ -103,7 +103,7 @@ class TestConsul(unittest.TestCase):
@patch.object(consul.Consul.KV, 'put', Mock(side_effect=ConsulException))
def test_write_leader_optime(self):
self.c.write_leader_optime('')
self.c.write_leader_optime('1')
def test_update_leader(self):
self.c.update_leader()