Smart pg_rewind (#417)

Previously we ran pg_rewind only in a limited number of cases:
 * when we knew postgres had been running as a master (no recovery.conf in the data dir)
 * when we were doing a manual switchover to a specific node (no
   guarantee that this node is the most up-to-date)
 * when the given node had the nofailover tag (it could be ahead of the new master)

This approach mostly worked, but sometimes we executed pg_rewind when it
was not necessary, and in other cases we did not execute it even though
it was needed.

The main idea of this PR is to first determine whether pg_rewind is
really needed, by analyzing the timeline ID, LSN, and history file on the
master and the replica, and to run it only when it is.
Alexander Kukushkin authored on 2017-05-19 16:32:06 +02:00, committed by GitHub
parent fed7a24889
commit 37c1552c0a
7 changed files with 339 additions and 279 deletions
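The check described in the commit message can be illustrated with a short, hypothetical sketch. This is not Patroni's actual implementation (in the diff below the real logic lives in Postgresql._check_timeline_and_lsn and is driven from Ha._handle_rewind); the function names and the sample history content are illustrative only. The idea: when the replica is on an older timeline than the master, look up the replica's timeline in the master's timeline history file and flag a rewind only if the replica's LSN has reached or passed the point where the master branched off that timeline.

    # A minimal, hypothetical sketch of the divergence check (not Patroni's actual API).
    # A PostgreSQL timeline history file contains lines of the form:
    #   "<timeline>\t<branch LSN>\t<reason>"

    def parse_lsn(lsn):
        # Convert an LSN such as '0/402EEC0' into an absolute byte position.
        hi, lo = lsn.split('/')
        return int(hi, 16) * 0x100000000 + int(lo, 16)

    def rewind_needed(local_timeline, local_lsn, master_timeline, history):
        # True if the local node has diverged from the master and needs pg_rewind.
        if local_timeline > master_timeline:
            return True          # local node is on a newer timeline than the master
        if local_timeline == master_timeline:
            return False         # same timeline, nothing has diverged
        for line in history.splitlines():
            fields = line.strip().split('\t')
            if len(fields) != 3:
                continue
            try:
                timeline = int(fields[0])
            except ValueError:
                continue
            if timeline == local_timeline:
                # Diverged if the local node advanced to (or past) the point
                # where the master's history branched off this timeline.
                return parse_lsn(local_lsn) >= parse_lsn(fields[1])
            if timeline > local_timeline:
                break
        return False

    # Replica is on timeline 2 at 0/403DD98; the master branched to timeline 3 at 0/402DD98.
    history = ('1\t0/40159C0\tno recovery target specified\n'
               '2\t0/402DD98\tno recovery target specified\n')
    print(rewind_needed(2, '0/403DD98', 3, history))  # True: pg_rewind is required

In the actual change, the outcome of this check is tracked in a small REWIND_STATUS state machine (INITIAL, CHECK, NEED, NOT_NEED, SUCCESS, FAILED), so Ha schedules pg_rewind only when the check concludes it is really needed.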


@@ -110,5 +110,7 @@ script:
set +e
after_success:
# before_cache is executed earlier than after_success, so we need to restore one of virtualenv directories
- fpv=$(basename $(readlink $HOME/virtualenv/python3.5)) && mv $HOME/mycache/${fpv} $HOME/virtualenv/${fpv}
- coveralls
- if [[ $TEST_SUITE != "behave" ]]; then python-codacy-coverage -r coverage.xml; fi


@@ -34,7 +34,8 @@ Feature: basic replication
And postgres1 role is the primary after 10 seconds
Scenario: check rejoin of the former master with pg_rewind
Given I start postgres0
Given I add the table splitbrain to postgres0
And I start postgres0
Then postgres0 role is the secondary after 20 seconds
When I add the table buz to postgres1
Then table buz is present on postgres0 after 20 seconds


@@ -118,7 +118,10 @@ class PatroniController(AbstractController):
super(PatroniController, self).stop(kill, timeout)
def _is_accessible(self):
return self.query("SELECT 1", fail_ok=True) is not None
cursor = self.query("SELECT 1", fail_ok=True)
if cursor is not None:
cursor.execute("SET synchronous_commit TO 'local'")
return True
def _make_patroni_test_config(self, name, tags):
patroni_config_name = self.PATRONI_CONFIG.format(name)


@@ -131,7 +131,7 @@ class Ha(object):
logger.info('bootstrapped %s', msg)
cluster = self.dcs.get_cluster()
node_to_follow = self._get_node_to_follow(cluster)
return self.state_handler.follow(node_to_follow, cluster.leader, True)
return self.state_handler.follow(node_to_follow)
else:
logger.error('failed to bootstrap %s', msg)
self.state_handler.remove_data_directory()
@@ -171,6 +171,12 @@ class Ha(object):
return 'trying to ' + msg
return 'waiting for leader to bootstrap'
def _handle_rewind(self):
if self.state_handler.rewind_needed_and_possible(self.cluster.leader):
self._async_executor.schedule('running pg_rewind from ' + self.cluster.leader.name)
self._async_executor.run_async(self.state_handler.rewind, (self.cluster.leader,))
return True
def recover(self):
if self.has_lock() and self.update_lock():
timeout = self.patroni.config['master_start_timeout']
@@ -184,9 +190,23 @@ class Ha(object):
else:
timeout = None
self.load_cluster_from_dcs()
if self.has_lock():
msg = "starting as readonly because i had the session lock"
node_to_follow = None
else:
if not self.state_handler.rewind_executed:
self.state_handler.trigger_check_diverged_lsn()
if self._handle_rewind():
return self._async_executor.scheduled_action
msg = "starting as a secondary"
node_to_follow = self._get_node_to_follow(self.cluster)
self.recovering = True
return self.follow("starting as readonly because i had the session lock",
"starting as a secondary", True, True, None, timeout)
self._async_executor.schedule('restarting after failure')
self._async_executor.run_async(self.state_handler.follow, (node_to_follow, timeout))
return msg
def _get_node_to_follow(self, cluster):
# determine the node to follow. If replicatefrom tag is set,
@@ -198,29 +218,33 @@ class Ha(object):
return node_to_follow if node_to_follow and node_to_follow.name != self.state_handler.name else None
def follow(self, demote_reason, follow_reason, refresh=True, recovery=False, need_rewind=None, timeout=None):
def follow(self, demote_reason, follow_reason, refresh=True):
if refresh:
self.load_cluster_from_dcs()
if recovery:
ret = demote_reason if self.has_lock() else follow_reason
else:
is_leader = self.state_handler.is_leader()
ret = demote_reason if is_leader else follow_reason
is_leader = self.state_handler.is_leader()
node_to_follow = self._get_node_to_follow(self.cluster)
if self.is_paused() and not (self.state_handler.need_rewind and self.state_handler.can_rewind):
self.state_handler.set_role('master' if is_leader else 'replica')
if is_leader:
return 'continue to run as master without lock'
elif not node_to_follow:
return 'no action'
if self.is_paused():
if not (self.state_handler.need_rewind and self.state_handler.can_rewind) or self.cluster.is_unlocked():
self.state_handler.set_role('master' if is_leader else 'replica')
if is_leader:
return 'continue to run as master without lock'
elif not node_to_follow:
return 'no action'
elif is_leader:
self.demote('immediate')
return demote_reason
self.state_handler.follow(node_to_follow, self.cluster.leader, recovery,
self._async_executor, need_rewind, timeout)
if self._handle_rewind():
return self._async_executor.scheduled_action
return ret
if not self.state_handler.check_recovery_conf(node_to_follow):
self._async_executor.schedule('changing primary_conninfo and restarting')
self._async_executor.run_async(self.state_handler.follow, (node_to_follow,))
return follow_reason
def is_synchronous_mode(self):
return bool(self.cluster and self.cluster.config and self.cluster.config.data.get('synchronous_mode'))
@@ -497,6 +521,7 @@ class Ha(object):
without regard for data durability. May only be called synchronously.
"""
assert mode in ['offline', 'graceful', 'immediate']
self.state_handler.trigger_check_diverged_lsn()
if mode != 'offline':
if mode == 'immediate':
self.state_handler.stop('immediate', checkpoint=False)
@@ -510,16 +535,18 @@ class Ha(object):
if mode == 'immediate':
# We will try to start up as a standby now. If no one takes the leader lock before we finish
# recovery we will try to promote ourselves.
self._async_executor.schedule('waiting for failover to complete')
self._async_executor.run_async(self.state_handler.follow,
(node_to_follow, cluster.leader, True, None, True))
self._async_executor.schedule('starting after demotion')
self._async_executor.run_async(self.state_handler.follow, (node_to_follow,))
else:
return self.state_handler.follow(node_to_follow, cluster.leader, recovery=True, need_rewind=True)
logger.info('cluster.leader = %s', cluster.leader)
if self.state_handler.rewind_needed_and_possible(cluster.leader):
return False # do not start postgres, but run pg_rewind on the next iteration
return self.state_handler.follow(node_to_follow)
else:
# Need to become unavailable as soon as possible, so initiate a stop here. However as we can't release
# the leader key we don't care about confirming the shutdown quickly and can use a regular stop.
self.state_handler.stop(checkpoint=False)
self.state_handler.follow(None, None, recovery=True)
self.state_handler.follow(None)
def should_run_scheduled_action(self, action_name, scheduled_at, cleanup_fn):
if scheduled_at and not self.is_paused():
@@ -614,17 +641,16 @@ class Ha(object):
else:
# when we are doing manual failover there is no guaranty that new leader is ahead of any other node
# node tagged as nofailover can be ahead of the new leader either, but it is always excluded from elections
need_rewind = bool(self.cluster.failover) or self.patroni.nofailover
if need_rewind:
check_diverged_lsn = bool(self.cluster.failover) or self.patroni.nofailover
if check_diverged_lsn:
self.state_handler.trigger_check_diverged_lsn()
time.sleep(2) # Give a time to somebody to take the leader lock
if self.patroni.nofailover:
return self.follow('demoting self because I am not allowed to become master',
'following a different leader because I am not allowed to promote',
need_rewind=need_rewind)
'following a different leader because I am not allowed to promote')
return self.follow('demoting self because i am not the healthiest node',
'following a different leader because i am not the healthiest node',
need_rewind=need_rewind)
'following a different leader because i am not the healthiest node')
def process_healthy_cluster(self):
if self.has_lock():
@@ -936,6 +962,8 @@ class Ha(object):
# asynchronous processes are running (should be always the case for the master)
if not self._async_executor.busy and not self.state_handler.is_starting():
if not self.state_handler.cb_called:
if not self.state_handler.is_leader():
self.state_handler.trigger_check_diverged_lsn()
self.state_handler.call_nowait(ACTION_ON_START)
self.state_handler.sync_replication_slots(self.cluster)
except DCSError:


@@ -9,6 +9,7 @@ import tempfile
import time
from collections import defaultdict
from contextlib import contextmanager
from patroni import call_self
from patroni.callback_executor import CallbackExecutor
from patroni.exceptions import PostgresConnectionException, PostgresException
@@ -29,6 +30,8 @@ STATE_REJECT = 'rejecting connections'
STATE_NO_RESPONSE = 'not responding'
STATE_UNKNOWN = 'unknown'
REWIND_STATUS = type('Enum', (), {'INITIAL': 0, 'CHECK': 1, 'NEED': 2, 'NOT_NEED': 3, 'SUCCESS': 4, 'FAILED': 5})
def slot_name_from_member_name(member_name):
"""Translate member name to valid PostgreSQL slot name.
@@ -99,7 +102,7 @@ class Postgresql(object):
self._replication = config['authentication']['replication']
self.resolve_connection_addresses()
self._need_rewind = False
self._rewind_state = REWIND_STATUS.INITIAL
self._use_slots = config.get('use_slots', True)
self._schedule_load_slots = self.use_slots
@@ -296,6 +299,11 @@ class Postgresql(object):
def pending_restart(self):
return self._pending_restart
@staticmethod
def configuration_allows_rewind(data):
return data.get('Current wal_log_hints setting', 'off') == 'on' \
or data.get('Data page checksum version', '0') != '0'
@property
def can_rewind(self):
""" check if pg_rewind executable is there and that pg_controldata indicates
@@ -312,9 +320,7 @@ class Postgresql(object):
return False
except OSError:
return False
# check if the cluster's configuration permits pg_rewind
data = self.controldata()
return data.get('wal_log_hints setting', 'off') == 'on' or data.get('Data page checksum version', '0') != '0'
return self.configuration_allows_rewind(self.controldata())
@property
def sysid(self):
@@ -751,15 +757,13 @@ class Postgresql(object):
for p in ['connect_timeout', 'options']:
connect_kwargs.pop(p, None)
try:
with psycopg2.connect(**connect_kwargs) as conn:
conn.autocommit = True
with conn.cursor() as cur:
cur.execute("SET statement_timeout = 0")
if check_not_is_in_recovery:
cur.execute('SELECT pg_is_in_recovery()')
if cur.fetchone()[0]:
return 'is_in_recovery=true'
return cur.execute('CHECKPOINT')
with self._get_connection_cursor(**connect_kwargs) as cur:
cur.execute("SET statement_timeout = 0")
if check_not_is_in_recovery:
cur.execute('SELECT pg_is_in_recovery()')
if cur.fetchone()[0]:
return 'is_in_recovery=true'
return cur.execute('CHECKPOINT')
except psycopg2.Error:
logging.exception('Exception during CHECKPOINT')
return 'not accessible or not healty'
@@ -882,14 +886,17 @@ class Postgresql(object):
f.write('\n{}\n'.format('\n'.join(config)))
def primary_conninfo(self, member):
if not (member and member.conn_url):
if not (member and member.conn_url) or member.name == self.name:
return None
r = member.conn_kwargs(self._replication)
r.update({'application_name': self.name, 'sslmode': 'prefer', 'sslcompression': '1'})
keywords = 'user password host port sslmode sslcompression application_name'.split()
return ' '.join('{0}={{{0}}}'.format(kw) for kw in keywords).format(**r)
def check_recovery_conf(self, primary_conninfo):
def check_recovery_conf(self, member):
# TODO: recovery.conf could be stale, would be nice to detect that.
primary_conninfo = self.primary_conninfo(member)
if not os.path.isfile(self._recovery_conf):
return False
@@ -910,7 +917,7 @@ class Postgresql(object):
if name not in ('standby_mode', 'recovery_target_timeline', 'primary_conninfo', 'primary_slot_name'):
f.write("{0} = '{1}'\n".format(name, value))
def rewind(self, r):
def pg_rewind(self, r):
# prepare pg_rewind connection
env = self.write_pgpass(r)
dsn_attrs = [
@@ -937,139 +944,167 @@ class Postgresql(object):
# Don't try to call pg_controldata during backup restore
if self._version_file_exists() and self.state != 'creating replica':
try:
data = subprocess.check_output([self._pgcommand('pg_controldata'), self._data_dir])
data = subprocess.check_output([self._pgcommand('pg_controldata'), self._data_dir],
env={'LANG': 'C', 'LC_ALL': 'C', 'PATH': os.environ['PATH']})
if data:
data = data.decode('utf-8').splitlines()
result = {l.split(':')[0].replace('Current ', '', 1): l.split(':')[1].strip() for l in data if l}
result = {l.split(':', 1)[0]: l.split(':', 1)[1].strip() for l in data if l}
except subprocess.CalledProcessError:
logger.exception("Error when calling pg_controldata")
return result
def read_postmaster_opts(self):
""" returns the list of option names/values from postgres.opts, Empty dict if read failed or no file """
result = {}
try:
with open(os.path.join(self._data_dir, "postmaster.opts")) as f:
data = f.read()
opts = [opt.strip('"\n') for opt in data.split(' "')]
for opt in opts:
if '=' in opt and opt.startswith('--'):
name, val = opt.split('=', 1)
name = name.strip('-')
result[name] = val
except IOError:
logger.exception('Error when reading postmaster.opts')
return result
def single_user_mode(self, command=None, options=None):
""" run a given command in a single-user mode. If the command is empty - then just start and stop """
cmd = [self._pgcommand('postgres'), '--single', '-D', self._data_dir]
for opt, val in sorted((options or {}).items()):
cmd.extend(['-c', '{0}={1}'.format(opt, val)])
# need a database name to connect
cmd.append(self._database)
p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=open(os.devnull, 'w'), stderr=subprocess.STDOUT)
if p:
if command:
p.communicate('{0}\n'.format(command))
p.stdin.close()
return p.wait()
return 1
def cleanup_archive_status(self):
status_dir = os.path.join(self._data_dir, 'pg_xlog', 'archive_status')
try:
for f in os.listdir(status_dir):
path = os.path.join(status_dir, f)
try:
if os.path.islink(path):
os.unlink(path)
elif os.path.isfile(path):
os.remove(path)
except OSError:
logger.exception("Unable to remove %s", path)
except OSError:
logger.exception("Unable to list %s", status_dir)
@property
def need_rewind(self):
return self._need_rewind
return self._rewind_state in (REWIND_STATUS.CHECK, REWIND_STATUS.NEED)
def follow(self, member, leader, recovery=False, async_executor=None, need_rewind=None, timeout=None):
if need_rewind is not None:
self._need_rewind = need_rewind
@staticmethod
@contextmanager
def _get_connection_cursor(**kwargs):
with psycopg2.connect(**kwargs) as conn:
conn.autocommit = True
with conn.cursor() as cur:
yield cur
primary_conninfo = self.primary_conninfo(member)
@contextmanager
def _get_replication_connection_cursor(self, host='localhost', port=5432, **kwargs):
with self._get_connection_cursor(host=host, port=int(port), database=self._database, replication=1,
user=self._replication['username'], password=self._replication['password'],
connect_timeout=3, options='-c statement_timeout=2000') as cur:
yield cur
if self.check_recovery_conf(primary_conninfo) and not recovery:
return True
def check_leader_is_not_in_recovery(self, **kwargs):
try:
with self._get_connection_cursor(connect_timeout=3, options='-c statement_timeout=2000', **kwargs) as cur:
cur.execute('SELECT pg_is_in_recovery()')
if not cur.fetchone()[0]:
return True
logger.info('Leader is still in_recovery and therefore can\'t be used for rewind')
except Exception:
return logger.exception('Exception when working with leader')
if async_executor:
async_executor.schedule('changing primary_conninfo and restarting')
async_executor.run_async(self._do_follow, (primary_conninfo, leader, recovery, timeout))
def _get_local_timeline_lsn(self):
timeline = lsn = None
if self.is_running(): # if postgres is running - get timeline and lsn from replication connection
try:
with self._get_replication_connection_cursor(**self._local_address) as cur:
cur.execute('IDENTIFY_SYSTEM')
timeline, lsn = cur.fetchone()[1:3]
except Exception:
logger.exception('Can not fetch local timeline and lsn from replication connection')
else: # otherwise analyze pg_controldata output
data = self.controldata()
try:
if data.get('Database cluster state') == 'shut down':
lsn = data.get('Latest checkpoint location')
timeline = int(data.get("Latest checkpoint's TimeLineID"))
elif data.get('Database cluster state') == 'shut down in recovery':
lsn = data.get('Minimum recovery ending location')
timeline = int(data.get("Min recovery ending loc's timeline"))
except (TypeError, ValueError):
logger.exception('Failed to get local timeline and lsn from pg_controldata output')
logger.info('Local timeline=%s lsn=%s', timeline, lsn)
return timeline, lsn
def _check_timeline_and_lsn(self, leader):
local_timeline, local_lsn = self._get_local_timeline_lsn()
if local_timeline is None or local_lsn is None:
return
if not self.check_leader_is_not_in_recovery(**leader.conn_kwargs(self._superuser)):
return
history = need_rewind = None
try:
with self._get_replication_connection_cursor(**leader.conn_kwargs()) as cur:
cur.execute('IDENTIFY_SYSTEM')
master_timeline = cur.fetchone()[1]
logger.info('master_timeline=%s', master_timeline)
if local_timeline > master_timeline: # Not always supported by pg_rewind
need_rewind = True
elif master_timeline > 1:
cur.execute('TIMELINE_HISTORY %s', (master_timeline,))
history = bytes(cur.fetchone()[1]).decode('utf-8')
logger.info('master: history=%s', history)
else: # local_timeline == master_timeline == 1
need_rewind = False
except Exception:
return logger.exception('Exception when working with master via replication connection')
if history is not None:
def parse_lsn(lsn):
t = lsn.split('/')
return int(t[0], 16) * 0x100000000 + int(t[1], 16)
for line in history.split('\n'):
line = line.strip().split('\t')
if len(line) == 3:
try:
timeline = int(line[0])
if timeline == local_timeline:
try:
need_rewind = parse_lsn(local_lsn) >= parse_lsn(line[1])
except ValueError:
logger.exception('Exception when parsing lsn')
break
elif timeline > local_timeline:
break
except ValueError:
continue
self._rewind_state = need_rewind and REWIND_STATUS.NEED or REWIND_STATUS.NOT_NEED
def rewind(self, leader):
if self.is_running() and not self.stop(checkpoint=False):
return logger.warning('Can not run pg_rewind because postgres is still running')
# prepare pg_rewind connection
r = leader.conn_kwargs(self._superuser)
# first make sure that we are really trying to rewind
# from the master and run a checkpoint on it in order to
# make it store the new timeline (5540277D.8020309@iki.fi)
leader_status = self.checkpoint(r)
if leader_status:
return logger.warning('Can not use %s for rewind: %s', leader.name, leader_status)
if self.pg_rewind(r):
self._rewind_state = REWIND_STATUS.SUCCESS
elif not self.check_leader_is_not_in_recovery(**r):
logger.warning('Failed to rewind because master %s become unreachable', leader.name)
else:
return self._do_follow(primary_conninfo, leader, recovery, timeout)
logger.error('Failed to rewind from healty master: %s', leader.name)
def _do_follow(self, primary_conninfo, leader, recovery=False, timeout=None):
if self.config.get('remove_data_directory_on_rewind_failure', False):
logger.warning('remove_data_directory_on_rewind_failure is set. removing...')
self.remove_data_directory()
self._rewind_state = REWIND_STATUS.INITIAL
else:
self._rewind_state = REWIND_STATUS.FAILED
return False
def trigger_check_diverged_lsn(self):
if self.can_rewind and self._rewind_state != REWIND_STATUS.NEED:
self._rewind_state = REWIND_STATUS.CHECK
def rewind_needed_and_possible(self, leader):
if leader and leader.name != self.name and leader.conn_url and self._rewind_state == REWIND_STATUS.CHECK:
self._check_timeline_and_lsn(leader)
return leader and leader.conn_url and self._rewind_state == REWIND_STATUS.NEED
@property
def rewind_executed(self):
return self._rewind_state > REWIND_STATUS.NOT_NEED
def follow(self, member, timeout=None):
primary_conninfo = self.primary_conninfo(member)
change_role = self.role in ('master', 'demoted')
if leader and leader.name == self.name:
primary_conninfo = None
self._need_rewind = False
if self.is_running():
return
elif change_role:
self._need_rewind = True
if self._need_rewind and not self.can_rewind:
logger.warning("Data directory may be out of sync master, rewind may be needed.")
if self._need_rewind and leader and leader.conn_url and self.can_rewind:
logger.info("rewind flag is set")
if self.is_running() and not self.stop(checkpoint=False):
return logger.warning('Can not run pg_rewind because postgres is still running')
# prepare pg_rewind connection
r = leader.conn_kwargs(self._superuser)
# first make sure that we are really trying to rewind
# from the master and run a checkpoint on it in order to
# make it store the new timeline (5540277D.8020309@iki.fi)
leader_status = self.checkpoint(r)
if leader_status:
return logger.warning('Can not use %s for rewind: %s', leader.name, leader_status)
# at present, pg_rewind only runs when the cluster is shut down cleanly
# and not shutdown in recovery. We have to remove the recovery.conf if present
# and start/shutdown in a single user mode to emulate this.
# XXX: if recovery.conf is linked, it will be written anew as a normal file.
if os.path.isfile(self._recovery_conf) or os.path.islink(self._recovery_conf):
os.unlink(self._recovery_conf)
# Archived segments might be useful to pg_rewind,
# clean the flags that tell we should remove them.
self.cleanup_archive_status()
# Start in a single user mode and stop to produce a clean shutdown
opts = self.read_postmaster_opts()
opts.update({'archive_mode': 'on', 'archive_command': 'false'})
self.single_user_mode(options=opts)
if self.rewind(r) or not self.config.get('remove_data_directory_on_rewind_failure', False):
self.write_recovery_conf(primary_conninfo)
self.start()
else:
logger.error('unable to rewind the former master')
self.remove_data_directory()
self._need_rewind = False
self.write_recovery_conf(primary_conninfo)
if self.is_running():
self.restart()
else:
self.write_recovery_conf(primary_conninfo)
if recovery:
self.start(timeout=timeout)
else:
self.restart()
self.set_role('replica')
self.start(timeout=timeout)
self.set_role('replica')
if change_role:
# TODO: postpone this until start completes, or maybe do even earlier
@@ -1104,8 +1139,8 @@ class Postgresql(object):
ret = self.pg_ctl('promote')
if ret:
self.set_role('master')
logger.info("cleared rewind flag after becoming the leader")
self._need_rewind = False
logger.info("cleared rewind state after becoming the leader")
self._rewind_state = REWIND_STATUS.INITIAL
self.call_nowait(ACTION_ON_ROLE_CHANGE)
return ret


@@ -126,6 +126,7 @@ def run_async(self, func, args=()):
@patch('patroni.async_executor.AsyncExecutor.busy', PropertyMock(return_value=False))
@patch('patroni.async_executor.AsyncExecutor.run_async', run_async)
@patch('subprocess.call', Mock(return_value=0))
@patch('time.sleep', Mock())
class TestHa(unittest.TestCase):
@patch('socket.getaddrinfo', socket_getaddrinfo)
@@ -167,7 +168,6 @@ class TestHa(unittest.TestCase):
def test_recover_replica_failed(self):
self.p.controldata = lambda: {'Database cluster state': 'in production'}
self.p.is_healthy = false
self.p.is_running = false
self.p.follow = false
self.assertEquals(self.ha.run_cycle(), 'starting as a secondary')
@@ -175,7 +175,6 @@ class TestHa(unittest.TestCase):
def test_recover_master_failed(self):
self.p.follow = false
self.p.is_healthy = false
self.p.is_running = false
self.p.name = 'leader'
self.p.set_role('master')
@@ -183,6 +182,12 @@ class TestHa(unittest.TestCase):
self.ha.cluster = get_cluster_initialized_with_leader()
self.assertEquals(self.ha.run_cycle(), 'starting as readonly because i had the session lock')
@patch.object(Postgresql, 'rewind_needed_and_possible', Mock(return_value=True))
def test_recover_with_rewind(self):
self.p.is_running = false
self.ha.cluster = get_cluster_initialized_with_leader()
self.assertEquals(self.ha.run_cycle(), 'running pg_rewind from leader')
@patch('sys.exit', return_value=1)
@patch('patroni.ha.Ha.sysid_valid', MagicMock(return_value=True))
def test_sysid_no_match(self, exit_mock):
@@ -260,6 +265,13 @@ class TestHa(unittest.TestCase):
self.p.is_leader = false
self.assertEquals(self.ha.run_cycle(), 'PAUSE: no action')
@patch.object(Postgresql, 'rewind_needed_and_possible', Mock(return_value=True))
def test_follow_triggers_rewind(self):
self.p.is_leader = false
self.p.trigger_check_diverged_lsn()
self.ha.cluster = get_cluster_initialized_with_leader()
self.assertEquals(self.ha.run_cycle(), 'running pg_rewind from leader')
def test_no_etcd_connection_master_demote(self):
self.ha.load_cluster_from_dcs = Mock(side_effect=DCSError('Etcd is not responding properly'))
self.assertEquals(self.ha.run_cycle(), 'demoted self because DCS is not accessible and i was a leader')
@@ -331,7 +343,6 @@ class TestHa(unittest.TestCase):
self.assertEquals(self.ha.run_cycle(), 'failed to update leader lock during restart')
@patch('requests.get', requests_get)
@patch('time.sleep', Mock())
def test_manual_failover_from_leader(self):
self.ha.fetch_node_status = get_node_status()
self.ha.has_lock = true
@@ -344,6 +355,8 @@ class TestHa(unittest.TestCase):
f = Failover(0, self.p.name, '', None)
self.ha.cluster = get_cluster_initialized_with_leader(f)
self.assertEquals(self.ha.run_cycle(), 'manual failover: demoting myself')
self.p.rewind_needed_and_possible = true
self.assertEquals(self.ha.run_cycle(), 'manual failover: demoting myself')
self.ha.fetch_node_status = get_node_status(nofailover=True)
self.assertEquals(self.ha.run_cycle(), 'no action. i am the leader with the lock')
self.ha.fetch_node_status = get_node_status(xlog_location=1)
@@ -384,7 +397,6 @@ class TestHa(unittest.TestCase):
self.assertEquals('PAUSE: no action. i am the leader with the lock', self.ha.run_cycle())
@patch('requests.get', requests_get)
@patch('time.sleep', Mock())
def test_manual_failover_process_no_leader(self):
self.p.is_leader = false
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', self.p.name, None))
@@ -409,7 +421,6 @@ class TestHa(unittest.TestCase):
self.ha.patroni.nofailover = True
self.assertEquals(self.ha.run_cycle(), 'following a different leader because I am not allowed to promote')
@patch('time.sleep', Mock())
def test_manual_failover_process_no_leader_in_pause(self):
self.ha.is_paused = true
self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'other', None))
@@ -569,7 +580,6 @@ class TestHa(unittest.TestCase):
self.assertEquals(self.ha.run_cycle(), 'no action. i am a secondary and i am following a leader')
check_calls([(update_lock, False), (demote, False)])
@patch('time.sleep', Mock())
def test_manual_failover_while_starting(self):
self.ha.has_lock = true
self.p.check_for_startup = true
@@ -589,15 +599,13 @@ class TestHa(unittest.TestCase):
self.assertEquals(self.ha.run_cycle(), 'stopped PostgreSQL to fail over after a crash')
demote.assert_called_once()
@patch('time.sleep', Mock())
@patch('patroni.postgresql.Postgresql.follow')
def test_demote_immediate(self, follow):
self.ha.has_lock = true
self.e.get_cluster = Mock(return_value=get_cluster_initialized_without_leader())
self.ha.demote('immediate')
follow.assert_called_once_with(None, None, True, None, True)
follow.assert_called_once_with(None)
@patch('time.sleep', Mock())
def test_process_sync_replication(self):
self.ha.has_lock = true
mock_set_sync = self.p.set_synchronous_standby = Mock()
@@ -724,8 +732,7 @@ class TestHa(unittest.TestCase):
mock_promote.assert_called_once()
mock_write_sync.assert_called_once_with('other', None, index=0)
@patch('time.sleep')
def test_disable_sync_when_restarting(self, mock_sleep):
def test_disable_sync_when_restarting(self):
self.ha.is_synchronous_mode = true
self.p.name = 'other'
@@ -738,10 +745,10 @@ class TestHa(unittest.TestCase):
get_cluster_initialized_with_leader(sync=('leader', syncstandby))
for syncstandby in ['other', None]])
self.ha.restart({})
mock_restart.assert_called_once()
mock_sleep.assert_called()
with patch('time.sleep') as mock_sleep:
self.ha.restart({})
mock_restart.assert_called_once()
mock_sleep.assert_called()
# Restart is still called when DCS connection fails
mock_restart.reset_mock()


@@ -43,6 +43,13 @@ class MockCursor(object):
('port', '5433', None, 'integer', 'postmaster'),
('listen_addresses', '*', None, 'string', 'postmaster'),
('autovacuum', 'on', None, 'bool', 'sighup')]
elif sql.startswith('IDENTIFY_SYSTEM'):
self.results = [('1', 2, '0/402EEC0', '')]
elif sql.startswith('TIMELINE_HISTORY '):
self.results = [('', b'x\t0/40159C0\tno recovery target specified\n\n' +
b'1\t0/40159C0\tno recovery target specified\n\n' +
b'2\t0/402DD98\tno recovery target specified\n\n' +
b'3\t0/403DD98\tno recovery target specified\n')]
else:
self.results = [(None, None, None, None, None, None, None, None, None, None)]
@@ -138,21 +145,10 @@ Data page checksum version: 0
"""
def postmaster_opts_string(*args, **kwargs):
return '/usr/local/pgsql/bin/postgres "-D" "data/postgresql0" "--listen_addresses=127.0.0.1" \
"--port=5432" "--hot_standby=on" "--wal_keep_segments=8" "--wal_level=hot_standby" \
"--archive_command=mkdir -p ../wal_archive && cp %p ../wal_archive/%f" "--wal_log_hints=on" \
"--max_wal_senders=5" "--archive_timeout=1800s" "--archive_mode=on" "--max_replication_slots=5"\n'
def psycopg2_connect(*args, **kwargs):
return MockConnect()
def fake_listdir(path):
return ["a", "b", "c"] if path.endswith('pg_xlog/archive_status') else []
@patch('subprocess.call', Mock(return_value=0))
@patch('psycopg2.connect', psycopg2_connect)
class TestPostgresql(unittest.TestCase):
@@ -293,39 +289,84 @@ class TestPostgresql(unittest.TestCase):
@patch('patroni.postgresql.Postgresql.write_pgpass', MagicMock(return_value=dict()))
def test_pg_rewind(self, mock_call):
r = {'user': '', 'host': '', 'port': '', 'database': '', 'password': ''}
self.assertTrue(self.p.rewind(r))
self.assertTrue(self.p.pg_rewind(r))
subprocess.call = mock_call
self.assertFalse(self.p.rewind(r))
self.assertFalse(self.p.pg_rewind(r))
@patch('os.unlink', Mock(return_value=True))
@patch('subprocess.check_output', Mock(return_value=0, side_effect=pg_controldata_string))
@patch.object(Postgresql, 'remove_data_directory', Mock(return_value=True))
@patch.object(Postgresql, 'single_user_mode', Mock(return_value=1))
@patch.object(Postgresql, 'write_pgpass', Mock(return_value={}))
@patch.object(Postgresql, 'is_running', Mock(return_value=True))
def test_check_recovery_conf(self):
self.p.write_recovery_conf('foo')
self.assertFalse(self.p.check_recovery_conf(None))
self.p.write_recovery_conf(None)
self.assertTrue(self.p.check_recovery_conf(None))
@patch.object(Postgresql, 'start', Mock())
@patch.object(Postgresql, 'can_rewind', PropertyMock(return_value=True))
@patch.object(Postgresql, 'rewind', return_value=False)
def test_follow(self, mock_pg_rewind):
with patch.object(Postgresql, 'check_recovery_conf', Mock(return_value=True)):
self.assertTrue(self.p.follow(None, None)) # nothing to do, recovery.conf has good primary_conninfo
def test__get_local_timeline_lsn(self):
self.p.trigger_check_diverged_lsn()
with patch.object(Postgresql, 'controldata', Mock(return_value={'Database cluster state': 'shut down'})):
self.p.rewind_needed_and_possible(self.leader)
with patch.object(Postgresql, 'controldata',
Mock(return_value={'Database cluster state': 'shut down in recovery'})):
self.p.rewind_needed_and_possible(self.leader)
with patch.object(Postgresql, 'is_running', Mock(return_value=True)):
with patch.object(MockCursor, 'fetchone', Mock(side_effect=[(False, ), Exception])):
self.p.rewind_needed_and_possible(self.leader)
self.p.follow(self.me, self.me) # follow is called when the node is holding leader lock
@patch.object(Postgresql, 'start', Mock())
@patch.object(Postgresql, 'can_rewind', PropertyMock(return_value=True))
@patch.object(Postgresql, '_get_local_timeline_lsn', Mock(return_value=(2, '0/40159C1')))
@patch.object(Postgresql, 'check_leader_is_not_in_recovery')
def test__check_timeline_and_lsn(self, mock_check_leader_is_not_in_recovery):
mock_check_leader_is_not_in_recovery.return_value = False
self.p.trigger_check_diverged_lsn()
self.assertFalse(self.p.rewind_needed_and_possible(self.leader))
mock_check_leader_is_not_in_recovery.return_value = True
self.assertFalse(self.p.rewind_needed_and_possible(self.leader))
self.p.trigger_check_diverged_lsn()
with patch('psycopg2.connect', Mock(side_effect=Exception)):
self.assertFalse(self.p.rewind_needed_and_possible(self.leader))
with patch.object(MockCursor, 'fetchone',
Mock(side_effect=[('', 2, '0/0'), ('', b'2\tG/40159C0\tno recovery target specified\n\n')])):
self.assertFalse(self.p.rewind_needed_and_possible(self.leader))
self.p.trigger_check_diverged_lsn()
with patch.object(MockCursor, 'fetchone',
Mock(side_effect=[('', 2, '0/0'), ('', b'3\t040159C0\tno recovery target specified\n')])):
self.assertFalse(self.p.rewind_needed_and_possible(self.leader))
self.p.trigger_check_diverged_lsn()
with patch.object(MockCursor, 'fetchone', Mock(return_value=('', 1, '0/0'))):
with patch.object(Postgresql, '_get_local_timeline_lsn', Mock(return_value=(1, '0/0'))):
self.assertFalse(self.p.rewind_needed_and_possible(self.leader))
self.p.trigger_check_diverged_lsn()
self.assertTrue(self.p.rewind_needed_and_possible(self.leader))
with patch.object(Postgresql, 'restart', Mock(return_value=False)):
self.p.set_role('replica')
self.p.follow(None, None) # restart without rewind
@patch.object(MockCursor, 'fetchone', Mock(side_effect=[(True,), Exception]))
def test_check_leader_is_not_in_recovery(self):
self.p.check_leader_is_not_in_recovery()
self.p.check_leader_is_not_in_recovery()
with patch.object(Postgresql, 'stop', Mock(return_value=False)):
self.p.follow(self.leader, self.leader, need_rewind=True) # failed to stop postgres
@patch.object(Postgresql, 'checkpoint', side_effect=['', '1'])
@patch.object(Postgresql, 'stop', Mock(return_value=False))
@patch.object(Postgresql, 'start', Mock())
def test_rewind(self, mock_checkpoint):
self.p.rewind(self.leader)
with patch.object(Postgresql, 'pg_rewind', Mock(return_value=False)):
mock_checkpoint.side_effect = ['1', '', '', '']
self.p.rewind(self.leader)
self.p.rewind(self.leader)
with patch.object(Postgresql, 'check_leader_is_not_in_recovery', Mock(return_value=False)):
self.p.rewind(self.leader)
self.p.config['remove_data_directory_on_rewind_failure'] = False
self.p.trigger_check_diverged_lsn()
self.p.rewind(self.leader)
with patch.object(Postgresql, 'is_running', Mock(return_value=True)):
self.p.rewind(self.leader)
self.p.is_leader = Mock(return_value=False)
self.p.rewind(self.leader)
self.p.follow(self.leader, self.leader) # "leader" is not accessible or is_in_recovery
with patch.object(Postgresql, 'checkpoint', Mock(return_value=None)):
self.p.follow(self.leader, self.leader)
mock_pg_rewind.return_value = True
self.p.follow(self.leader, self.leader, need_rewind=True)
self.p.follow(None, None) # check_recovery_conf...
@patch.object(Postgresql, 'is_running', Mock(return_value=False))
@patch.object(Postgresql, 'start', Mock())
def test_follow(self):
self.p.follow(None)
@patch('subprocess.check_output', Mock(return_value=0, side_effect=pg_controldata_string))
def test_can_rewind(self):
@@ -333,7 +374,7 @@ class TestPostgresql(unittest.TestCase):
self.assertFalse(self.p.can_rewind)
with patch('subprocess.call', side_effect=OSError):
self.assertFalse(self.p.can_rewind)
with patch.object(Postgresql, 'controldata', Mock(return_value={'wal_log_hints setting': 'on'})):
with patch.object(Postgresql, 'controldata', Mock(return_value={'Current wal_log_hints setting': 'on'})):
self.assertTrue(self.p.can_rewind)
self.p.config['use_pg_rewind'] = False
self.assertFalse(self.p.can_rewind)
@@ -407,7 +448,7 @@ class TestPostgresql(unittest.TestCase):
self.assertFalse(self.p.is_healthy())
def test_promote(self):
self.p._role = 'replica'
self.p.set_role('replica')
self.assertTrue(self.p.promote())
self.assertTrue(self.p.promote())
@@ -425,7 +466,9 @@ class TestPostgresql(unittest.TestCase):
self.assertFalse(self.p.is_running())
@patch('shlex.split', Mock(side_effect=OSError))
@patch.object(Postgresql, 'can_rewind', PropertyMock(return_value=True))
def test_call_nowait(self):
self.p.set_role('replica')
self.assertIsNone(self.p.call_nowait('on_start'))
def test_non_existing_callback(self):
@@ -502,71 +545,12 @@ class TestPostgresql(unittest.TestCase):
data = self.p.controldata()
self.assertEquals(len(data), 50)
self.assertEquals(data['Database cluster state'], 'shut down in recovery')
self.assertEquals(data['wal_log_hints setting'], 'on')
self.assertEquals(data['Current wal_log_hints setting'], 'on')
self.assertEquals(int(data['Database block size']), 8192)
with patch('subprocess.check_output', Mock(side_effect=subprocess.CalledProcessError(1, ''))):
self.assertEquals(self.p.controldata(), {})
def test_read_postmaster_opts(self):
m = mock_open(read_data=postmaster_opts_string())
with patch.object(builtins, 'open', m):
data = self.p.read_postmaster_opts()
self.assertEquals(data['wal_level'], 'hot_standby')
self.assertEquals(int(data['max_replication_slots']), 5)
self.assertEqual(data.get('D'), None)
m.side_effect = IOError
data = self.p.read_postmaster_opts()
self.assertEqual(data, dict())
@patch('subprocess.Popen')
@patch.object(builtins, 'open', MagicMock(return_value=42))
def test_single_user_mode(self, subprocess_popen_mock):
subprocess_popen_mock.return_value.wait.return_value = 0
self.assertEquals(self.p.single_user_mode(options=dict(archive_mode='on', archive_command='false')), 0)
subprocess_popen_mock.assert_called_once_with(['postgres', '--single', '-D', self.data_dir,
'-c', 'archive_command=false', '-c', 'archive_mode=on',
'postgres'], stdin=subprocess.PIPE,
stdout=42,
stderr=subprocess.STDOUT)
subprocess_popen_mock.reset_mock()
self.assertEquals(self.p.single_user_mode(command="CHECKPOINT"), 0)
subprocess_popen_mock.assert_called_once_with(['postgres', '--single', '-D', self.data_dir,
'postgres'], stdin=subprocess.PIPE,
stdout=42,
stderr=subprocess.STDOUT)
subprocess_popen_mock.return_value = None
self.assertEquals(self.p.single_user_mode(), 1)
@patch('os.listdir', MagicMock(side_effect=fake_listdir))
@patch('os.unlink', return_value=True)
@patch('os.remove', return_value=True)
@patch('os.path.islink', return_value=False)
@patch('os.path.isfile', return_value=True)
def test_cleanup_archive_status(self, mock_file, mock_link, mock_remove, mock_unlink):
ap = os.path.join(self.data_dir, 'pg_xlog', 'archive_status/')
self.p.cleanup_archive_status()
mock_remove.assert_has_calls([mock.call(ap + 'a'), mock.call(ap + 'b'), mock.call(ap + 'c')])
mock_unlink.assert_not_called()
mock_remove.reset_mock()
mock_file.return_value = False
mock_link.return_value = True
self.p.cleanup_archive_status()
mock_unlink.assert_has_calls([mock.call(ap + 'a'), mock.call(ap + 'b'), mock.call(ap + 'c')])
mock_remove.assert_not_called()
mock_unlink.reset_mock()
mock_remove.reset_mock()
mock_file.side_effect = OSError
mock_link.side_effect = OSError
self.p.cleanup_archive_status()
mock_unlink.assert_not_called()
mock_remove.assert_not_called()
@patch('patroni.postgresql.Postgresql._version_file_exists', Mock(return_value=True))
@patch('subprocess.check_output', MagicMock(return_value=0, side_effect=pg_controldata_string))
def test_sysid(self):