Optionally consider node not healthy if it is not on the latest timeline (#892)
The latest timeline is calculated from the `/history` key in DCS. In case there is no such key or it contains some garbage we consider the node healthy. Closes https://github.com/zalando/patroni/issues/890
parent cf34fb3934
commit 71dae6a905
committed by GitHub
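Taken together with the code changes below, the decision this commit adds to leader elections can be summarized by a minimal standalone sketch; the function name and arguments here are illustrative, not Patroni's actual API:

def healthy_enough_to_lead(my_wal_position, last_leader_operation, maximum_lag_on_failover,
                           my_timeline, cluster_timeline, check_timeline):
    """Rough, simplified mirror of Ha.is_lagging() plus the new timeline check
    added to Ha._is_healthiest_node() further down in this diff."""
    # Replication lag check: too far behind the last reported master position?
    lag = (last_leader_operation or 0) - my_wal_position
    if lag > maximum_lag_on_failover:
        return False
    # New optional check: cluster_timeline == 0 means the /history key is
    # missing or unparseable, in which case the node is still considered healthy.
    if check_timeline and cluster_timeline and my_timeline < cluster_timeline:
        return False
    return True


# A replica still on timeline 2 after the cluster was promoted to timeline 3:
print(healthy_enough_to_lead(my_wal_position=100, last_leader_operation=100,
                             maximum_lag_on_failover=1048576,
                             my_timeline=2, cluster_timeline=3,
                             check_timeline=True))  # False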
@@ -76,6 +76,7 @@ Also, the following Patroni configuration options can be changed only dynamicall

- loop_wait: 10
- retry_timeouts: 10
- maximum_lag_on_failover: 1048576
- check_timeline: false
- postgresql.use_slots: true

Upon changing these options, Patroni will read the relevant section of the configuration stored in DCS and change its
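Because ``check_timeline`` is part of the dynamic configuration, it is changed through DCS rather than in the local YAML file. A hedged sketch, assuming Patroni's REST API is listening on its default port 8008 and that the ``requests`` package is available; ``patronictl edit-config`` achieves the same interactively:

import requests

# PATCH /config merges the given keys into the dynamic configuration in DCS.
resp = requests.patch('http://localhost:8008/config',
                      json={'check_timeline': True},
                      timeout=5)
resp.raise_for_status()
# The response is expected to echo the updated dynamic configuration.
print(resp.json().get('check_timeline'))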
@@ -13,6 +13,8 @@ In asynchronous mode the cluster is allowed to lose some committed transactions

The number of transactions that can be lost is controlled via the ``maximum_lag_on_failover`` parameter. Because the primary transaction log position is not sampled in real time, in reality the amount of data lost on failover is worst case bounded by ``maximum_lag_on_failover`` bytes of transaction log plus the amount that is written in the last ``ttl`` seconds (``loop_wait``/2 seconds in the average case). However, typical steady-state replication delay is well under a second.

By default, when running leader elections, Patroni does not take the current timeline of replicas into account, which in some cases could be undesirable behavior. You can prevent a node that does not have the same timeline as the former master from becoming the new leader by changing the value of the ``check_timeline`` parameter to ``true``.

PostgreSQL synchronous replication
----------------------------------
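As a back-of-the-envelope illustration of the bound described in the paragraph above, using the default ``maximum_lag_on_failover`` and an assumed steady WAL write rate (which Patroni does not report itself):

maximum_lag_on_failover = 1048576   # bytes, the default
ttl = 30                            # seconds
loop_wait = 10                      # seconds
write_rate = 200 * 1024             # assumed WAL write rate in bytes/second

worst_case_bound = maximum_lag_on_failover + ttl * write_rate
average_case_bound = maximum_lag_on_failover + (loop_wait / 2) * write_rate

print(worst_case_bound)    # 7192576 bytes
print(average_case_bound)  # 2072576.0 bytes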
@@ -43,6 +43,7 @@ class Config(object):

    __DEFAULT_CONFIG = {
        'ttl': 30, 'loop_wait': 10, 'retry_timeout': 10,
        'maximum_lag_on_failover': 1048576,
        'check_timeline': False,
        'master_start_timeout': 300,
        'synchronous_mode': False,
        'synchronous_mode_strict': False,
@@ -355,7 +356,7 @@ class Config(object):
            'scope',
            'retry_timeout',
            'synchronous_mode',
            'maximum_lag_on_failover'
            'synchronous_mode_strict',
        )

        pg_config.update({p: config[p] for p in updated_fields if p in config})
@@ -368,7 +368,7 @@ class SyncState(namedtuple('SyncState', 'index,leader,sync_standby')):
        return name is not None and name in (self.leader, self.sync_standby)


class TimelineHistory(namedtuple('TimelineHistory', 'index,lines')):
class TimelineHistory(namedtuple('TimelineHistory', 'index,value,lines')):
    """Object representing timeline history file"""

    @staticmethod
@@ -384,7 +384,7 @@ class TimelineHistory(namedtuple('TimelineHistory', 'index,lines')):
            lines = None
        if not isinstance(lines, list):
            lines = []
        return TimelineHistory(index, lines)
        return TimelineHistory(index, value, lines)


class Cluster(namedtuple('Cluster', 'initialize,config,leader,last_leader_operation,members,failover,sync,history')):
@@ -484,6 +484,26 @@ class Cluster(namedtuple('Cluster', 'initialize,config,leader,last_leader_operat
        slots = self.get_replication_slots(name, 'master').values()
        return any(v for v in slots if v.get("type") == "logical")

    @property
    def timeline(self):
        """
        >>> Cluster(0, 0, 0, 0, 0, 0, 0, 0).timeline
        0
        >>> Cluster(0, 0, 0, 0, 0, 0, 0, TimelineHistory.from_node(1, '[]')).timeline
        1
        >>> Cluster(0, 0, 0, 0, 0, 0, 0, TimelineHistory.from_node(1, '[["a"]]')).timeline
        0
        """
        if self.history:
            if self.history.lines:
                try:
                    return int(self.history.lines[-1][0]) + 1
                except Exception:
                    logger.error('Failed to parse cluster history from DCS: %s', self.history.lines)
            elif self.history.value == '[]':
                return 1
        return 0


@six.add_metaclass(abc.ABCMeta)
class AbstractDCS(object):
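For orientation, the ``/history`` key holds the timeline history as JSON, where each entry is roughly ``[timeline, switchpoint LSN, reason, timestamp]``; the sample values below are made up, following the shape used in the tests later in this diff:

import json

raw_value = json.dumps([
    [1, 25165824, 'no recovery target specified', '2019-01-01T00:00:00'],
    [2, 50331648, 'no recovery target specified', '2019-02-01T00:00:00'],
])

lines = json.loads(raw_value)             # what TimelineHistory keeps as `lines`
latest_timeline = int(lines[-1][0]) + 1   # what Cluster.timeline derives from it

print(latest_timeline)  # 3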
@@ -20,25 +20,28 @@ from threading import RLock

logger = logging.getLogger(__name__)


class _MemberStatus(namedtuple('_MemberStatus', 'member,reachable,in_recovery,wal_position,tags,watchdog_failed')):
class _MemberStatus(namedtuple('_MemberStatus', ['member', 'reachable', 'in_recovery', 'timeline',
                                                 'wal_position', 'tags', 'watchdog_failed'])):
    """Node status distilled from API response:

    member - dcs.Member object of the node
    reachable - `!False` if the node is not reachable or is not responding with correct JSON
    in_recovery - `!True` if pg_is_in_recovery() == true
    wal_position - value of `replayed_location` or `location` from JSON, dependin on its role.
    timeline - timeline value from JSON
    wal_position - maximum value of `replayed_location` or `received_location` from JSON
    tags - dictionary with values of different tags (i.e. nofailover)
    watchdog_failed - indicates that watchdog is required by configuration but not available or failed
    """
    @classmethod
    def from_api_response(cls, member, json):
        is_master = json['role'] == 'master'
        timeline = json.get('timeline', 0)
        wal = not is_master and max(json['xlog'].get('received_location', 0), json['xlog'].get('replayed_location', 0))
        return cls(member, True, not is_master, wal, json.get('tags', {}), json.get('watchdog_failed', False))
        return cls(member, True, not is_master, timeline, wal, json.get('tags', {}), json.get('watchdog_failed', False))

    @classmethod
    def unknown(cls, member):
        return cls(member, False, None, 0, {}, False)
        return cls(member, False, None, 0, 0, {}, False)

    def failover_limitation(self):
        """Returns reason why this node can't promote or None if everything is ok."""
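For reference, the member status above is distilled from each node's REST API response. A short sketch of the fields the new code reads, using an illustrative payload shaped like the docstring describes:

# Illustrative /patroni response for a replica; only the fields read by
# _MemberStatus.from_api_response above are shown.
response = {
    'role': 'replica',
    'timeline': 2,
    'xlog': {'received_location': 67197376, 'replayed_location': 67190000},
    'tags': {},
    'watchdog_failed': False,
}

is_master = response['role'] == 'master'
timeline = response.get('timeline', 0)
wal = not is_master and max(response['xlog'].get('received_location', 0),
                            response['xlog'].get('replayed_location', 0))

print(timeline, wal)  # 2 67197376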
@@ -92,6 +95,9 @@ class Ha(object):
    def is_paused(self):
        return self.check_mode('pause')

    def check_timeline(self):
        return self.check_mode('check_timeline')

    def get_standby_cluster_config(self):
        if self.cluster and self.cluster.config and self.cluster.config.modify_index:
            config = self.cluster.config.data
@@ -543,15 +549,23 @@ class Ha(object):
        :returns True when node is lagging
        """
        lag = (self.cluster.last_leader_operation or 0) - wal_position
        return lag > self.state_handler.config.get('maximum_lag_on_failover', 0)
        return lag > self.patroni.config.get('maximum_lag_on_failover', 0)

    def _is_healthiest_node(self, members, check_replication_lag=True):
        """This method tries to determine whether I am healthy enough to became a new leader candidate or not."""

        _, my_wal_position = self.state_handler.timeline_wal_position()
        if check_replication_lag and self.is_lagging(my_wal_position):
            logger.info('My wal position exceeds maximum replication lag')
            return False  # Too far behind last reported wal position on master

        if not self.is_standby_cluster() and self.check_timeline():
            cluster_timeline = self.cluster.timeline
            my_timeline = self.state_handler.replica_cached_timeline(cluster_timeline)
            if my_timeline < cluster_timeline:
                logger.info('My timeline %s is behind last known cluster timeline %s', my_timeline, cluster_timeline)
                return False

        # Prepare list of nodes to run check against
        members = [m for m in members if m.name != self.state_handler.name and not m.nofailover and m.api_url]
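As a concrete walk-through of the per-member comparison in ``_is_healthiest_node`` (continued in the next hunk), here is a rough standalone sketch with made-up member data; ``Status`` is a stand-in for ``_MemberStatus``, not the real class:

from collections import namedtuple

Status = namedtuple('Status', 'name in_recovery timeline wal_position')

my_timeline, my_wal_position = 3, 90
cluster_timeline = 3
others = [
    Status('node2', True, 3, 80),   # behind me, no objection
    Status('node3', True, 2, 95),   # ahead of me in WAL position
]

# I am only the healthiest candidate if I am on the latest timeline and
# no other replica has replayed further than I have.
healthiest = (my_timeline >= cluster_timeline and
              all(st.wal_position <= my_wal_position for st in others if st.in_recovery))

print(healthiest)  # False: node3 is ahead of my wal position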
@@ -562,11 +576,13 @@ class Ha(object):
                logger.warning('Master (%s) is still alive', st.member.name)
                return False
            if my_wal_position < st.wal_position:
                logger.info('Wal position of %s is ahead of my wal position', st.member.name)
                return False
        return True

    def is_failover_possible(self, members):
        ret = False
        cluster_timeline = self.cluster.timeline
        members = [m for m in members if m.name != self.state_handler.name and not m.nofailover and m.api_url]
        if members:
            for st in self.fetch_nodes_statuses(members):
@@ -575,6 +591,9 @@ class Ha(object):
                    logger.info('Member %s is %s', st.member.name, not_allowed_reason)
                elif self.is_lagging(st.wal_position):
                    logger.info('Member %s exceeds maximum replication lag', st.member.name)
                elif self.check_timeline() and (not st.timeline or st.timeline < cluster_timeline):
                    logger.info('Timeline %s of member %s is behind the cluster timeline %s',
                                st.timeline, st.member.name, cluster_timeline)
                else:
                    ret = True
        else:
@@ -73,7 +73,7 @@ class MockHa(object):

    @staticmethod
    def fetch_nodes_statuses(members):
        return [_MemberStatus(None, True, None, None, {}, False)]
        return [_MemberStatus(None, True, None, 0, None, {}, False)]

    @staticmethod
    def schedule_future_restart(data):
@@ -28,8 +28,10 @@ def false(*args, **kwargs):


def get_cluster(initialize, leader, members, failover, sync, cluster_config=None):
    history = TimelineHistory(1, [(1, 67197376, 'no recovery target specified', datetime.datetime.now().isoformat())])
    cluster_config = cluster_config or ClusterConfig(1, {1: 2}, 1)
    t = datetime.datetime.now().isoformat()
    history = TimelineHistory(1, '[[1,67197376,"no recovery target specified","' + t + '"]]',
                              [(1, 67197376, 'no recovery target specified', t)])
    cluster_config = cluster_config or ClusterConfig(1, {'check_timeline': True}, 1)
    return Cluster(initialize, cluster_config, leader, 10, members, failover, sync, history)
@@ -72,12 +74,13 @@ def get_standby_cluster_initialized_with_only_leader(failover=None, sync=None):
    )


def get_node_status(reachable=True, in_recovery=True, wal_position=10, nofailover=False, watchdog_failed=False):
def get_node_status(reachable=True, in_recovery=True, timeline=2,
                    wal_position=10, nofailover=False, watchdog_failed=False):
    def fetch_node_status(e):
        tags = {}
        if nofailover:
            tags['nofailover'] = True
        return _MemberStatus(e, reachable, in_recovery, wal_position, tags, watchdog_failed)
        return _MemberStatus(e, reachable, in_recovery, timeline, wal_position, tags, watchdog_failed)
    return fetch_node_status
@@ -115,6 +118,7 @@ zookeeper:
        sys.argv = sys.argv[:1]

        self.config = Config()
        self.config.set_dynamic_configuration({'maximum_lag_on_failover': 5})
        self.postgresql = p
        self.dcs = d
        self.api = Mock()
@@ -147,6 +151,7 @@ def run_async(self, func, args=()):
@patch.object(Postgresql, 'query', Mock())
@patch.object(Postgresql, 'checkpoint', Mock())
@patch.object(Postgresql, 'cancellable_subprocess_call', Mock(return_value=0))
@patch.object(Postgresql, '_get_local_timeline_lsn_from_replication_connection', Mock(return_value=[2, 10]))
@patch.object(etcd.Client, 'write', etcd_write)
@patch.object(etcd.Client, 'read', etcd_read)
@patch.object(etcd.Client, 'delete', Mock(side_effect=etcd.EtcdException))
@@ -166,7 +171,6 @@ class TestHa(unittest.TestCase):
        mock_machines.__get__ = Mock(return_value=['http://remotehost:2379'])
        self.p = Postgresql({'name': 'postgresql0', 'scope': 'dummy', 'listen': '127.0.0.1:5432',
                             'data_dir': 'data/postgresql0', 'retry_timeout': 10,
                             'maximum_lag_on_failover': 5,
                             'authentication': {'superuser': {'username': 'foo', 'password': 'bar'},
                                                'replication': {'username': '', 'password': ''}},
                             'parameters': {'wal_level': 'hot_standby', 'max_replication_slots': 5, 'foo': 'bar',
@@ -461,6 +465,8 @@ class TestHa(unittest.TestCase):
        self.assertEqual(self.ha.run_cycle(), 'no action. i am the leader with the lock')
        self.ha.fetch_node_status = get_node_status(watchdog_failed=True)
        self.assertEqual(self.ha.run_cycle(), 'no action. i am the leader with the lock')
        self.ha.fetch_node_status = get_node_status(timeline=1)
        self.assertEqual(self.ha.run_cycle(), 'no action. i am the leader with the lock')
        self.ha.fetch_node_status = get_node_status(wal_position=1)
        self.assertEqual(self.ha.run_cycle(), 'no action. i am the leader with the lock')
        # manual failover from the previous leader to us won't happen if we hold the nofailover flag
@@ -572,6 +578,8 @@ class TestHa(unittest.TestCase):
        self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members))
        with patch('patroni.postgresql.Postgresql.timeline_wal_position', return_value=(1, 1)):
            self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members))
        with patch('patroni.postgresql.Postgresql.replica_cached_timeline', return_value=1):
            self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members))
        self.ha.patroni.nofailover = True
        self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members))
        self.ha.patroni.nofailover = False