Reduce lock time taken by dcs.get_cluster() (#989)

`dcs.cluster` and `dcs.get_cluster()` share the same lock, so when a `get_cluster()` call was slow due to a slow DCS, it also blocked the `dcs.cluster` call, which in turn made health-check requests slow.
Author: Alexander Kukushkin
Date: 2019-03-12 22:37:11 +01:00 (committed by GitHub)
Parent: 92720882aa
Commit: 680444ae13
9 changed files with 73 additions and 37 deletions
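
To see why this matters, here is a minimal sketch of the contention pattern the commit removes (hypothetical names, not Patroni code): a slow DCS round-trip performed while holding the shared lock forces every reader of the cached value to wait out the full round-trip.

import threading
import time

lock = threading.Lock()
_cached_cluster = None

def get_cluster():
    # Pre-commit pattern: the slow network call happens while holding the lock.
    global _cached_cluster
    with lock:
        time.sleep(5)  # stands in for a slow DCS request
        _cached_cluster = 'cluster-state'
        return _cached_cluster

def cluster():
    # The health-check path only needs the cached value, yet it blocks
    # for the whole 5 seconds whenever get_cluster() holds the lock.
    with lock:
        return _cached_cluster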

patroni/dcs/__init__.py

@@ -9,6 +9,7 @@ import pkgutil
 import re
 import six
 import sys
+import time
 from collections import defaultdict, namedtuple
 from copy import deepcopy
@@ -529,6 +530,7 @@ class AbstractDCS(object):
         self._ctl = bool(config.get('patronictl', False))
         self._cluster = None
+        self._cluster_valid_till = 0
         self._cluster_thread_lock = Lock()
         self._last_leader_operation = ''
         self.event = Event()
@@ -576,6 +578,10 @@ class AbstractDCS(object):
     def set_ttl(self, ttl):
         """Set the new ttl value for leader key"""

+    @abc.abstractmethod
+    def ttl(self):
+        """Get new ttl value"""
+
     @abc.abstractmethod
     def set_retry_timeout(self, retry_timeout):
         """Set the new value for retry_timeout"""
@@ -603,22 +609,26 @@
         instance would be demoted."""

     def get_cluster(self):
+        try:
+            cluster = self._load_cluster()
+        except Exception:
+            self.reset_cluster()
+            raise
+
         with self._cluster_thread_lock:
-            try:
-                self._load_cluster()
-            except Exception:
-                self._cluster = None
-                raise
-            return self._cluster
+            self._cluster = cluster
+            self._cluster_valid_till = time.time() + self.ttl
+            return cluster

     @property
     def cluster(self):
         with self._cluster_thread_lock:
-            return self._cluster
+            return self._cluster if self._cluster_valid_till > time.time() else None

     def reset_cluster(self):
         with self._cluster_thread_lock:
             self._cluster = None
+            self._cluster_valid_till = 0

     @abc.abstractmethod
     def _write_leader_optime(self, last_operation):
@@ -684,7 +694,7 @@ class AbstractDCS(object):
         """Create or update `/config` key"""

     @abc.abstractmethod
-    def touch_member(self, data, ttl=None, permanent=False):
+    def touch_member(self, data, permanent=False):
         """Update member key in DCS.
         This method should create or update key with the name = '/members/' + `~self._name`
         and value = data in a given DCS.
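
Pieced together from the hunks above, the new AbstractDCS flow reads roughly as follows (a sketch assembled from the diff, not a verbatim excerpt): the slow _load_cluster() call now runs outside the lock, which guards only the cheap cache update and read.

def get_cluster(self):
    try:
        cluster = self._load_cluster()  # slow DCS round-trip, done lock-free
    except Exception:
        self.reset_cluster()
        raise

    with self._cluster_thread_lock:  # held only for the cheap cache update
        self._cluster = cluster
        self._cluster_valid_till = time.time() + self.ttl
        return cluster

@property
def cluster(self):
    with self._cluster_thread_lock:
        # a cached cluster is served only while it is still fresh
        return self._cluster if self._cluster_valid_till > time.time() else None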

patroni/dcs/consul.py

@@ -237,6 +237,10 @@ class Consul(AbstractDCS):
         self._session = None
         self.__do_not_watch = True

+    @property
+    def ttl(self):
+        return self._client.http.ttl
+
     def set_retry_timeout(self, retry_timeout):
         self._retry.deadline = retry_timeout
         self._client.http.set_read_timeout(retry_timeout)
@@ -342,15 +346,15 @@ class Consul(AbstractDCS):
             sync = nodes.get(self._SYNC)
             sync = SyncState.from_node(sync and sync['ModifyIndex'], sync and sync['Value'])

-            self._cluster = Cluster(initialize, config, leader, last_leader_operation, members, failover, sync, history)
+            return Cluster(initialize, config, leader, last_leader_operation, members, failover, sync, history)
         except NotFound:
-            self._cluster = Cluster(None, None, None, None, [], None, None, None)
+            return Cluster(None, None, None, None, [], None, None, None)
         except Exception:
             logger.exception('get_cluster')
             raise ConsulError('Consul is not responding properly')

     @catch_consul_errors
-    def touch_member(self, data, ttl=None, permanent=False):
+    def touch_member(self, data, permanent=False):
         cluster = self.cluster
         member = cluster and cluster.get_member(self._name, fallback_to_leader=False)
         create_member = not permanent and self.refresh_session()
@@ -503,6 +507,7 @@ class Consul(AbstractDCS):
         return self.retry(self._client.kv.delete, self.sync_path, cas=index)

     def watch(self, leader_index, timeout):
+        self._last_session_refresh = 0
         if self.__do_not_watch:
             self.__do_not_watch = False
             return True
@@ -521,5 +526,4 @@ class Consul(AbstractDCS):
         try:
             return super(Consul, self).watch(None, timeout)
         finally:
-            self._last_session_refresh = 0
             self.event.clear()

patroni/dcs/etcd.py

@@ -443,6 +443,10 @@ class Etcd(AbstractDCS):
         self._ttl = ttl
         self._client.set_machines_cache_ttl(ttl*10)

+    @property
+    def ttl(self):
+        return self._ttl
+
     def set_retry_timeout(self, retry_timeout):
         self._retry.deadline = retry_timeout
         self._client.set_read_timeout(retry_timeout)
@@ -452,6 +456,7 @@ class Etcd(AbstractDCS):
         return Member.from_node(node.modifiedIndex, os.path.basename(node.key), node.ttl, node.value)

     def _load_cluster(self):
+        cluster = None
         try:
             result = self.retry(self._client.read, self.client_path(''), recursive=True)
             nodes = {node.key[len(result.key):].lstrip('/'): node for node in result.leaves}
@@ -492,17 +497,18 @@ class Etcd(AbstractDCS):
             sync = nodes.get(self._SYNC)
             sync = SyncState.from_node(sync and sync.modifiedIndex, sync and sync.value)

-            self._cluster = Cluster(initialize, config, leader, last_leader_operation, members, failover, sync, history)
+            cluster = Cluster(initialize, config, leader, last_leader_operation, members, failover, sync, history)
         except etcd.EtcdKeyNotFound:
-            self._cluster = Cluster(None, None, None, None, [], None, None, None)
+            cluster = Cluster(None, None, None, None, [], None, None, None)
         except Exception as e:
             self._handle_exception(e, 'get_cluster', raise_ex=EtcdError('Etcd is not responding properly'))
         self._has_failed = False
+        return cluster

     @catch_etcd_errors
-    def touch_member(self, data, ttl=None, permanent=False):
+    def touch_member(self, data, permanent=False):
         data = json.dumps(data, separators=(',', ':'))
-        return self._client.set(self.member_path, data, None if permanent else ttl or self._ttl)
+        return self._client.set(self.member_path, data, None if permanent else self._ttl)

     @catch_etcd_errors
     def take_leader(self):

patroni/dcs/kubernetes.py

@@ -107,6 +107,7 @@ class Kubernetes(AbstractDCS):
         self._leader_observed_time = None
         self._leader_resource_version = None
         self._leader_observed_subsets = []
+        self._config_resource_version = None
         self.__do_not_watch = False

     def retry(self, *args, **kwargs):
@@ -124,6 +125,10 @@ class Kubernetes(AbstractDCS):
         self.__do_not_watch = self._ttl != ttl
         self._ttl = ttl

+    @property
+    def ttl(self):
+        return self._ttl
+
     def set_retry_timeout(self, retry_timeout):
         self._retry.deadline = retry_timeout
         self._api.set_timeout(retry_timeout)
@@ -146,6 +151,7 @@ class Kubernetes(AbstractDCS):
             config = nodes.get(self.config_path)
             metadata = config and config.metadata
+            self._config_resource_version = metadata.resource_version if metadata else None
             annotations = metadata and metadata.annotations or {}

             # get initialize flag
@@ -201,7 +207,7 @@ class Kubernetes(AbstractDCS):
             metadata = sync and sync.metadata
             sync = SyncState.from_node(metadata and metadata.resource_version, metadata and metadata.annotations)

-            self._cluster = Cluster(initialize, config, leader, last_leader_operation, members, failover, sync, history)
+            return Cluster(initialize, config, leader, last_leader_operation, members, failover, sync, history)
         except Exception:
             logger.exception('get_cluster')
             raise KubernetesError('Kubernetes API is not responding properly')
@@ -280,7 +286,10 @@ class Kubernetes(AbstractDCS):
         if self.__subsets and not patch and not resource_version:
             self._should_create_config_service = True
             self._create_config_service()
-        return self.patch_or_create(self.config_path, annotations, resource_version, patch, retry)
+        ret = self.patch_or_create(self.config_path, annotations, resource_version, patch, retry)
+        if ret:
+            self._config_resource_version = ret.metadata.resource_version
+        return ret

     def _create_config_service(self):
         metadata = k8s_client.V1ObjectMeta(namespace=self._namespace, name=self.config_path, labels=self._labels)
@@ -349,11 +358,10 @@ class Kubernetes(AbstractDCS):
         return self.patch_or_create(self.failover_path, annotations, index, bool(index or patch), False)

     def set_config_value(self, value, index=None):
-        patch = bool(index or self.cluster and self.cluster.config and self.cluster.config.index)
-        return self.patch_or_create_config({self._CONFIG: value}, index, patch, False)
+        return self.patch_or_create_config({self._CONFIG: value}, index, bool(self._config_resource_version), False)

     @catch_kubernetes_errors
-    def touch_member(self, data, ttl=None, permanent=False):
+    def touch_member(self, data, permanent=False):
         cluster = self.cluster
         if cluster and cluster.leader and cluster.leader.name == self._name:
             role = 'promoted' if data['role'] in ('replica', 'promoted') else 'master'
@@ -386,15 +394,14 @@ class Kubernetes(AbstractDCS):
             self.reset_cluster()

     def cancel_initialization(self):
-        self.patch_or_create_config({self._INITIALIZE: None}, self.cluster.config.index, True)
+        self.patch_or_create_config({self._INITIALIZE: None}, self._config_resource_version, True)

     @catch_kubernetes_errors
     def delete_cluster(self):
         self.retry(self._api.delete_collection_namespaced_kind, self._namespace, label_selector=self._label_selector)

     def set_history_value(self, value):
-        patch = bool(self.cluster and self.cluster.config and self.cluster.config.index)
-        return self.patch_or_create_config({self._HISTORY: value}, None, patch, False)
+        return self.patch_or_create_config({self._HISTORY: value}, None, bool(self._config_resource_version), False)

     def set_sync_state_value(self, value, index=None):
         """Unused"""

patroni/dcs/zookeeper.py

@@ -126,6 +126,10 @@ class ZooKeeper(AbstractDCS):
             self._client.restart()
             return True

+    @property
+    def ttl(self):
+        return self._client._session_timeout
+
     def set_retry_timeout(self, retry_timeout):
         retry = self._client.retry if isinstance(self._client.retry, KazooRetry) else self._client._retry
         retry.deadline = retry_timeout
@@ -206,16 +210,18 @@ class ZooKeeper(AbstractDCS):
         failover = self.get_node(self.failover_path, watch=self.cluster_watcher) if self._FAILOVER in nodes else None
         failover = failover and Failover.from_node(failover[1].version, failover[0])

-        self._cluster = Cluster(initialize, config, leader, last_leader_operation, members, failover, sync, history)
+        return Cluster(initialize, config, leader, last_leader_operation, members, failover, sync, history)

     def _load_cluster(self):
-        if self._fetch_cluster or self._cluster is None:
+        cluster = self.cluster
+        if self._fetch_cluster or cluster is None:
             try:
-                self._client.retry(self._inner_load_cluster)
+                cluster = self._client.retry(self._inner_load_cluster)
             except Exception:
                 logger.exception('get_cluster')
                 self.cluster_watcher(None)
                 raise ZooKeeperError('ZooKeeper in not responding properly')
+        return cluster

     def _create(self, path, value, retry=False, ephemeral=False):
         try:
@@ -264,7 +270,7 @@ class ZooKeeper(AbstractDCS):
         return self._create(self.initialize_path, sysid, retry=True) if create_new \
             else self._client.retry(self._client.set, self.initialize_path, sysid)

-    def touch_member(self, data, ttl=None, permanent=False):
+    def touch_member(self, data, permanent=False):
         cluster = self.cluster
         member = cluster and cluster.get_member(self._name, fallback_to_leader=False)
         encoded_data = json.dumps(data, separators=(',', ':')).encode('utf-8')

patroni/ha.py

@@ -112,11 +112,11 @@ class Ha(object):
     def is_leader(self):
         with self._is_leader_lock:
-            return self._is_leader and not self._leader_access_is_restricted
+            return self._is_leader > time.time() and not self._leader_access_is_restricted

     def set_is_leader(self, value):
         with self._is_leader_lock:
-            self._is_leader = value
+            self._is_leader = time.time() + self.dcs.ttl if value else 0

     def set_leader_access_is_restricted(self, value):
         with self._is_leader_lock:
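
The Ha change applies the same freshness idea to the leader flag: `_is_leader` becomes a deadline (now plus the leader-key TTL) rather than a plain boolean, so a stale flag expires on its own even if nothing resets it. Per the hunk above, roughly:

def is_leader(self):
    with self._is_leader_lock:
        # leadership is trusted only until the leader-key TTL elapses
        return self._is_leader > time.time() and not self._leader_access_is_restricted

def set_is_leader(self, value):
    with self._is_leader_lock:
        # store a deadline; 0 means "not the leader"
        self._is_leader = time.time() + self.dcs.ttl if value else 0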

tests/test_consul.py

@@ -83,7 +83,7 @@ class TestConsul(unittest.TestCase):
         self.c = Consul({'ttl': 30, 'scope': 'test', 'name': 'postgresql1', 'host': 'localhost:1', 'retry_timeout': 10,
                          'register_service': True})
         self.c._base_path = '/service/good'
-        self.c._load_cluster()
+        self.c.get_cluster()

     @patch('time.sleep', Mock(side_effect=SleepException))
     @patch.object(consul.Consul.Session, 'create', Mock(side_effect=ConsulException))

tests/test_kubernetes.py

@@ -1,3 +1,4 @@
+import time
 import unittest

 from mock import Mock, patch
@@ -33,13 +34,14 @@ class TestKubernetes(unittest.TestCase):
     @patch.object(k8s_client.CoreV1Api, 'list_namespaced_pod', mock_list_namespaced_pod)
     def setUp(self):
         self.k = Kubernetes({'ttl': 30, 'scope': 'test', 'name': 'p-0', 'retry_timeout': 10, 'labels': {'f': 'b'}})
-        self.k.get_cluster()
+        with patch('time.time', Mock(return_value=1)):
+            self.k.get_cluster()

-    @patch.object(k8s_client.CoreV1Api, 'list_namespaced_config_map', mock_list_namespaced_config_map)
-    @patch.object(k8s_client.CoreV1Api, 'list_namespaced_pod', mock_list_namespaced_pod)
     def test_get_cluster(self):
-        self.k.get_cluster()
+        with patch.object(k8s_client.CoreV1Api, 'list_namespaced_config_map', mock_list_namespaced_config_map), \
+                patch.object(k8s_client.CoreV1Api, 'list_namespaced_pod', mock_list_namespaced_pod), \
+                patch('time.time', Mock(return_value=time.time() + 31)):
+            self.k.get_cluster()
         with patch.object(k8s_client.CoreV1Api, 'list_namespaced_pod', Mock(side_effect=Exception)):
             self.assertRaises(KubernetesError, self.k.get_cluster)

tests/test_zookeeper.py

@@ -17,6 +17,7 @@ class MockKazooClient(Mock):
     def __init__(self, *args, **kwargs):
         super(MockKazooClient, self).__init__()
+        self._session_timeout = 30

     @property
     def client_id(self):
@@ -24,7 +25,7 @@ class MockKazooClient(Mock):
     @staticmethod
     def retry(func, *args, **kwargs):
-        func(*args, **kwargs)
+        return func(*args, **kwargs)

     def get(self, path, watch=None):
         if not isinstance(path, six.string_types):