Optional fencing script (pre_promote) (#1099)
Call a fencing script after acquiring the leader lock. If the script does not finish successfully, do not promote; instead, remove the leader key. Closes https://github.com/zalando/patroni/issues/1567
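The diff below only adds the hook; the fencing script itself is site-specific and is not shipped with Patroni. As an illustration, a minimal pre_promote script might look like the sketch below, where the fence-node command and host name are assumptions, not anything from this commit; the only contract is the exit code (0 lets promotion proceed, anything else cancels it):

#!/usr/bin/env python
# Illustrative pre_promote fencing sketch (not part of this commit).
# The old primary's address and the fencing command are assumptions;
# Patroni only reacts to the exit code: 0 allows promotion, anything
# else (or a timeout) aborts it and the leader key is released.
import subprocess
import sys

OLD_PRIMARY = 'pg-old-primary.example.com'  # hypothetical host to fence

def main():
    try:
        # Hypothetical power-fencing call; replace with whatever STONITH
        # mechanism the deployment actually uses.
        subprocess.run(['fence-node', OLD_PRIMARY], check=True, timeout=30)
    except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
        return 1  # non-zero exit -> Patroni cancels the promotion
    return 0

if __name__ == '__main__':
    sys.exit(main())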
@@ -267,6 +267,7 @@ PostgreSQL
- **remove\_data\_directory\_on\_rewind\_failure**: If this option is enabled, Patroni will remove the PostgreSQL data directory and recreate the replica. Otherwise it will try to follow the new leader. Default value is **false**.
- **remove\_data\_directory\_on\_diverged\_timelines**: Patroni will remove the PostgreSQL data directory and recreate the replica if it notices that timelines are diverging and the former master cannot start streaming from the new master. This option is useful when ``pg_rewind`` cannot be used. Default value is **false**.
- **replica\_method**: for each create_replica_method other than basebackup, you would add a configuration section of the same name. At a minimum, this should include "command" with a full path to the actual script to be executed. Other configuration parameters will be passed along to the script in the form "parameter=value".
- **pre\_promote**: a fencing script that executes during a failover after acquiring the leader lock but before promoting the replica. If the script exits with a non-zero code, Patroni does not promote the replica and removes the leader key from DCS.

REST API
--------

@@ -70,7 +70,7 @@ class Ha(object):
        self._was_paused = False
        self._leader_timeline = None
        self.recovering = False
        self._post_bootstrap_task = None
        self._async_response = CriticalTask()
        self._crash_recovery_executed = False
        self._start_timeout = None
        self._async_executor = AsyncExecutor(self.state_handler.cancellable, self.wakeup)
@@ -87,6 +87,9 @@ class Ha(object):
        # already running as replica was aborted due to cluster not beeing initialized in DCS.
        self._join_aborted = False

        # used only in backoff after failing a pre_promote script
        self._released_leader_key_timestamp = 0

    def check_mode(self, mode):
        # Try to protect from the case when DCS was wiped out during pause
        if self.cluster and self.cluster.config and self.cluster.config.modify_index:
@@ -252,7 +255,8 @@ class Ha(object):
        elif self.cluster.initialize is None and not self.patroni.nofailover and 'bootstrap' in self.patroni.config:
            if self.dcs.initialize(create_new=True):  # race for initialization
                self.state_handler.bootstrapping = True
                self._post_bootstrap_task = CriticalTask()
                with self._async_response:
                    self._async_response.reset()

                if self.is_standby_cluster():
                    ret = self._async_executor.try_run_async('bootstrap_standby_leader', self.bootstrap_standby_leader)
@@ -279,7 +283,8 @@ class Ha(object):
        clone_source = self.get_remote_master()
        msg = 'clone from remote master {0}'.format(clone_source.conn_url)
        result = self.clone(clone_source, msg)
        self._post_bootstrap_task.complete(result)
        with self._async_response:  # pretend that post_bootstrap was already executed
            self._async_response.complete(result)
        if result:
            self.state_handler.set_role('standby_leader')

@@ -556,13 +561,27 @@ class Ha(object):
        return self.follow(demote_reason, message)

    def enforce_master_role(self, message, promote_message):
        if not self.is_paused() and not self.watchdog.is_running and not self.watchdog.activate():
            if self.state_handler.is_leader():
                self.demote('immediate')
                return 'Demoting self because watchdog could not be activated'
            else:
                self.release_leader_key_voluntarily()
                return 'Not promoting self because watchdog could not be activated'
        """
        Ensure the node that has won the race for the leader key meets criteria
        for promoting its PG server to the 'master' role.
        """
        if not self.is_paused():
            if not self.watchdog.is_running and not self.watchdog.activate():
                if self.state_handler.is_leader():
                    self.demote('immediate')
                    return 'Demoting self because watchdog could not be activated'
                else:
                    self.release_leader_key_voluntarily()
                    return 'Not promoting self because watchdog could not be activated'

            with self._async_response:
                if self._async_response.result is False:
                    logger.warning("Releasing the leader key voluntarily because the pre-promote script failed")
                    self._released_leader_key_timestamp = time.time()
                    self.release_leader_key_voluntarily()
                    # discard the result of the failed pre-promote script to be able to re-try promote
                    self._async_response.reset()
                    return 'Promotion cancelled because the pre-promote script failed'

        if self.state_handler.is_leader():
            # Inform the state handler about its master role.
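The pre-promote result travels between the async promote task and the HA loop through a shared CriticalTask-style object. A minimal sketch of that handshake, using a simplified stand-in class (SharedResult and ha_cycle are illustrative names, not Patroni API):

import threading

class SharedResult(object):
    # Simplified stand-in for patroni.async_executor.CriticalTask
    def __init__(self):
        self._lock = threading.Lock()
        self.result = None

    def __enter__(self):
        self._lock.acquire()
        return self

    def __exit__(self, *args):
        self._lock.release()

    def complete(self, value):   # called by the async promote worker
        self.result = value

    def reset(self):             # called by the HA loop after handling a failure
        self.result = None

def ha_cycle(async_response, release_leader_key):
    # HA-loop side, mirroring the logic in enforce_master_role() above:
    with async_response:
        if async_response.result is False:   # pre-promote script failed
            release_leader_key()
            async_response.reset()           # allow a later retry
            return 'Promotion cancelled because the pre-promote script failed'
    return 'promote'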
@@ -590,8 +609,10 @@ class Ha(object):
                self._rewind.reset_state()
                logger.info("cleared rewind state after becoming the leader")

            with self._async_response:
                self._async_response.reset()
            self._async_executor.try_run_async('promote', self.state_handler.promote,
                                               args=(self.dcs.loop_wait, on_success,
                                               args=(self.dcs.loop_wait, self._async_response, on_success,
                                                     self._leader_access_is_restricted))
            return promote_message

@@ -727,6 +748,10 @@ class Ha(object):
        return self._is_healthiest_node(members, check_replication_lag=False)

    def is_healthiest_node(self):
        if time.time() - self._released_leader_key_timestamp < self.dcs.ttl:
            logger.info('backoff: skip leader race after pre_promote script failure and releasing the lock voluntarily')
            return False

        if self.is_paused() and not self.patroni.nofailover and \
                self.cluster.failover and not self.cluster.failover.scheduled_at:
            ret = self.manual_failover_process_no_leader()
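The backoff above is just a time window: after voluntarily releasing the key, the node sits out the leader race for one DCS TTL so another member can win it. A tiny illustrative check (not Patroni code):

import time

def within_backoff(released_at, ttl):
    # Skip the leader race while the voluntarily released key could still
    # be held against us by other members, i.e. for one DCS TTL.
    return time.time() - released_at < ttl

# e.g. with ttl=30, a node that released the key 10 seconds ago stays out
# of the race; after 30 seconds it may compete again.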
@@ -916,9 +941,9 @@ class Ha(object):
                self.load_cluster_from_dcs()

                if self.is_standby_cluster():
                    # standby leader disappeared, and this is a healthiest
                    # standby leader disappeared, and this is the healthiest
                    # replica, so it should become a new standby leader.
                    # This imply that we need to start following a remote master
                    # This implies we need to start following a remote master
                    msg = 'promoted self to a standby leader by acquiring session lock'
                    return self.enforce_follow_remote_master(msg)
                else:
@@ -1136,10 +1161,20 @@ class Ha(object):
        self._async_executor.run_async(self._do_reinitialize, args=(self.cluster, ))

    def handle_long_action_in_progress(self):
        """
        Figure out what to do with the task AsyncExecutor is performing.
        """
        if self.has_lock() and self.update_lock():
            return 'updated leader lock during ' + self._async_executor.scheduled_action
        elif not self.state_handler.bootstrapping:
            # Don't have lock, make sure we are not starting up a master in the background
            # Don't have lock, make sure we are not promoting or starting up a master in the background
            if self._async_executor.scheduled_action == 'promote':
                with self._async_response:
                    cancel = self._async_response.cancel()
                if cancel:
                    self.state_handler.cancellable.cancel()
                    return 'lost leader before promote'

            if self.state_handler.role == 'master':
                logger.info("Demoting master during " + self._async_executor.scheduled_action)
                if self._async_executor.scheduled_action == 'restart':
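The interesting part here is the race between a promotion that has already recorded its result and a leader lock that was lost first; cancel() on the shared response succeeds only in the second case, and only then is the running subprocess terminated. A simplified sketch of that decision, with illustrative names and simplified cancel semantics:

class TaskSketch(object):
    # Assumed CriticalTask-like semantics: cancel() only succeeds while no
    # result has been recorded yet.
    def __init__(self):
        self.result = None
        self.is_cancelled = False

    def cancel(self):
        if self.result is not None:
            return False          # promote already recorded its outcome
        self.is_cancelled = True
        return True

def lost_lock_during_promote(task, cancel_subprocess):
    if task.cancel():             # promotion not finished yet
        cancel_subprocess()       # terminate pre_promote / pg_ctl promote
        return 'lost leader before promote'
    return None                   # too late to cancel; handle as a running master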
@@ -1150,7 +1185,6 @@ class Ha(object):
                            self.state_handler.terminate_starting_postmaster(postmaster=task.result)
            self.demote('immediate-nolock')
            return 'lost leader lock during ' + self._async_executor.scheduled_action

        if self.cluster.is_unlocked():
            logger.info('not healthy enough for leader race')

@@ -1185,17 +1219,19 @@ class Ha(object):
        raise PatroniFatalException('Failed to bootstrap cluster')

    def post_bootstrap(self):
        with self._async_response:
            result = self._async_response.result
        # bootstrap has failed if postgres is not running
        if not self.state_handler.is_running() or self._post_bootstrap_task.result is False:
        if not self.state_handler.is_running() or result is False:
            self.cancel_initialization()

        if self._post_bootstrap_task.result is None:
        if result is None:
            if not self.state_handler.is_leader():
                return 'waiting for end of recovery after bootstrap'

            self.state_handler.set_role('master')
            ret = self._async_executor.try_run_async('post_bootstrap', self.state_handler.bootstrap.post_bootstrap,
                                                     args=(self.patroni.config['bootstrap'], self._post_bootstrap_task))
                                                     args=(self.patroni.config['bootstrap'], self._async_response))
            return ret or 'running post_bootstrap'

        self.state_handler.bootstrapping = False
@@ -1409,7 +1445,7 @@ class Ha(object):
        self.while_not_sync_standby(lambda: self.state_handler.stop(checkpoint=False, on_safepoint=disable_wd,
                                                                    stop_timeout=self.master_stop_timeout()))
        if not self.state_handler.is_running():
            if self.has_lock():
            if self.is_leader():
                checkpoint_location = self.state_handler.latest_checkpoint_location()
                self.dcs.delete_leader(checkpoint_location)
            self.touch_member()

@@ -20,8 +20,8 @@ from patroni.postgresql.postmaster import PostmasterProcess
from patroni.postgresql.slots import SlotsHandler
from patroni.exceptions import PostgresConnectionException
from patroni.utils import Retry, RetryFailedError, polling_loop, data_directory_is_empty, parse_int
from threading import current_thread, Lock
from psutil import TimeoutExpired
from threading import current_thread, Lock


logger = logging.getLogger(__name__)
@@ -795,9 +795,38 @@ class Postgresql(object):
        if data.get('Database cluster state') == 'in production':
            return True

    def promote(self, wait_seconds, on_success=None, access_is_restricted=False):
    def _pre_promote(self):
        """
        Runs a fencing script after the leader lock is acquired but before the replica is promoted.
        If the script exits with a non-zero code, promotion does not happen and the leader key is removed from DCS.
        """

        cmd = self.config.get('pre_promote')
        if not cmd:
            return True

        ret = self.cancellable.call(shlex.split(cmd))
        if ret is not None:
            logger.info('pre_promote script `%s` exited with %s', cmd, ret)
        return ret == 0

    def promote(self, wait_seconds, task, on_success=None, access_is_restricted=False):
        if self.role == 'master':
            return True

        ret = self._pre_promote()
        with task:
            if task.is_cancelled:
                return False
            task.complete(ret)

        if ret is False:
            return False

        if self.cancellable.is_cancelled:
            logger.info("PostgreSQL promote cancelled.")
            return False

        ret = self.pg_ctl('promote', '-W')
        if ret:
            self.set_role('master')

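Putting the ha.py and postgresql changes together, the failover path becomes: acquire the leader key, run pre_promote, and either promote or release the key. A condensed, illustrative sketch of that flow (function names here are placeholders; the real logic is split across Ha.enforce_master_role() and Postgresql.promote()/_pre_promote()):

import shlex
import subprocess

def failover_step(config, promote_postgres, release_leader_key):
    # Condensed view of the new flow, not the actual Patroni control loop.
    cmd = config.get('pre_promote')
    if cmd:
        ret = subprocess.call(shlex.split(cmd))   # Patroni uses a cancellable wrapper
        if ret != 0:
            release_leader_key()                  # fencing failed: step aside
            return 'Promotion cancelled because the pre-promote script failed'
    promote_postgres()                            # pg_ctl promote -W
    return 'promoted self to leader by acquiring session lock'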
@@ -11,6 +11,10 @@ logger = logging.getLogger(__name__)

class CancellableExecutor(object):

    """
    There must be only one such process so that AsyncExecutor can easily cancel it.
    """

    def __init__(self):
        self._process = None
        self._process_cmd = None
@@ -115,6 +119,8 @@ class CancellableSubprocess(CancellableExecutor):
            self._is_cancelled = True
            if self._process is None or not self._process.is_running():
                return

            logger.info('Terminating %s', self._process_cmd)
            self._process.terminate()

        for _ in polling_loop(10):

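Cancelling a child process follows the usual terminate, wait, then kill escalation; the hunk above shows the terminate call and the start of the polling loop. A simplified psutil-based sketch of the full pattern (illustrative, not the exact Patroni implementation):

import psutil

def stop_child(proc, grace_seconds=10):
    # Terminate-then-kill sketch of the pattern used by CancellableSubprocess.
    if proc is None or not proc.is_running():
        return
    proc.terminate()                      # polite SIGTERM first
    try:
        proc.wait(timeout=grace_seconds)  # roughly what polling_loop(10) waits for
    except psutil.TimeoutExpired:
        proc.kill()                       # escalate if it ignores SIGTERM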
@@ -115,6 +115,8 @@ postgresql:
# same as KRB5CCNAME used by the GSS
# krb_server_keyfile: /var/spool/keytabs/postgres
unix_socket_directories: '.'
# Additional fencing script executed after acquiring the leader lock but before promoting the replica
#pre_promote: /path/to/pre_promote.sh

#watchdog:
# mode: automatic # Allowed values: off, automatic, required

@@ -112,6 +112,9 @@ postgresql:
basebackup:
- verbose
- max-rate: 100M
# Additional fencing script executed after acquiring the leader lock but before promoting the replica
#pre_promote: /path/to/pre_promote.sh

tags:
nofailover: false
noloadbalance: false

@@ -302,6 +302,19 @@ class TestHa(PostgresInit):
        self.p.is_leader = false
        self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')

    def test_promotion_cancelled_after_pre_promote_failed(self):
        self.p.is_leader = false
        self.p._pre_promote = false
        self.ha._is_healthiest_node = true
        self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')
        self.assertEqual(self.ha.run_cycle(), 'Promotion cancelled because the pre-promote script failed')
        self.assertEqual(self.ha.run_cycle(), 'following a different leader because i am not the healthiest node')

    def test_lost_leader_lock_during_promote(self):
        with patch('patroni.async_executor.AsyncExecutor.busy', PropertyMock(return_value=True)):
            self.ha._async_executor.schedule('promote')
            self.assertEqual(self.ha.run_cycle(), 'lost leader before promote')

    def test_long_promote(self):
        self.ha.cluster.is_unlocked = false
        self.ha.has_lock = true
@@ -1042,7 +1055,7 @@ class TestHa(PostgresInit):

    def test_shutdown(self):
        self.p.is_running = false
        self.ha.has_lock = true
        self.ha.is_leader = true
        self.ha.shutdown()

    @patch('time.sleep', Mock())

@@ -357,10 +357,22 @@ class TestPostgresql(BaseTestPostgresql):
        mock_is_running.return_value = False
        self.assertFalse(self.p.is_healthy())

    def test_promote(self):
    @patch('psutil.Popen')
    def test_promote(self, mock_popen):
        mock_popen.return_value.wait.return_value = 0
        task = CriticalTask()
        self.assertTrue(self.p.promote(0, task))

        self.p.set_role('replica')
        self.assertIsNone(self.p.promote(0))
        self.assertTrue(self.p.promote(0))
        self.p.config._config['pre_promote'] = 'test'
        with patch('patroni.postgresql.cancellable.CancellableSubprocess.is_cancelled', PropertyMock(return_value=1)):
            self.assertFalse(self.p.promote(0, task))

        mock_popen.side_effect = Exception
        self.assertFalse(self.p.promote(0, task))
        task.reset()
        task.cancel()
        self.assertFalse(self.p.promote(0, task))

    def test_timeline_wal_position(self):
        self.assertEqual(self.p.timeline_wal_position(), (1, 2, 1))
