diff --git a/docs/dynamic_configuration.rst b/docs/dynamic_configuration.rst
index 17ef8496..c3c94a88 100644
--- a/docs/dynamic_configuration.rst
+++ b/docs/dynamic_configuration.rst
@@ -76,6 +76,7 @@ Also, the following Patroni configuration options can be changed only dynamicall
 - loop_wait: 10
 - retry_timeouts: 10
 - maximum_lag_on_failover: 1048576
+- check_timeline: false
 - postgresql.use_slots: true
 
 Upon changing these options, Patroni will read the relevant section of the configuration stored in DCS and change its
diff --git a/docs/replication_modes.rst b/docs/replication_modes.rst
index 8bf36d4b..16097c19 100644
--- a/docs/replication_modes.rst
+++ b/docs/replication_modes.rst
@@ -13,6 +13,8 @@ In asynchronous mode the cluster is allowed to lose some committed transactions
 
 The amount of transactions that can be lost is controlled via ``maximum_lag_on_failover`` parameter. Because the primary transaction log position is not sampled in real time, in reality the amount of lost data on failover is worst case bounded by ``maximum_lag_on_failover`` bytes of transaction log plus the amount that is written in the last ``ttl`` seconds (``loop_wait``/2 seconds in the average case). However typical steady state replication delay is well under a second.
 
+By default, when running leader elections, Patroni does not take the current timeline of replicas into account, which in some cases could be undesirable behavior. You can prevent a node that does not have the same timeline as the former master from becoming the new leader by setting the ``check_timeline`` parameter to ``true``.
+
 PostgreSQL synchronous replication
 ----------------------------------
 
diff --git a/patroni/config.py b/patroni/config.py
index 82fb9b6c..8bf4dff7 100644
--- a/patroni/config.py
+++ b/patroni/config.py
@@ -43,6 +43,7 @@ class Config(object):
     __DEFAULT_CONFIG = {
         'ttl': 30, 'loop_wait': 10, 'retry_timeout': 10,
         'maximum_lag_on_failover': 1048576,
+        'check_timeline': False,
         'master_start_timeout': 300,
         'synchronous_mode': False,
         'synchronous_mode_strict': False,
@@ -355,7 +356,7 @@
             'scope',
             'retry_timeout',
             'synchronous_mode',
-            'maximum_lag_on_failover'
+            'synchronous_mode_strict',
         )
 
         pg_config.update({p: config[p] for p in updated_fields if p in config})
diff --git a/patroni/dcs/__init__.py b/patroni/dcs/__init__.py
index 777f83a7..cb350479 100644
--- a/patroni/dcs/__init__.py
+++ b/patroni/dcs/__init__.py
@@ -368,7 +368,7 @@ class SyncState(namedtuple('SyncState', 'index,leader,sync_standby')):
         return name is not None and name in (self.leader, self.sync_standby)
 
 
-class TimelineHistory(namedtuple('TimelineHistory', 'index,lines')):
+class TimelineHistory(namedtuple('TimelineHistory', 'index,value,lines')):
     """Object representing timeline history file"""
 
     @staticmethod
@@ -384,7 +384,7 @@
             lines = None
         if not isinstance(lines, list):
             lines = []
-        return TimelineHistory(index, lines)
+        return TimelineHistory(index, value, lines)
 
 
 class Cluster(namedtuple('Cluster', 'initialize,config,leader,last_leader_operation,members,failover,sync,history')):
@@ -484,6 +484,26 @@ class Cluster(namedtuple('Cluster', 'initialize,config,leader,last_leader_operat
         slots = self.get_replication_slots(name, 'master').values()
         return any(v for v in slots if v.get("type") == "logical")
 
+    @property
+    def timeline(self):
+        """
+        >>> Cluster(0, 0, 0, 0, 0, 0, 0, 0).timeline
+        0
+        >>> Cluster(0, 0, 0, 0, 0, 0, 0, TimelineHistory.from_node(1, '[]')).timeline
+        1
+        >>> Cluster(0, 0, 0, 0, 0, 0, 0, TimelineHistory.from_node(1, '[["a"]]')).timeline
+        0
+        """
+        if self.history:
+            if self.history.lines:
+                try:
+                    return int(self.history.lines[-1][0]) + 1
+                except Exception:
+                    logger.error('Failed to parse cluster history from DCS: %s', self.history.lines)
+            elif self.history.value == '[]':
+                return 1
+        return 0
+
 
 @six.add_metaclass(abc.ABCMeta)
 class AbstractDCS(object):
diff --git a/patroni/ha.py b/patroni/ha.py
index db065576..b7ac3494 100644
--- a/patroni/ha.py
+++ b/patroni/ha.py
@@ -20,25 +20,28 @@ from threading import RLock
 logger = logging.getLogger(__name__)
 
 
-class _MemberStatus(namedtuple('_MemberStatus', 'member,reachable,in_recovery,wal_position,tags,watchdog_failed')):
+class _MemberStatus(namedtuple('_MemberStatus', ['member', 'reachable', 'in_recovery', 'timeline',
+                                                 'wal_position', 'tags', 'watchdog_failed'])):
     """Node status distilled from API response:
 
         member - dcs.Member object of the node
        reachable - `!False` if the node is not reachable or is not responding with correct JSON
        in_recovery - `!True` if pg_is_in_recovery() == true
-        wal_position - value of `replayed_location` or `location` from JSON, dependin on its role.
+        timeline - timeline value from JSON
+        wal_position - maximum value of `replayed_location` or `received_location` from JSON
        tags - dictionary with values of different tags (i.e. nofailover)
        watchdog_failed - indicates that watchdog is required by configuration but not available or failed
    """
    @classmethod
    def from_api_response(cls, member, json):
        is_master = json['role'] == 'master'
+        timeline = json.get('timeline', 0)
        wal = not is_master and max(json['xlog'].get('received_location', 0), json['xlog'].get('replayed_location', 0))
-        return cls(member, True, not is_master, wal, json.get('tags', {}), json.get('watchdog_failed', False))
+        return cls(member, True, not is_master, timeline, wal, json.get('tags', {}), json.get('watchdog_failed', False))
 
     @classmethod
     def unknown(cls, member):
-        return cls(member, False, None, 0, {}, False)
+        return cls(member, False, None, 0, 0, {}, False)
 
     def failover_limitation(self):
         """Returns reason why this node can't promote or None if everything is ok."""
@@ -92,6 +95,9 @@ class Ha(object):
     def is_paused(self):
         return self.check_mode('pause')
 
+    def check_timeline(self):
+        return self.check_mode('check_timeline')
+
     def get_standby_cluster_config(self):
         if self.cluster and self.cluster.config and self.cluster.config.modify_index:
             config = self.cluster.config.data
@@ -543,15 +549,23 @@ class Ha(object):
         :returns True when node is lagging
         """
         lag = (self.cluster.last_leader_operation or 0) - wal_position
-        return lag > self.state_handler.config.get('maximum_lag_on_failover', 0)
+        return lag > self.patroni.config.get('maximum_lag_on_failover', 0)
 
     def _is_healthiest_node(self, members, check_replication_lag=True):
         """This method tries to determine whether I am healthy enough to became a new leader candidate or not."""
 
         _, my_wal_position = self.state_handler.timeline_wal_position()
         if check_replication_lag and self.is_lagging(my_wal_position):
+            logger.info('My wal position exceeds maximum replication lag')
             return False  # Too far behind last reported wal position on master
 
+        if not self.is_standby_cluster() and self.check_timeline():
+            cluster_timeline = self.cluster.timeline
+            my_timeline = self.state_handler.replica_cached_timeline(cluster_timeline)
+            if my_timeline < cluster_timeline:
+                logger.info('My timeline %s is behind last known cluster timeline %s', my_timeline, cluster_timeline)
+                return False
+
         # Prepare list of nodes to run check against
         members = [m for m in members if m.name != self.state_handler.name and not m.nofailover and m.api_url]
 
@@ -562,11 +576,13 @@ class Ha(object):
                     logger.warning('Master (%s) is still alive', st.member.name)
                     return False
                 if my_wal_position < st.wal_position:
+                    logger.info('Wal position of %s is ahead of my wal position', st.member.name)
                     return False
         return True
 
     def is_failover_possible(self, members):
         ret = False
+        cluster_timeline = self.cluster.timeline
         members = [m for m in members if m.name != self.state_handler.name and not m.nofailover and m.api_url]
         if members:
             for st in self.fetch_nodes_statuses(members):
@@ -575,6 +591,9 @@ class Ha(object):
                     logger.info('Member %s is %s', st.member.name, not_allowed_reason)
                 elif self.is_lagging(st.wal_position):
                     logger.info('Member %s exceeds maximum replication lag', st.member.name)
+                elif self.check_timeline() and (not st.timeline or st.timeline < cluster_timeline):
+                    logger.info('Timeline %s of member %s is behind the cluster timeline %s',
+                                st.timeline, st.member.name, cluster_timeline)
                 else:
                     ret = True
         else:
diff --git a/tests/test_api.py b/tests/test_api.py
index b60a2cc0..0f41516b 100644
--- a/tests/test_api.py
+++ b/tests/test_api.py
@@ -73,7 +73,7 @@ class MockHa(object):
 
     @staticmethod
     def fetch_nodes_statuses(members):
-        return [_MemberStatus(None, True, None, None, {}, False)]
+        return [_MemberStatus(None, True, None, 0, None, {}, False)]
 
     @staticmethod
     def schedule_future_restart(data):
diff --git a/tests/test_ha.py b/tests/test_ha.py
index 9e5b77bf..faf49034 100644
--- a/tests/test_ha.py
+++ b/tests/test_ha.py
@@ -28,8 +28,10 @@ def false(*args, **kwargs):
 
 
 def get_cluster(initialize, leader, members, failover, sync, cluster_config=None):
-    history = TimelineHistory(1, [(1, 67197376, 'no recovery target specified', datetime.datetime.now().isoformat())])
-    cluster_config = cluster_config or ClusterConfig(1, {1: 2}, 1)
+    t = datetime.datetime.now().isoformat()
+    history = TimelineHistory(1, '[[1,67197376,"no recovery target specified","' + t + '"]]',
+                              [(1, 67197376, 'no recovery target specified', t)])
+    cluster_config = cluster_config or ClusterConfig(1, {'check_timeline': True}, 1)
     return Cluster(initialize, cluster_config, leader, 10, members, failover, sync, history)
 
 
@@ -72,12 +74,13 @@ def get_standby_cluster_initialized_with_only_leader(failover=None, sync=None):
     )
 
 
-def get_node_status(reachable=True, in_recovery=True, wal_position=10, nofailover=False, watchdog_failed=False):
+def get_node_status(reachable=True, in_recovery=True, timeline=2,
+                    wal_position=10, nofailover=False, watchdog_failed=False):
     def fetch_node_status(e):
         tags = {}
         if nofailover:
             tags['nofailover'] = True
-        return _MemberStatus(e, reachable, in_recovery, wal_position, tags, watchdog_failed)
+        return _MemberStatus(e, reachable, in_recovery, timeline, wal_position, tags, watchdog_failed)
     return fetch_node_status
 
 
@@ -115,6 +118,7 @@ zookeeper:
         sys.argv = sys.argv[:1]
 
         self.config = Config()
+        self.config.set_dynamic_configuration({'maximum_lag_on_failover': 5})
         self.postgresql = p
         self.dcs = d
         self.api = Mock()
@@ -147,6 +151,7 @@ def run_async(self, func, args=()):
 @patch.object(Postgresql, 'query', Mock())
 @patch.object(Postgresql, 'checkpoint', Mock())
 @patch.object(Postgresql, 'cancellable_subprocess_call', Mock(return_value=0))
+@patch.object(Postgresql, '_get_local_timeline_lsn_from_replication_connection', Mock(return_value=[2, 10]))
 @patch.object(etcd.Client, 'write', etcd_write)
 @patch.object(etcd.Client, 'read', etcd_read)
 @patch.object(etcd.Client, 'delete', Mock(side_effect=etcd.EtcdException))
@@ -166,7 +171,6 @@ class TestHa(unittest.TestCase):
             mock_machines.__get__ = Mock(return_value=['http://remotehost:2379'])
             self.p = Postgresql({'name': 'postgresql0', 'scope': 'dummy', 'listen': '127.0.0.1:5432',
                                  'data_dir': 'data/postgresql0', 'retry_timeout': 10,
-                                 'maximum_lag_on_failover': 5,
                                  'authentication': {'superuser': {'username': 'foo', 'password': 'bar'},
                                                     'replication': {'username': '', 'password': ''}},
                                  'parameters': {'wal_level': 'hot_standby', 'max_replication_slots': 5, 'foo': 'bar',
@@ -461,6 +465,8 @@ class TestHa(unittest.TestCase):
         self.assertEqual(self.ha.run_cycle(), 'no action. i am the leader with the lock')
         self.ha.fetch_node_status = get_node_status(watchdog_failed=True)
         self.assertEqual(self.ha.run_cycle(), 'no action. i am the leader with the lock')
+        self.ha.fetch_node_status = get_node_status(timeline=1)
+        self.assertEqual(self.ha.run_cycle(), 'no action. i am the leader with the lock')
         self.ha.fetch_node_status = get_node_status(wal_position=1)
         self.assertEqual(self.ha.run_cycle(), 'no action. i am the leader with the lock')
         # manual failover from the previous leader to us won't happen if we hold the nofailover flag
@@ -572,6 +578,8 @@ class TestHa(unittest.TestCase):
         self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members))
         with patch('patroni.postgresql.Postgresql.timeline_wal_position', return_value=(1, 1)):
             self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members))
+        with patch('patroni.postgresql.Postgresql.replica_cached_timeline', return_value=1):
+            self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members))
         self.ha.patroni.nofailover = True
         self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members))
         self.ha.patroni.nofailover = False
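
A minimal, self-contained sketch of the behaviour this change introduces is shown below. It is illustrative code only, not Patroni's API; the helper names cluster_timeline and may_promote are invented here. It mirrors the new Cluster.timeline property, which derives the cluster timeline from the last line of the history stored in the DCS (an empty history file, '[]', means timeline 1), and the new election rule: with check_timeline enabled, a candidate whose own timeline is behind the last known cluster timeline is not allowed to become the leader.

import logging

logger = logging.getLogger(__name__)


def cluster_timeline(history_value, history_lines):
    # Mirrors the new Cluster.timeline property: the cluster is on the timeline that
    # follows the last history line; an empty history file ('[]') means timeline 1.
    if history_lines:
        try:
            return int(history_lines[-1][0]) + 1
        except Exception:
            logger.error('Failed to parse cluster history: %s', history_lines)
    elif history_value == '[]':
        return 1
    return 0


def may_promote(my_timeline, history_value, history_lines, check_timeline=True):
    # With check_timeline enabled, a node whose timeline is behind the last known
    # cluster timeline is excluded from the leader election.
    if not check_timeline:
        return True
    return my_timeline >= cluster_timeline(history_value, history_lines)


if __name__ == '__main__':
    # The history records a switch away from timeline 1, so the cluster is on timeline 2:
    # a replica still on timeline 1 must not be promoted, a replica on timeline 2 may be.
    value = '[[1, 67197376, "no recovery target specified"]]'
    lines = [[1, 67197376, 'no recovery target specified']]
    assert cluster_timeline(value, lines) == 2
    assert may_promote(1, value, lines) is False
    assert may_promote(2, value, lines) is True
    assert cluster_timeline('[]', []) == 1

Because check_timeline defaults to false in __DEFAULT_CONFIG, the previous election behaviour is preserved unless the option is explicitly enabled through the dynamic configuration.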