From 680444ae13154aca6f03556cdd1280e296b549ca Mon Sep 17 00:00:00 2001
From: Alexander Kukushkin
Date: Tue, 12 Mar 2019 22:37:11 +0100
Subject: [PATCH] Reduce lock time taken by dcs.get_cluster() (#989)

`dcs.cluster` and `dcs.get_cluster()` were using the same lock, so when a
`get_cluster()` call was slow due to DCS slowness it also blocked the
`dcs.cluster` call, which in turn made health-check requests slow.
---
 patroni/dcs/__init__.py   | 26 ++++++++++++++++++--------
 patroni/dcs/consul.py     | 12 ++++++++----
 patroni/dcs/etcd.py       | 14 ++++++++++----
 patroni/dcs/kubernetes.py | 23 +++++++++++++++--------
 patroni/dcs/zookeeper.py  | 14 ++++++++++----
 patroni/ha.py             |  4 ++--
 tests/test_consul.py      |  2 +-
 tests/test_kubernetes.py  | 12 +++++++-----
 tests/test_zookeeper.py   |  3 ++-
 9 files changed, 73 insertions(+), 37 deletions(-)

diff --git a/patroni/dcs/__init__.py b/patroni/dcs/__init__.py
index cb350479..050a78b2 100644
--- a/patroni/dcs/__init__.py
+++ b/patroni/dcs/__init__.py
@@ -9,6 +9,7 @@
 import pkgutil
 import re
 import six
 import sys
+import time
 from collections import defaultdict, namedtuple
 from copy import deepcopy
@@ -529,6 +530,7 @@ class AbstractDCS(object):
         self._ctl = bool(config.get('patronictl', False))
         self._cluster = None
+        self._cluster_valid_till = 0
         self._cluster_thread_lock = Lock()
         self._last_leader_operation = ''
         self.event = Event()
@@ -576,6 +578,10 @@
     def set_ttl(self, ttl):
         """Set the new ttl value for leader key"""
 
+    @abc.abstractmethod
+    def ttl(self):
+        """Get new ttl value"""
+
     @abc.abstractmethod
     def set_retry_timeout(self, retry_timeout):
         """Set the new value for retry_timeout"""
@@ -603,22 +609,26 @@
            instance would be demoted."""
 
     def get_cluster(self):
+        try:
+            cluster = self._load_cluster()
+        except Exception:
+            self.reset_cluster()
+            raise
+
         with self._cluster_thread_lock:
-            try:
-                self._load_cluster()
-            except Exception:
-                self._cluster = None
-                raise
-            return self._cluster
+            self._cluster = cluster
+            self._cluster_valid_till = time.time() + self.ttl
+            return cluster
 
     @property
     def cluster(self):
         with self._cluster_thread_lock:
-            return self._cluster
+            return self._cluster if self._cluster_valid_till > time.time() else None
 
     def reset_cluster(self):
         with self._cluster_thread_lock:
             self._cluster = None
+            self._cluster_valid_till = 0
 
     @abc.abstractmethod
     def _write_leader_optime(self, last_operation):
@@ -684,7 +694,7 @@
         """Create or update `/config` key"""
 
     @abc.abstractmethod
-    def touch_member(self, data, ttl=None, permanent=False):
+    def touch_member(self, data, permanent=False):
         """Update member key in DCS.
 
         This method should create or update key with the name = '/members/' + `~self._name`
         and value = data in a given DCS.
diff --git a/patroni/dcs/consul.py b/patroni/dcs/consul.py
index ded2776e..5bb5a551 100644
--- a/patroni/dcs/consul.py
+++ b/patroni/dcs/consul.py
@@ -237,6 +237,10 @@
         self._session = None
         self.__do_not_watch = True
 
+    @property
+    def ttl(self):
+        return self._client.http.ttl
+
     def set_retry_timeout(self, retry_timeout):
         self._retry.deadline = retry_timeout
         self._client.http.set_read_timeout(retry_timeout)
@@ -342,15 +346,15 @@
             sync = nodes.get(self._SYNC)
             sync = SyncState.from_node(sync and sync['ModifyIndex'], sync and sync['Value'])
 
-            self._cluster = Cluster(initialize, config, leader, last_leader_operation, members, failover, sync, history)
+            return Cluster(initialize, config, leader, last_leader_operation, members, failover, sync, history)
         except NotFound:
-            self._cluster = Cluster(None, None, None, None, [], None, None, None)
+            return Cluster(None, None, None, None, [], None, None, None)
         except Exception:
             logger.exception('get_cluster')
             raise ConsulError('Consul is not responding properly')
 
     @catch_consul_errors
-    def touch_member(self, data, ttl=None, permanent=False):
+    def touch_member(self, data, permanent=False):
         cluster = self.cluster
         member = cluster and cluster.get_member(self._name, fallback_to_leader=False)
         create_member = not permanent and self.refresh_session()
@@ -503,6 +507,7 @@
         return self.retry(self._client.kv.delete, self.sync_path, cas=index)
 
     def watch(self, leader_index, timeout):
+        self._last_session_refresh = 0
         if self.__do_not_watch:
             self.__do_not_watch = False
             return True
@@ -521,5 +526,4 @@
         try:
             return super(Consul, self).watch(None, timeout)
         finally:
-            self._last_session_refresh = 0
             self.event.clear()
diff --git a/patroni/dcs/etcd.py b/patroni/dcs/etcd.py
index 5fbf0f34..d0c1ad86 100644
--- a/patroni/dcs/etcd.py
+++ b/patroni/dcs/etcd.py
@@ -443,6 +443,10 @@
         self._ttl = ttl
         self._client.set_machines_cache_ttl(ttl*10)
 
+    @property
+    def ttl(self):
+        return self._ttl
+
     def set_retry_timeout(self, retry_timeout):
         self._retry.deadline = retry_timeout
         self._client.set_read_timeout(retry_timeout)
@@ -452,6 +456,7 @@
         return Member.from_node(node.modifiedIndex, os.path.basename(node.key), node.ttl, node.value)
 
     def _load_cluster(self):
+        cluster = None
         try:
             result = self.retry(self._client.read, self.client_path(''), recursive=True)
             nodes = {node.key[len(result.key):].lstrip('/'): node for node in result.leaves}
@@ -492,17 +497,18 @@
             sync = nodes.get(self._SYNC)
             sync = SyncState.from_node(sync and sync.modifiedIndex, sync and sync.value)
 
-            self._cluster = Cluster(initialize, config, leader, last_leader_operation, members, failover, sync, history)
+            cluster = Cluster(initialize, config, leader, last_leader_operation, members, failover, sync, history)
         except etcd.EtcdKeyNotFound:
-            self._cluster = Cluster(None, None, None, None, [], None, None, None)
+            cluster = Cluster(None, None, None, None, [], None, None, None)
         except Exception as e:
             self._handle_exception(e, 'get_cluster', raise_ex=EtcdError('Etcd is not responding properly'))
 
         self._has_failed = False
+        return cluster
 
     @catch_etcd_errors
-    def touch_member(self, data, ttl=None, permanent=False):
+    def touch_member(self, data, permanent=False):
         data = json.dumps(data, separators=(',', ':'))
-        return self._client.set(self.member_path, data, None if permanent else ttl or self._ttl)
+        return self._client.set(self.member_path, data, None if permanent else self._ttl)
 
     @catch_etcd_errors
     def take_leader(self):
diff --git a/patroni/dcs/kubernetes.py b/patroni/dcs/kubernetes.py
index 525bd530..fb491058 100644
--- a/patroni/dcs/kubernetes.py
+++ b/patroni/dcs/kubernetes.py
@@ -107,6 +107,7 @@
         self._leader_observed_time = None
         self._leader_resource_version = None
         self._leader_observed_subsets = []
+        self._config_resource_version = None
         self.__do_not_watch = False
 
     def retry(self, *args, **kwargs):
@@ -124,6 +125,10 @@
         self.__do_not_watch = self._ttl != ttl
         self._ttl = ttl
 
+    @property
+    def ttl(self):
+        return self._ttl
+
     def set_retry_timeout(self, retry_timeout):
         self._retry.deadline = retry_timeout
         self._api.set_timeout(retry_timeout)
@@ -146,6 +151,7 @@
             config = nodes.get(self.config_path)
             metadata = config and config.metadata
+            self._config_resource_version = metadata.resource_version if metadata else None
             annotations = metadata and metadata.annotations or {}
 
             # get initialize flag
@@ -201,7 +207,7 @@
             metadata = sync and sync.metadata
             sync = SyncState.from_node(metadata and metadata.resource_version, metadata and metadata.annotations)
 
-            self._cluster = Cluster(initialize, config, leader, last_leader_operation, members, failover, sync, history)
+            return Cluster(initialize, config, leader, last_leader_operation, members, failover, sync, history)
         except Exception:
             logger.exception('get_cluster')
             raise KubernetesError('Kubernetes API is not responding properly')
@@ -280,7 +286,10 @@
         if self.__subsets and not patch and not resource_version:
             self._should_create_config_service = True
             self._create_config_service()
-        return self.patch_or_create(self.config_path, annotations, resource_version, patch, retry)
+        ret = self.patch_or_create(self.config_path, annotations, resource_version, patch, retry)
+        if ret:
+            self._config_resource_version = ret.metadata.resource_version
+        return ret
 
     def _create_config_service(self):
         metadata = k8s_client.V1ObjectMeta(namespace=self._namespace, name=self.config_path, labels=self._labels)
@@ -349,11 +358,10 @@
         return self.patch_or_create(self.failover_path, annotations, index, bool(index or patch), False)
 
     def set_config_value(self, value, index=None):
-        patch = bool(index or self.cluster and self.cluster.config and self.cluster.config.index)
-        return self.patch_or_create_config({self._CONFIG: value}, index, patch, False)
+        return self.patch_or_create_config({self._CONFIG: value}, index, bool(self._config_resource_version), False)
 
     @catch_kubernetes_errors
-    def touch_member(self, data, ttl=None, permanent=False):
+    def touch_member(self, data, permanent=False):
         cluster = self.cluster
         if cluster and cluster.leader and cluster.leader.name == self._name:
             role = 'promoted' if data['role'] in ('replica', 'promoted') else 'master'
@@ -386,15 +394,14 @@
         self.reset_cluster()
 
     def cancel_initialization(self):
-        self.patch_or_create_config({self._INITIALIZE: None}, self.cluster.config.index, True)
+        self.patch_or_create_config({self._INITIALIZE: None}, self._config_resource_version, True)
 
     @catch_kubernetes_errors
     def delete_cluster(self):
         self.retry(self._api.delete_collection_namespaced_kind, self._namespace, label_selector=self._label_selector)
 
     def set_history_value(self, value):
-        patch = bool(self.cluster and self.cluster.config and self.cluster.config.index)
-        return self.patch_or_create_config({self._HISTORY: value}, None, patch, False)
+        return self.patch_or_create_config({self._HISTORY: value}, None, bool(self._config_resource_version), False)
 
     def set_sync_state_value(self, value, index=None):
         """Unused"""
diff --git a/patroni/dcs/zookeeper.py b/patroni/dcs/zookeeper.py
index fd80124e..d71f70a5 100644
--- a/patroni/dcs/zookeeper.py
+++ b/patroni/dcs/zookeeper.py
@@ -126,6 +126,10 @@
         self._client.restart()
         return True
 
+    @property
+    def ttl(self):
+        return self._client._session_timeout
+
     def set_retry_timeout(self, retry_timeout):
         retry = self._client.retry if isinstance(self._client.retry, KazooRetry) else self._client._retry
         retry.deadline = retry_timeout
@@ -206,16 +210,18 @@
         failover = self.get_node(self.failover_path, watch=self.cluster_watcher) if self._FAILOVER in nodes else None
         failover = failover and Failover.from_node(failover[1].version, failover[0])
 
-        self._cluster = Cluster(initialize, config, leader, last_leader_operation, members, failover, sync, history)
+        return Cluster(initialize, config, leader, last_leader_operation, members, failover, sync, history)
 
     def _load_cluster(self):
-        if self._fetch_cluster or self._cluster is None:
+        cluster = self.cluster
+        if self._fetch_cluster or cluster is None:
             try:
-                self._client.retry(self._inner_load_cluster)
+                cluster = self._client.retry(self._inner_load_cluster)
             except Exception:
                 logger.exception('get_cluster')
                 self.cluster_watcher(None)
                 raise ZooKeeperError('ZooKeeper in not responding properly')
+        return cluster
 
     def _create(self, path, value, retry=False, ephemeral=False):
         try:
@@ -264,7 +270,7 @@
         return self._create(self.initialize_path, sysid, retry=True) if create_new \
             else self._client.retry(self._client.set, self.initialize_path, sysid)
 
-    def touch_member(self, data, ttl=None, permanent=False):
+    def touch_member(self, data, permanent=False):
         cluster = self.cluster
         member = cluster and cluster.get_member(self._name, fallback_to_leader=False)
         encoded_data = json.dumps(data, separators=(',', ':')).encode('utf-8')
diff --git a/patroni/ha.py b/patroni/ha.py
index c504a362..dbeb7619 100644
--- a/patroni/ha.py
+++ b/patroni/ha.py
@@ -112,11 +112,11 @@
 
     def is_leader(self):
         with self._is_leader_lock:
-            return self._is_leader and not self._leader_access_is_restricted
+            return self._is_leader > time.time() and not self._leader_access_is_restricted
 
     def set_is_leader(self, value):
         with self._is_leader_lock:
-            self._is_leader = value
+            self._is_leader = time.time() + self.dcs.ttl if value else 0
 
     def set_leader_access_is_restricted(self, value):
         with self._is_leader_lock:
diff --git a/tests/test_consul.py b/tests/test_consul.py
index a9d1cc93..15d1c707 100644
--- a/tests/test_consul.py
+++ b/tests/test_consul.py
@@ -83,7 +83,7 @@
         self.c = Consul({'ttl': 30, 'scope': 'test', 'name': 'postgresql1',
                          'host': 'localhost:1', 'retry_timeout': 10, 'register_service': True})
         self.c._base_path = '/service/good'
-        self.c._load_cluster()
+        self.c.get_cluster()
 
     @patch('time.sleep', Mock(side_effect=SleepException))
     @patch.object(consul.Consul.Session, 'create', Mock(side_effect=ConsulException))
diff --git a/tests/test_kubernetes.py b/tests/test_kubernetes.py
index 26c67758..93b74807 100644
--- a/tests/test_kubernetes.py
+++ b/tests/test_kubernetes.py
@@ -1,3 +1,4 @@
+import time
 import unittest
 
 from mock import Mock, patch
@@ -33,13 +34,14 @@ class TestKubernetes(unittest.TestCase):
     @patch.object(k8s_client.CoreV1Api, 'list_namespaced_pod', mock_list_namespaced_pod)
     def setUp(self):
         self.k = Kubernetes({'ttl': 30, 'scope': 'test', 'name': 'p-0', 'retry_timeout': 10, 'labels': {'f': 'b'}})
-        with patch('time.time', Mock(return_value=1)):
+        self.k.get_cluster()
+
+    def test_get_cluster(self):
+        with patch.object(k8s_client.CoreV1Api, 'list_namespaced_config_map', mock_list_namespaced_config_map), \
+                patch.object(k8s_client.CoreV1Api, 'list_namespaced_pod', mock_list_namespaced_pod), \
+                patch('time.time', Mock(return_value=time.time() + 31)):
             self.k.get_cluster()
 
-    @patch.object(k8s_client.CoreV1Api, 'list_namespaced_config_map', mock_list_namespaced_config_map)
-    @patch.object(k8s_client.CoreV1Api, 'list_namespaced_pod', mock_list_namespaced_pod)
-    def test_get_cluster(self):
-        self.k.get_cluster()
         with patch.object(k8s_client.CoreV1Api, 'list_namespaced_pod', Mock(side_effect=Exception)):
             self.assertRaises(KubernetesError, self.k.get_cluster)
diff --git a/tests/test_zookeeper.py b/tests/test_zookeeper.py
index cc9f396d..931aabfb 100644
--- a/tests/test_zookeeper.py
+++ b/tests/test_zookeeper.py
@@ -17,6 +17,7 @@ class MockKazooClient(Mock):
 
     def __init__(self, *args, **kwargs):
         super(MockKazooClient, self).__init__()
+        self._session_timeout = 30
 
     @property
     def client_id(self):
@@ -24,7 +25,7 @@
 
     @staticmethod
     def retry(func, *args, **kwargs):
-        func(*args, **kwargs)
+        return func(*args, **kwargs)
 
     def get(self, path, watch=None):
         if not isinstance(path, six.string_types):
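
Below is a minimal, self-contained sketch of the caching pattern this patch introduces: the potentially slow DCS read runs outside the lock, the lock is only taken to swap in the freshly built cluster object together with a validity deadline of time.time() + ttl, and the non-blocking `cluster` property (used by health checks) serves the cached object only while that deadline has not passed. The `CachingDCS` class, the `load_fn` callback, and the sample values are illustrative stand-ins, not part of Patroni.

import time
from threading import Lock


class CachingDCS(object):
    """Illustrative stand-in for AbstractDCS showing the reduced-lock pattern."""

    def __init__(self, ttl, load_fn):
        self._ttl = ttl              # leader key ttl, also used as the cache validity window
        self._load_fn = load_fn      # plays the role of the DCS-specific _load_cluster()
        self._cluster = None
        self._cluster_valid_till = 0
        self._cluster_thread_lock = Lock()

    @property
    def ttl(self):
        return self._ttl

    def get_cluster(self):
        try:
            cluster = self._load_fn()          # slow DCS call, lock is NOT held here
        except Exception:
            self.reset_cluster()
            raise

        with self._cluster_thread_lock:        # lock held only for the cheap assignment
            self._cluster = cluster
            self._cluster_valid_till = time.time() + self.ttl
            return cluster

    @property
    def cluster(self):
        # Non-blocking view: return the cached value only while it is still fresh.
        with self._cluster_thread_lock:
            return self._cluster if self._cluster_valid_till > time.time() else None

    def reset_cluster(self):
        with self._cluster_thread_lock:
            self._cluster = None
            self._cluster_valid_till = 0


# Example usage (values are made up):
dcs = CachingDCS(ttl=30, load_fn=lambda: {'leader': 'postgresql1'})
dcs.get_cluster()   # slow path: refreshes the cache under a short lock
print(dcs.cluster)  # fast path: served from cache while younger than ttl seconds

Letting the stale cache read as None mirrors the ha.py change above, where is_leader() also expires after ttl seconds so a node cannot keep answering health checks as leader from stale state.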