Smart pg_rewind (#417)
Previously we were running pg_rewind only in a limited number of cases:

* when we knew postgres was a master (no recovery.conf in the data directory)
* when we were doing a manual switchover to a specific node (no guarantee that this node is the most up-to-date)
* when a given node had the nofailover tag (it could be ahead of the new master)

This approach mostly worked, but sometimes we executed pg_rewind when it was not necessary, and in other cases we did not execute it although it was needed. The main idea of this PR is to first figure out whether we really need to run pg_rewind, by analyzing the timeline ID, LSN, and history file on the master and on the replica, and to run it only when it is actually needed.
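A minimal sketch of that divergence check, based on the new _check_timeline_and_lsn code in this diff; the standalone helper rewind_needed and its arguments are illustrative only and are not part of the Patroni API (parse_lsn mirrors the nested helper in the diff):

def parse_lsn(lsn):
    # convert an LSN such as '0/40159C0' into a comparable integer (assumption: standard hi/lo hex format)
    hi, lo = lsn.split('/')
    return int(hi, 16) * 0x100000000 + int(lo, 16)


def rewind_needed(local_timeline, local_lsn, master_timeline, master_history):
    # master_history is the text of the master's timeline history file:
    # one '<timeline>\t<switch lsn>\t<reason>' line per timeline switch
    if local_timeline > master_timeline:
        return True   # we are on a newer timeline than the master: clearly diverged
    if local_timeline == master_timeline:
        return False  # same timeline, histories cannot have diverged yet
    for entry in master_history.splitlines():
        fields = entry.strip().split('\t')
        if len(fields) != 3:
            continue
        try:
            timeline, switch_lsn = int(fields[0]), fields[1]
        except ValueError:
            continue
        if timeline == local_timeline:
            # diverged if we wrote past the point where the master switched away from our timeline
            return parse_lsn(local_lsn) >= parse_lsn(switch_lsn)
        if timeline > local_timeline:
            break
    return False

In the actual change this check is triggered lazily: the HA loop calls trigger_check_diverged_lsn() whenever divergence becomes possible (demotion, manual failover, nofailover nodes, starting as a replica), and rewind_needed_and_possible(leader) performs the comparison against the current leader only while the rewind state machine is in the CHECK state.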
Committed via GitHub
Parent: fed7a24889
Commit: 37c1552c0a
@@ -110,5 +110,7 @@ script:
     set +e

 after_success:
+  # before_cache is executed earlier than after_success, so we need to restore one of virtualenv directories
+  - fpv=$(basename $(readlink $HOME/virtualenv/python3.5)) && mv $HOME/mycache/${fpv} $HOME/virtualenv/${fpv}
   - coveralls
   - if [[ $TEST_SUITE != "behave" ]]; then python-codacy-coverage -r coverage.xml; fi
@@ -34,7 +34,8 @@ Feature: basic replication
     And postgres1 role is the primary after 10 seconds

   Scenario: check rejoin of the former master with pg_rewind
-    Given I start postgres0
+    Given I add the table splitbrain to postgres0
+    And I start postgres0
     Then postgres0 role is the secondary after 20 seconds
     When I add the table buz to postgres1
     Then table buz is present on postgres0 after 20 seconds
@@ -118,7 +118,10 @@ class PatroniController(AbstractController):
         super(PatroniController, self).stop(kill, timeout)

     def _is_accessible(self):
-        return self.query("SELECT 1", fail_ok=True) is not None
+        cursor = self.query("SELECT 1", fail_ok=True)
+        if cursor is not None:
+            cursor.execute("SET synchronous_commit TO 'local'")
+            return True

     def _make_patroni_test_config(self, name, tags):
         patroni_config_name = self.PATRONI_CONFIG.format(name)
@@ -131,7 +131,7 @@ class Ha(object):
             logger.info('bootstrapped %s', msg)
             cluster = self.dcs.get_cluster()
             node_to_follow = self._get_node_to_follow(cluster)
-            return self.state_handler.follow(node_to_follow, cluster.leader, True)
+            return self.state_handler.follow(node_to_follow)
         else:
             logger.error('failed to bootstrap %s', msg)
             self.state_handler.remove_data_directory()
@@ -171,6 +171,12 @@ class Ha(object):
             return 'trying to ' + msg
         return 'waiting for leader to bootstrap'

+    def _handle_rewind(self):
+        if self.state_handler.rewind_needed_and_possible(self.cluster.leader):
+            self._async_executor.schedule('running pg_rewind from ' + self.cluster.leader.name)
+            self._async_executor.run_async(self.state_handler.rewind, (self.cluster.leader,))
+            return True
+
     def recover(self):
         if self.has_lock() and self.update_lock():
             timeout = self.patroni.config['master_start_timeout']
@@ -184,9 +190,23 @@ class Ha(object):
         else:
             timeout = None

         self.load_cluster_from_dcs()

+        if self.has_lock():
+            msg = "starting as readonly because i had the session lock"
+            node_to_follow = None
+        else:
+            if not self.state_handler.rewind_executed:
+                self.state_handler.trigger_check_diverged_lsn()
+            if self._handle_rewind():
+                return self._async_executor.scheduled_action
+            msg = "starting as a secondary"
+            node_to_follow = self._get_node_to_follow(self.cluster)
+
         self.recovering = True
-        return self.follow("starting as readonly because i had the session lock",
-                           "starting as a secondary", True, True, None, timeout)
+        self._async_executor.schedule('restarting after failure')
+        self._async_executor.run_async(self.state_handler.follow, (node_to_follow, timeout))
+        return msg

     def _get_node_to_follow(self, cluster):
         # determine the node to follow. If replicatefrom tag is set,
@@ -198,29 +218,33 @@ class Ha(object):

         return node_to_follow if node_to_follow and node_to_follow.name != self.state_handler.name else None

-    def follow(self, demote_reason, follow_reason, refresh=True, recovery=False, need_rewind=None, timeout=None):
+    def follow(self, demote_reason, follow_reason, refresh=True):
         if refresh:
             self.load_cluster_from_dcs()

-        if recovery:
-            ret = demote_reason if self.has_lock() else follow_reason
-        else:
-            is_leader = self.state_handler.is_leader()
-            ret = demote_reason if is_leader else follow_reason
+        is_leader = self.state_handler.is_leader()

         node_to_follow = self._get_node_to_follow(self.cluster)

-        if self.is_paused() and not (self.state_handler.need_rewind and self.state_handler.can_rewind):
-            self.state_handler.set_role('master' if is_leader else 'replica')
-            if is_leader:
-                return 'continue to run as master without lock'
-            elif not node_to_follow:
-                return 'no action'
+        if self.is_paused():
+            if not (self.state_handler.need_rewind and self.state_handler.can_rewind) or self.cluster.is_unlocked():
+                self.state_handler.set_role('master' if is_leader else 'replica')
+                if is_leader:
+                    return 'continue to run as master without lock'
+                elif not node_to_follow:
+                    return 'no action'
+        elif is_leader:
+            self.demote('immediate')
+            return demote_reason

-        self.state_handler.follow(node_to_follow, self.cluster.leader, recovery,
-                                  self._async_executor, need_rewind, timeout)
+        if self._handle_rewind():
+            return self._async_executor.scheduled_action

-        return ret
+        if not self.state_handler.check_recovery_conf(node_to_follow):
+            self._async_executor.schedule('changing primary_conninfo and restarting')
+            self._async_executor.run_async(self.state_handler.follow, (node_to_follow,))

+        return follow_reason

     def is_synchronous_mode(self):
         return bool(self.cluster and self.cluster.config and self.cluster.config.data.get('synchronous_mode'))
@@ -497,6 +521,7 @@ class Ha(object):
         without regard for data durability. May only be called synchronously.
         """
         assert mode in ['offline', 'graceful', 'immediate']
+        self.state_handler.trigger_check_diverged_lsn()
         if mode != 'offline':
             if mode == 'immediate':
                 self.state_handler.stop('immediate', checkpoint=False)
@@ -510,16 +535,18 @@ class Ha(object):
             if mode == 'immediate':
                 # We will try to start up as a standby now. If no one takes the leader lock before we finish
                 # recovery we will try to promote ourselves.
-                self._async_executor.schedule('waiting for failover to complete')
-                self._async_executor.run_async(self.state_handler.follow,
-                                               (node_to_follow, cluster.leader, True, None, True))
+                self._async_executor.schedule('starting after demotion')
+                self._async_executor.run_async(self.state_handler.follow, (node_to_follow,))
             else:
-                return self.state_handler.follow(node_to_follow, cluster.leader, recovery=True, need_rewind=True)
+                logger.info('cluster.leader = %s', cluster.leader)
+                if self.state_handler.rewind_needed_and_possible(cluster.leader):
+                    return False  # do not start postgres, but run pg_rewind on the next iteration
+                return self.state_handler.follow(node_to_follow)
         else:
             # Need to become unavailable as soon as possible, so initiate a stop here. However as we can't release
             # the leader key we don't care about confirming the shutdown quickly and can use a regular stop.
             self.state_handler.stop(checkpoint=False)
-            self.state_handler.follow(None, None, recovery=True)
+            self.state_handler.follow(None)

     def should_run_scheduled_action(self, action_name, scheduled_at, cleanup_fn):
         if scheduled_at and not self.is_paused():
@@ -614,17 +641,16 @@ class Ha(object):
         else:
             # when we are doing manual failover there is no guaranty that new leader is ahead of any other node
             # node tagged as nofailover can be ahead of the new leader either, but it is always excluded from elections
-            need_rewind = bool(self.cluster.failover) or self.patroni.nofailover
-            if need_rewind:
+            check_diverged_lsn = bool(self.cluster.failover) or self.patroni.nofailover
+            if check_diverged_lsn:
+                self.state_handler.trigger_check_diverged_lsn()
                 time.sleep(2)  # Give a time to somebody to take the leader lock

             if self.patroni.nofailover:
                 return self.follow('demoting self because I am not allowed to become master',
-                                   'following a different leader because I am not allowed to promote',
-                                   need_rewind=need_rewind)
+                                   'following a different leader because I am not allowed to promote')
             return self.follow('demoting self because i am not the healthiest node',
-                               'following a different leader because i am not the healthiest node',
-                               need_rewind=need_rewind)
+                               'following a different leader because i am not the healthiest node')

     def process_healthy_cluster(self):
         if self.has_lock():
@@ -936,6 +962,8 @@ class Ha(object):
                 # asynchronous processes are running (should be always the case for the master)
                 if not self._async_executor.busy and not self.state_handler.is_starting():
                     if not self.state_handler.cb_called:
+                        if not self.state_handler.is_leader():
+                            self.state_handler.trigger_check_diverged_lsn()
                         self.state_handler.call_nowait(ACTION_ON_START)
                     self.state_handler.sync_replication_slots(self.cluster)
         except DCSError:
@@ -9,6 +9,7 @@ import tempfile
 import time

 from collections import defaultdict
+from contextlib import contextmanager
 from patroni import call_self
 from patroni.callback_executor import CallbackExecutor
 from patroni.exceptions import PostgresConnectionException, PostgresException
@@ -29,6 +30,8 @@ STATE_REJECT = 'rejecting connections'
 STATE_NO_RESPONSE = 'not responding'
 STATE_UNKNOWN = 'unknown'

+REWIND_STATUS = type('Enum', (), {'INITIAL': 0, 'CHECK': 1, 'NEED': 2, 'NOT_NEED': 3, 'SUCCESS': 4, 'FAILED': 5})
+

 def slot_name_from_member_name(member_name):
     """Translate member name to valid PostgreSQL slot name.
@@ -99,7 +102,7 @@ class Postgresql(object):
         self._replication = config['authentication']['replication']
         self.resolve_connection_addresses()

-        self._need_rewind = False
+        self._rewind_state = REWIND_STATUS.INITIAL
         self._use_slots = config.get('use_slots', True)
         self._schedule_load_slots = self.use_slots
@@ -296,6 +299,11 @@ class Postgresql(object):
     def pending_restart(self):
         return self._pending_restart

+    @staticmethod
+    def configuration_allows_rewind(data):
+        return data.get('Current wal_log_hints setting', 'off') == 'on' \
+            or data.get('Data page checksum version', '0') != '0'
+
     @property
     def can_rewind(self):
         """ check if pg_rewind executable is there and that pg_controldata indicates
@@ -312,9 +320,7 @@ class Postgresql(object):
                 return False
         except OSError:
             return False
-        # check if the cluster's configuration permits pg_rewind
-        data = self.controldata()
-        return data.get('wal_log_hints setting', 'off') == 'on' or data.get('Data page checksum version', '0') != '0'
+        return self.configuration_allows_rewind(self.controldata())

     @property
     def sysid(self):
@@ -751,15 +757,13 @@ class Postgresql(object):
         for p in ['connect_timeout', 'options']:
             connect_kwargs.pop(p, None)
         try:
-            with psycopg2.connect(**connect_kwargs) as conn:
-                conn.autocommit = True
-                with conn.cursor() as cur:
-                    cur.execute("SET statement_timeout = 0")
-                    if check_not_is_in_recovery:
-                        cur.execute('SELECT pg_is_in_recovery()')
-                        if cur.fetchone()[0]:
-                            return 'is_in_recovery=true'
-                    return cur.execute('CHECKPOINT')
+            with self._get_connection_cursor(**connect_kwargs) as cur:
+                cur.execute("SET statement_timeout = 0")
+                if check_not_is_in_recovery:
+                    cur.execute('SELECT pg_is_in_recovery()')
+                    if cur.fetchone()[0]:
+                        return 'is_in_recovery=true'
+                return cur.execute('CHECKPOINT')
         except psycopg2.Error:
             logging.exception('Exception during CHECKPOINT')
             return 'not accessible or not healty'
@@ -882,14 +886,17 @@ class Postgresql(object):
             f.write('\n{}\n'.format('\n'.join(config)))

     def primary_conninfo(self, member):
-        if not (member and member.conn_url):
+        if not (member and member.conn_url) or member.name == self.name:
             return None
         r = member.conn_kwargs(self._replication)
         r.update({'application_name': self.name, 'sslmode': 'prefer', 'sslcompression': '1'})
         keywords = 'user password host port sslmode sslcompression application_name'.split()
         return ' '.join('{0}={{{0}}}'.format(kw) for kw in keywords).format(**r)

-    def check_recovery_conf(self, primary_conninfo):
+    def check_recovery_conf(self, member):
         # TODO: recovery.conf could be stale, would be nice to detect that.
+        primary_conninfo = self.primary_conninfo(member)
+
         if not os.path.isfile(self._recovery_conf):
             return False
@@ -910,7 +917,7 @@ class Postgresql(object):
                 if name not in ('standby_mode', 'recovery_target_timeline', 'primary_conninfo', 'primary_slot_name'):
                     f.write("{0} = '{1}'\n".format(name, value))

-    def rewind(self, r):
+    def pg_rewind(self, r):
         # prepare pg_rewind connection
         env = self.write_pgpass(r)
         dsn_attrs = [
@@ -937,139 +944,167 @@ class Postgresql(object):
         # Don't try to call pg_controldata during backup restore
         if self._version_file_exists() and self.state != 'creating replica':
             try:
-                data = subprocess.check_output([self._pgcommand('pg_controldata'), self._data_dir])
+                data = subprocess.check_output([self._pgcommand('pg_controldata'), self._data_dir],
+                                               env={'LANG': 'C', 'LC_ALL': 'C', 'PATH': os.environ['PATH']})
                 if data:
                     data = data.decode('utf-8').splitlines()
-                    result = {l.split(':')[0].replace('Current ', '', 1): l.split(':')[1].strip() for l in data if l}
+                    result = {l.split(':', 1)[0]: l.split(':', 1)[1].strip() for l in data if l}
             except subprocess.CalledProcessError:
                 logger.exception("Error when calling pg_controldata")
         return result

-    def read_postmaster_opts(self):
-        """ returns the list of option names/values from postgres.opts, Empty dict if read failed or no file """
-        result = {}
-        try:
-            with open(os.path.join(self._data_dir, "postmaster.opts")) as f:
-                data = f.read()
-                opts = [opt.strip('"\n') for opt in data.split(' "')]
-                for opt in opts:
-                    if '=' in opt and opt.startswith('--'):
-                        name, val = opt.split('=', 1)
-                        name = name.strip('-')
-                        result[name] = val
-        except IOError:
-            logger.exception('Error when reading postmaster.opts')
-        return result
-
-    def single_user_mode(self, command=None, options=None):
-        """ run a given command in a single-user mode. If the command is empty - then just start and stop """
-        cmd = [self._pgcommand('postgres'), '--single', '-D', self._data_dir]
-        for opt, val in sorted((options or {}).items()):
-            cmd.extend(['-c', '{0}={1}'.format(opt, val)])
-        # need a database name to connect
-        cmd.append(self._database)
-        p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=open(os.devnull, 'w'), stderr=subprocess.STDOUT)
-        if p:
-            if command:
-                p.communicate('{0}\n'.format(command))
-            p.stdin.close()
-            return p.wait()
-        return 1
-
-    def cleanup_archive_status(self):
-        status_dir = os.path.join(self._data_dir, 'pg_xlog', 'archive_status')
-        try:
-            for f in os.listdir(status_dir):
-                path = os.path.join(status_dir, f)
-                try:
-                    if os.path.islink(path):
-                        os.unlink(path)
-                    elif os.path.isfile(path):
-                        os.remove(path)
-                except OSError:
-                    logger.exception("Unable to remove %s", path)
-        except OSError:
-            logger.exception("Unable to list %s", status_dir)
-
     @property
     def need_rewind(self):
-        return self._need_rewind
+        return self._rewind_state in (REWIND_STATUS.CHECK, REWIND_STATUS.NEED)

-    def follow(self, member, leader, recovery=False, async_executor=None, need_rewind=None, timeout=None):
-        if need_rewind is not None:
-            self._need_rewind = need_rewind
+    @staticmethod
+    @contextmanager
+    def _get_connection_cursor(**kwargs):
+        with psycopg2.connect(**kwargs) as conn:
+            conn.autocommit = True
+            with conn.cursor() as cur:
+                yield cur

-        primary_conninfo = self.primary_conninfo(member)
+    @contextmanager
+    def _get_replication_connection_cursor(self, host='localhost', port=5432, **kwargs):
+        with self._get_connection_cursor(host=host, port=int(port), database=self._database, replication=1,
+                                         user=self._replication['username'], password=self._replication['password'],
+                                         connect_timeout=3, options='-c statement_timeout=2000') as cur:
+            yield cur

-        if self.check_recovery_conf(primary_conninfo) and not recovery:
-            return True
+    def check_leader_is_not_in_recovery(self, **kwargs):
+        try:
+            with self._get_connection_cursor(connect_timeout=3, options='-c statement_timeout=2000', **kwargs) as cur:
+                cur.execute('SELECT pg_is_in_recovery()')
+                if not cur.fetchone()[0]:
+                    return True
+                logger.info('Leader is still in_recovery and therefore can\'t be used for rewind')
+        except Exception:
+            return logger.exception('Exception when working with leader')

-        if async_executor:
-            async_executor.schedule('changing primary_conninfo and restarting')
-            async_executor.run_async(self._do_follow, (primary_conninfo, leader, recovery, timeout))
+    def _get_local_timeline_lsn(self):
+        timeline = lsn = None
+        if self.is_running():  # if postgres is running - get timeline and lsn from replication connection
+            try:
+                with self._get_replication_connection_cursor(**self._local_address) as cur:
+                    cur.execute('IDENTIFY_SYSTEM')
+                    timeline, lsn = cur.fetchone()[1:3]
+            except Exception:
+                logger.exception('Can not fetch local timeline and lsn from replication connection')
+        else:  # otherwise analyze pg_controldata output
+            data = self.controldata()
+            try:
+                if data.get('Database cluster state') == 'shut down':
+                    lsn = data.get('Latest checkpoint location')
+                    timeline = int(data.get("Latest checkpoint's TimeLineID"))
+                elif data.get('Database cluster state') == 'shut down in recovery':
+                    lsn = data.get('Minimum recovery ending location')
+                    timeline = int(data.get("Min recovery ending loc's timeline"))
+            except (TypeError, ValueError):
+                logger.exception('Failed to get local timeline and lsn from pg_controldata output')
+        logger.info('Local timeline=%s lsn=%s', timeline, lsn)
+        return timeline, lsn

+    def _check_timeline_and_lsn(self, leader):
+        local_timeline, local_lsn = self._get_local_timeline_lsn()
+        if local_timeline is None or local_lsn is None:
+            return
+
+        if not self.check_leader_is_not_in_recovery(**leader.conn_kwargs(self._superuser)):
+            return
+
+        history = need_rewind = None
+        try:
+            with self._get_replication_connection_cursor(**leader.conn_kwargs()) as cur:
+                cur.execute('IDENTIFY_SYSTEM')
+                master_timeline = cur.fetchone()[1]
+                logger.info('master_timeline=%s', master_timeline)
+                if local_timeline > master_timeline:  # Not always supported by pg_rewind
+                    need_rewind = True
+                elif master_timeline > 1:
+                    cur.execute('TIMELINE_HISTORY %s', (master_timeline,))
+                    history = bytes(cur.fetchone()[1]).decode('utf-8')
+                    logger.info('master: history=%s', history)
+                else:  # local_timeline == master_timeline == 1
+                    need_rewind = False
+        except Exception:
+            return logger.exception('Exception when working with master via replication connection')
+
+        if history is not None:
+            def parse_lsn(lsn):
+                t = lsn.split('/')
+                return int(t[0], 16) * 0x100000000 + int(t[1], 16)
+
+            for line in history.split('\n'):
+                line = line.strip().split('\t')
+                if len(line) == 3:
+                    try:
+                        timeline = int(line[0])
+                        if timeline == local_timeline:
+                            try:
+                                need_rewind = parse_lsn(local_lsn) >= parse_lsn(line[1])
+                            except ValueError:
+                                logger.exception('Exception when parsing lsn')
+                            break
+                        elif timeline > local_timeline:
+                            break
+                    except ValueError:
+                        continue
+
+        self._rewind_state = need_rewind and REWIND_STATUS.NEED or REWIND_STATUS.NOT_NEED
+
+    def rewind(self, leader):
+        if self.is_running() and not self.stop(checkpoint=False):
+            return logger.warning('Can not run pg_rewind because postgres is still running')
+
+        # prepare pg_rewind connection
+        r = leader.conn_kwargs(self._superuser)
+
+        # first make sure that we are really trying to rewind
+        # from the master and run a checkpoint on it in order to
+        # make it store the new timeline (5540277D.8020309@iki.fi)
+        leader_status = self.checkpoint(r)
+        if leader_status:
+            return logger.warning('Can not use %s for rewind: %s', leader.name, leader_status)
+
+        if self.pg_rewind(r):
+            self._rewind_state = REWIND_STATUS.SUCCESS
+        elif not self.check_leader_is_not_in_recovery(**r):
+            logger.warning('Failed to rewind because master %s become unreachable', leader.name)
         else:
-            return self._do_follow(primary_conninfo, leader, recovery, timeout)
+            logger.error('Failed to rewind from healty master: %s', leader.name)

-    def _do_follow(self, primary_conninfo, leader, recovery=False, timeout=None):
+            if self.config.get('remove_data_directory_on_rewind_failure', False):
+                logger.warning('remove_data_directory_on_rewind_failure is set. removing...')
+                self.remove_data_directory()
+                self._rewind_state = REWIND_STATUS.INITIAL
+            else:
+                self._rewind_state = REWIND_STATUS.FAILED
+        return False

+    def trigger_check_diverged_lsn(self):
+        if self.can_rewind and self._rewind_state != REWIND_STATUS.NEED:
+            self._rewind_state = REWIND_STATUS.CHECK
+
+    def rewind_needed_and_possible(self, leader):
+        if leader and leader.name != self.name and leader.conn_url and self._rewind_state == REWIND_STATUS.CHECK:
+            self._check_timeline_and_lsn(leader)
+        return leader and leader.conn_url and self._rewind_state == REWIND_STATUS.NEED
+
+    @property
+    def rewind_executed(self):
+        return self._rewind_state > REWIND_STATUS.NOT_NEED
+
+    def follow(self, member, timeout=None):
+        primary_conninfo = self.primary_conninfo(member)
         change_role = self.role in ('master', 'demoted')

-        if leader and leader.name == self.name:
-            primary_conninfo = None
-            self._need_rewind = False
-            if self.is_running():
-                return
-        elif change_role:
-            self._need_rewind = True
-
-        if self._need_rewind and not self.can_rewind:
-            logger.warning("Data directory may be out of sync master, rewind may be needed.")
-
-        if self._need_rewind and leader and leader.conn_url and self.can_rewind:
-            logger.info("rewind flag is set")
-
-            if self.is_running() and not self.stop(checkpoint=False):
-                return logger.warning('Can not run pg_rewind because postgres is still running')
-
-            # prepare pg_rewind connection
-            r = leader.conn_kwargs(self._superuser)
-
-            # first make sure that we are really trying to rewind
-            # from the master and run a checkpoint on a t in order to
-            # make it store the new timeline (5540277D.8020309@iki.fi)
-            leader_status = self.checkpoint(r)
-            if leader_status:
-                return logger.warning('Can not use %s for rewind: %s', leader.name, leader_status)
-
-            # at present, pg_rewind only runs when the cluster is shut down cleanly
-            # and not shutdown in recovery. We have to remove the recovery.conf if present
-            # and start/shutdown in a single user mode to emulate this.
-            # XXX: if recovery.conf is linked, it will be written anew as a normal file.
-            if os.path.isfile(self._recovery_conf) or os.path.islink(self._recovery_conf):
-                os.unlink(self._recovery_conf)
-
-            # Archived segments might be useful to pg_rewind,
-            # clean the flags that tell we should remove them.
-            self.cleanup_archive_status()
-
-            # Start in a single user mode and stop to produce a clean shutdown
-            opts = self.read_postmaster_opts()
-            opts.update({'archive_mode': 'on', 'archive_command': 'false'})
-            self.single_user_mode(options=opts)
-
-            if self.rewind(r) or not self.config.get('remove_data_directory_on_rewind_failure', False):
-                self.write_recovery_conf(primary_conninfo)
-                self.start()
-            else:
-                logger.error('unable to rewind the former master')
-                self.remove_data_directory()
-            self._need_rewind = False
+        self.write_recovery_conf(primary_conninfo)
+        if self.is_running():
+            self.restart()
         else:
-            self.write_recovery_conf(primary_conninfo)
-            if recovery:
-                self.start(timeout=timeout)
-            else:
-                self.restart()
-            self.set_role('replica')
+            self.start(timeout=timeout)
+            self.set_role('replica')

         if change_role:
             # TODO: postpone this until start completes, or maybe do even earlier
@@ -1104,8 +1139,8 @@ class Postgresql(object):
         ret = self.pg_ctl('promote')
         if ret:
             self.set_role('master')
-            logger.info("cleared rewind flag after becoming the leader")
-            self._need_rewind = False
+            logger.info("cleared rewind state after becoming the leader")
+            self._rewind_state = REWIND_STATUS.INITIAL
             self.call_nowait(ACTION_ON_ROLE_CHANGE)
         return ret
@@ -126,6 +126,7 @@ def run_async(self, func, args=()):
 @patch('patroni.async_executor.AsyncExecutor.busy', PropertyMock(return_value=False))
 @patch('patroni.async_executor.AsyncExecutor.run_async', run_async)
 @patch('subprocess.call', Mock(return_value=0))
+@patch('time.sleep', Mock())
 class TestHa(unittest.TestCase):

     @patch('socket.getaddrinfo', socket_getaddrinfo)
@@ -167,7 +168,6 @@ class TestHa(unittest.TestCase):

     def test_recover_replica_failed(self):
         self.p.controldata = lambda: {'Database cluster state': 'in production'}
-        self.p.is_healthy = false
         self.p.is_running = false
         self.p.follow = false
         self.assertEquals(self.ha.run_cycle(), 'starting as a secondary')
@@ -175,7 +175,6 @@ class TestHa(unittest.TestCase):

     def test_recover_master_failed(self):
         self.p.follow = false
-        self.p.is_healthy = false
         self.p.is_running = false
         self.p.name = 'leader'
         self.p.set_role('master')
@@ -183,6 +182,12 @@ class TestHa(unittest.TestCase):
         self.ha.cluster = get_cluster_initialized_with_leader()
         self.assertEquals(self.ha.run_cycle(), 'starting as readonly because i had the session lock')

+    @patch.object(Postgresql, 'rewind_needed_and_possible', Mock(return_value=True))
+    def test_recover_with_rewind(self):
+        self.p.is_running = false
+        self.ha.cluster = get_cluster_initialized_with_leader()
+        self.assertEquals(self.ha.run_cycle(), 'running pg_rewind from leader')
+
     @patch('sys.exit', return_value=1)
     @patch('patroni.ha.Ha.sysid_valid', MagicMock(return_value=True))
     def test_sysid_no_match(self, exit_mock):
@@ -260,6 +265,13 @@ class TestHa(unittest.TestCase):
         self.p.is_leader = false
         self.assertEquals(self.ha.run_cycle(), 'PAUSE: no action')

+    @patch.object(Postgresql, 'rewind_needed_and_possible', Mock(return_value=True))
+    def test_follow_triggers_rewind(self):
+        self.p.is_leader = false
+        self.p.trigger_check_diverged_lsn()
+        self.ha.cluster = get_cluster_initialized_with_leader()
+        self.assertEquals(self.ha.run_cycle(), 'running pg_rewind from leader')
+
     def test_no_etcd_connection_master_demote(self):
         self.ha.load_cluster_from_dcs = Mock(side_effect=DCSError('Etcd is not responding properly'))
         self.assertEquals(self.ha.run_cycle(), 'demoted self because DCS is not accessible and i was a leader')
@@ -331,7 +343,6 @@ class TestHa(unittest.TestCase):
         self.assertEquals(self.ha.run_cycle(), 'failed to update leader lock during restart')

     @patch('requests.get', requests_get)
-    @patch('time.sleep', Mock())
     def test_manual_failover_from_leader(self):
         self.ha.fetch_node_status = get_node_status()
         self.ha.has_lock = true
@@ -344,6 +355,8 @@ class TestHa(unittest.TestCase):
         f = Failover(0, self.p.name, '', None)
         self.ha.cluster = get_cluster_initialized_with_leader(f)
         self.assertEquals(self.ha.run_cycle(), 'manual failover: demoting myself')
+        self.p.rewind_needed_and_possible = true
+        self.assertEquals(self.ha.run_cycle(), 'manual failover: demoting myself')
         self.ha.fetch_node_status = get_node_status(nofailover=True)
         self.assertEquals(self.ha.run_cycle(), 'no action. i am the leader with the lock')
         self.ha.fetch_node_status = get_node_status(xlog_location=1)
@@ -384,7 +397,6 @@ class TestHa(unittest.TestCase):
         self.assertEquals('PAUSE: no action. i am the leader with the lock', self.ha.run_cycle())

     @patch('requests.get', requests_get)
-    @patch('time.sleep', Mock())
     def test_manual_failover_process_no_leader(self):
         self.p.is_leader = false
         self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', self.p.name, None))
@@ -409,7 +421,6 @@ class TestHa(unittest.TestCase):
         self.ha.patroni.nofailover = True
         self.assertEquals(self.ha.run_cycle(), 'following a different leader because I am not allowed to promote')

-    @patch('time.sleep', Mock())
     def test_manual_failover_process_no_leader_in_pause(self):
         self.ha.is_paused = true
         self.ha.cluster = get_cluster_initialized_without_leader(failover=Failover(0, '', 'other', None))
@@ -569,7 +580,6 @@ class TestHa(unittest.TestCase):
         self.assertEquals(self.ha.run_cycle(), 'no action. i am a secondary and i am following a leader')
         check_calls([(update_lock, False), (demote, False)])

-    @patch('time.sleep', Mock())
     def test_manual_failover_while_starting(self):
         self.ha.has_lock = true
         self.p.check_for_startup = true
@@ -589,15 +599,13 @@ class TestHa(unittest.TestCase):
         self.assertEquals(self.ha.run_cycle(), 'stopped PostgreSQL to fail over after a crash')
         demote.assert_called_once()

-    @patch('time.sleep', Mock())
     @patch('patroni.postgresql.Postgresql.follow')
     def test_demote_immediate(self, follow):
         self.ha.has_lock = true
         self.e.get_cluster = Mock(return_value=get_cluster_initialized_without_leader())
         self.ha.demote('immediate')
-        follow.assert_called_once_with(None, None, True, None, True)
+        follow.assert_called_once_with(None)

-    @patch('time.sleep', Mock())
     def test_process_sync_replication(self):
         self.ha.has_lock = true
         mock_set_sync = self.p.set_synchronous_standby = Mock()
@@ -724,8 +732,7 @@ class TestHa(unittest.TestCase):
         mock_promote.assert_called_once()
         mock_write_sync.assert_called_once_with('other', None, index=0)

-    @patch('time.sleep')
-    def test_disable_sync_when_restarting(self, mock_sleep):
+    def test_disable_sync_when_restarting(self):
         self.ha.is_synchronous_mode = true

         self.p.name = 'other'
@@ -738,10 +745,10 @@ class TestHa(unittest.TestCase):
                                  get_cluster_initialized_with_leader(sync=('leader', syncstandby))
                                  for syncstandby in ['other', None]])

-        self.ha.restart({})
-
-        mock_restart.assert_called_once()
-        mock_sleep.assert_called()
+        with patch('time.sleep') as mock_sleep:
+            self.ha.restart({})
+            mock_restart.assert_called_once()
+            mock_sleep.assert_called()

         # Restart is still called when DCS connection fails
         mock_restart.reset_mock()
@@ -43,6 +43,13 @@ class MockCursor(object):
                             ('port', '5433', None, 'integer', 'postmaster'),
                             ('listen_addresses', '*', None, 'string', 'postmaster'),
                             ('autovacuum', 'on', None, 'bool', 'sighup')]
+        elif sql.startswith('IDENTIFY_SYSTEM'):
+            self.results = [('1', 2, '0/402EEC0', '')]
+        elif sql.startswith('TIMELINE_HISTORY '):
+            self.results = [('', b'x\t0/40159C0\tno recovery target specified\n\n' +
+                             b'1\t0/40159C0\tno recovery target specified\n\n' +
+                             b'2\t0/402DD98\tno recovery target specified\n\n' +
+                             b'3\t0/403DD98\tno recovery target specified\n')]
         else:
             self.results = [(None, None, None, None, None, None, None, None, None, None)]
@@ -138,21 +145,10 @@ Data page checksum version: 0
 """


-def postmaster_opts_string(*args, **kwargs):
-    return '/usr/local/pgsql/bin/postgres "-D" "data/postgresql0" "--listen_addresses=127.0.0.1" \
-"--port=5432" "--hot_standby=on" "--wal_keep_segments=8" "--wal_level=hot_standby" \
-"--archive_command=mkdir -p ../wal_archive && cp %p ../wal_archive/%f" "--wal_log_hints=on" \
-"--max_wal_senders=5" "--archive_timeout=1800s" "--archive_mode=on" "--max_replication_slots=5"\n'
-
-
 def psycopg2_connect(*args, **kwargs):
     return MockConnect()


-def fake_listdir(path):
-    return ["a", "b", "c"] if path.endswith('pg_xlog/archive_status') else []
-
-
+@patch('subprocess.call', Mock(return_value=0))
 @patch('psycopg2.connect', psycopg2_connect)
 class TestPostgresql(unittest.TestCase):
@@ -293,39 +289,84 @@ class TestPostgresql(unittest.TestCase):
     @patch('patroni.postgresql.Postgresql.write_pgpass', MagicMock(return_value=dict()))
     def test_pg_rewind(self, mock_call):
         r = {'user': '', 'host': '', 'port': '', 'database': '', 'password': ''}
-        self.assertTrue(self.p.rewind(r))
+        self.assertTrue(self.p.pg_rewind(r))
         subprocess.call = mock_call
-        self.assertFalse(self.p.rewind(r))
+        self.assertFalse(self.p.pg_rewind(r))

     @patch('os.unlink', Mock(return_value=True))
     @patch('subprocess.check_output', Mock(return_value=0, side_effect=pg_controldata_string))
     @patch.object(Postgresql, 'remove_data_directory', Mock(return_value=True))
     @patch.object(Postgresql, 'single_user_mode', Mock(return_value=1))
     @patch.object(Postgresql, 'write_pgpass', Mock(return_value={}))
     @patch.object(Postgresql, 'is_running', Mock(return_value=True))
+    def test_check_recovery_conf(self):
+        self.p.write_recovery_conf('foo')
+        self.assertFalse(self.p.check_recovery_conf(None))
+        self.p.write_recovery_conf(None)
+        self.assertTrue(self.p.check_recovery_conf(None))
+
     @patch.object(Postgresql, 'start', Mock())
     @patch.object(Postgresql, 'can_rewind', PropertyMock(return_value=True))
-    @patch.object(Postgresql, 'rewind', return_value=False)
-    def test_follow(self, mock_pg_rewind):
-        with patch.object(Postgresql, 'check_recovery_conf', Mock(return_value=True)):
-            self.assertTrue(self.p.follow(None, None))  # nothing to do, recovery.conf has good primary_conninfo
+    def test__get_local_timeline_lsn(self):
+        self.p.trigger_check_diverged_lsn()
+        with patch.object(Postgresql, 'controldata', Mock(return_value={'Database cluster state': 'shut down'})):
+            self.p.rewind_needed_and_possible(self.leader)
+        with patch.object(Postgresql, 'controldata',
+                          Mock(return_value={'Database cluster state': 'shut down in recovery'})):
+            self.p.rewind_needed_and_possible(self.leader)
+        with patch.object(Postgresql, 'is_running', Mock(return_value=True)):
+            with patch.object(MockCursor, 'fetchone', Mock(side_effect=[(False, ), Exception])):
+                self.p.rewind_needed_and_possible(self.leader)

-        self.p.follow(self.me, self.me)  # follow is called when the node is holding leader lock
+    @patch.object(Postgresql, 'start', Mock())
+    @patch.object(Postgresql, 'can_rewind', PropertyMock(return_value=True))
+    @patch.object(Postgresql, '_get_local_timeline_lsn', Mock(return_value=(2, '0/40159C1')))
+    @patch.object(Postgresql, 'check_leader_is_not_in_recovery')
+    def test__check_timeline_and_lsn(self, mock_check_leader_is_not_in_recovery):
+        mock_check_leader_is_not_in_recovery.return_value = False
+        self.p.trigger_check_diverged_lsn()
+        self.assertFalse(self.p.rewind_needed_and_possible(self.leader))
+        mock_check_leader_is_not_in_recovery.return_value = True
+        self.assertFalse(self.p.rewind_needed_and_possible(self.leader))
+        self.p.trigger_check_diverged_lsn()
+        with patch('psycopg2.connect', Mock(side_effect=Exception)):
+            self.assertFalse(self.p.rewind_needed_and_possible(self.leader))
+        with patch.object(MockCursor, 'fetchone',
+                          Mock(side_effect=[('', 2, '0/0'), ('', b'2\tG/40159C0\tno recovery target specified\n\n')])):
+            self.assertFalse(self.p.rewind_needed_and_possible(self.leader))
+        self.p.trigger_check_diverged_lsn()
+        with patch.object(MockCursor, 'fetchone',
+                          Mock(side_effect=[('', 2, '0/0'), ('', b'3\t040159C0\tno recovery target specified\n')])):
+            self.assertFalse(self.p.rewind_needed_and_possible(self.leader))
+        self.p.trigger_check_diverged_lsn()
+        with patch.object(MockCursor, 'fetchone', Mock(return_value=('', 1, '0/0'))):
+            with patch.object(Postgresql, '_get_local_timeline_lsn', Mock(return_value=(1, '0/0'))):
+                self.assertFalse(self.p.rewind_needed_and_possible(self.leader))
+        self.p.trigger_check_diverged_lsn()
+        self.assertTrue(self.p.rewind_needed_and_possible(self.leader))

-        with patch.object(Postgresql, 'restart', Mock(return_value=False)):
-            self.p.set_role('replica')
-            self.p.follow(None, None)  # restart without rewind
+    @patch.object(MockCursor, 'fetchone', Mock(side_effect=[(True,), Exception]))
+    def test_check_leader_is_not_in_recovery(self):
+        self.p.check_leader_is_not_in_recovery()
+        self.p.check_leader_is_not_in_recovery()

-        with patch.object(Postgresql, 'stop', Mock(return_value=False)):
-            self.p.follow(self.leader, self.leader, need_rewind=True)  # failed to stop postgres
+    @patch.object(Postgresql, 'checkpoint', side_effect=['', '1'])
+    @patch.object(Postgresql, 'stop', Mock(return_value=False))
+    @patch.object(Postgresql, 'start', Mock())
+    def test_rewind(self, mock_checkpoint):
+        self.p.rewind(self.leader)
+        with patch.object(Postgresql, 'pg_rewind', Mock(return_value=False)):
+            mock_checkpoint.side_effect = ['1', '', '', '']
+            self.p.rewind(self.leader)
+            self.p.rewind(self.leader)
+            with patch.object(Postgresql, 'check_leader_is_not_in_recovery', Mock(return_value=False)):
+                self.p.rewind(self.leader)
+            self.p.config['remove_data_directory_on_rewind_failure'] = False
+            self.p.trigger_check_diverged_lsn()
+            self.p.rewind(self.leader)
+        with patch.object(Postgresql, 'is_running', Mock(return_value=True)):
+            self.p.rewind(self.leader)
+            self.p.is_leader = Mock(return_value=False)
+            self.p.rewind(self.leader)

-        self.p.follow(self.leader, self.leader)  # "leader" is not accessible or is_in_recovery
-
-        with patch.object(Postgresql, 'checkpoint', Mock(return_value=None)):
-            self.p.follow(self.leader, self.leader)
-            mock_pg_rewind.return_value = True
-            self.p.follow(self.leader, self.leader, need_rewind=True)
-
-        self.p.follow(None, None)  # check_recovery_conf...
+    @patch.object(Postgresql, 'is_running', Mock(return_value=False))
+    @patch.object(Postgresql, 'start', Mock())
+    def test_follow(self):
+        self.p.follow(None)

     @patch('subprocess.check_output', Mock(return_value=0, side_effect=pg_controldata_string))
     def test_can_rewind(self):
@@ -333,7 +374,7 @@ class TestPostgresql(unittest.TestCase):
         self.assertFalse(self.p.can_rewind)
         with patch('subprocess.call', side_effect=OSError):
             self.assertFalse(self.p.can_rewind)
-        with patch.object(Postgresql, 'controldata', Mock(return_value={'wal_log_hints setting': 'on'})):
+        with patch.object(Postgresql, 'controldata', Mock(return_value={'Current wal_log_hints setting': 'on'})):
             self.assertTrue(self.p.can_rewind)
         self.p.config['use_pg_rewind'] = False
         self.assertFalse(self.p.can_rewind)
@@ -407,7 +448,7 @@ class TestPostgresql(unittest.TestCase):
         self.assertFalse(self.p.is_healthy())

     def test_promote(self):
-        self.p._role = 'replica'
+        self.p.set_role('replica')
         self.assertTrue(self.p.promote())
         self.assertTrue(self.p.promote())
@@ -425,7 +466,9 @@ class TestPostgresql(unittest.TestCase):
         self.assertFalse(self.p.is_running())

     @patch('shlex.split', Mock(side_effect=OSError))
+    @patch.object(Postgresql, 'can_rewind', PropertyMock(return_value=True))
     def test_call_nowait(self):
+        self.p.set_role('replica')
         self.assertIsNone(self.p.call_nowait('on_start'))

     def test_non_existing_callback(self):
@@ -502,71 +545,12 @@ class TestPostgresql(unittest.TestCase):
         data = self.p.controldata()
         self.assertEquals(len(data), 50)
         self.assertEquals(data['Database cluster state'], 'shut down in recovery')
-        self.assertEquals(data['wal_log_hints setting'], 'on')
+        self.assertEquals(data['Current wal_log_hints setting'], 'on')
         self.assertEquals(int(data['Database block size']), 8192)

         with patch('subprocess.check_output', Mock(side_effect=subprocess.CalledProcessError(1, ''))):
             self.assertEquals(self.p.controldata(), {})

-    def test_read_postmaster_opts(self):
-        m = mock_open(read_data=postmaster_opts_string())
-        with patch.object(builtins, 'open', m):
-            data = self.p.read_postmaster_opts()
-            self.assertEquals(data['wal_level'], 'hot_standby')
-            self.assertEquals(int(data['max_replication_slots']), 5)
-            self.assertEqual(data.get('D'), None)
-
-            m.side_effect = IOError
-            data = self.p.read_postmaster_opts()
-            self.assertEqual(data, dict())
-
-    @patch('subprocess.Popen')
-    @patch.object(builtins, 'open', MagicMock(return_value=42))
-    def test_single_user_mode(self, subprocess_popen_mock):
-        subprocess_popen_mock.return_value.wait.return_value = 0
-        self.assertEquals(self.p.single_user_mode(options=dict(archive_mode='on', archive_command='false')), 0)
-        subprocess_popen_mock.assert_called_once_with(['postgres', '--single', '-D', self.data_dir,
-                                                       '-c', 'archive_command=false', '-c', 'archive_mode=on',
-                                                       'postgres'], stdin=subprocess.PIPE,
-                                                      stdout=42,
-                                                      stderr=subprocess.STDOUT)
-        subprocess_popen_mock.reset_mock()
-        self.assertEquals(self.p.single_user_mode(command="CHECKPOINT"), 0)
-        subprocess_popen_mock.assert_called_once_with(['postgres', '--single', '-D', self.data_dir,
-                                                       'postgres'], stdin=subprocess.PIPE,
-                                                      stdout=42,
-                                                      stderr=subprocess.STDOUT)
-        subprocess_popen_mock.return_value = None
-        self.assertEquals(self.p.single_user_mode(), 1)
-
-    @patch('os.listdir', MagicMock(side_effect=fake_listdir))
-    @patch('os.unlink', return_value=True)
-    @patch('os.remove', return_value=True)
-    @patch('os.path.islink', return_value=False)
-    @patch('os.path.isfile', return_value=True)
-    def test_cleanup_archive_status(self, mock_file, mock_link, mock_remove, mock_unlink):
-        ap = os.path.join(self.data_dir, 'pg_xlog', 'archive_status/')
-        self.p.cleanup_archive_status()
-        mock_remove.assert_has_calls([mock.call(ap + 'a'), mock.call(ap + 'b'), mock.call(ap + 'c')])
-        mock_unlink.assert_not_called()
-
-        mock_remove.reset_mock()
-
-        mock_file.return_value = False
-        mock_link.return_value = True
-        self.p.cleanup_archive_status()
-        mock_unlink.assert_has_calls([mock.call(ap + 'a'), mock.call(ap + 'b'), mock.call(ap + 'c')])
-        mock_remove.assert_not_called()
-
-        mock_unlink.reset_mock()
-        mock_remove.reset_mock()
-
-        mock_file.side_effect = OSError
-        mock_link.side_effect = OSError
-        self.p.cleanup_archive_status()
-        mock_unlink.assert_not_called()
-        mock_remove.assert_not_called()
-
     @patch('patroni.postgresql.Postgresql._version_file_exists', Mock(return_value=True))
     @patch('subprocess.check_output', MagicMock(return_value=0, side_effect=pg_controldata_string))
     def test_sysid(self):