Code review, asynchronous restarts.

- Make the restart initiated by the schedule asynchronous
- Fix the placeholders in logs.
- Fix the regexp to detect the PostgreSQL version.
This commit is contained in:
Oleksii Kliukin
2016-07-12 20:25:01 +02:00
parent ec160f0d59
commit 3181c4e59f
4 changed files with 41 additions and 28 deletions

View File

@@ -91,7 +91,7 @@ class RestApiHandler(BaseHTTPRequestHandler):
status_code = 503
elif 'role' in response and response['role'] in path:
status_code = 503 if response['role'] != 'master' and patroni.noloadbalance else 200
elif patroni.ha.immediate_restart_scheduled() and patroni.postgresql.role == 'master' and 'master' in path:
elif patroni.ha.restart_scheduled() and patroni.postgresql.role == 'master' and 'master' in path:
# exceptional case for master node when the postgres is being restarted via API
status_code = 200
else:

View File

@@ -388,19 +388,18 @@ class Ha(object):
request_time = restart_data['postmaster_start_time']
# check if postmaster start time has changed since the last restart
if recent_time and request_time and recent_time != request_time:
logger.info("Cancelling scheduled restart: postgres restart has already happened at {0}".
format(recent_time))
logger.info("Cancelling scheduled restart: postgres restart has already happened at %s", recent_time)
self.delete_future_restart()
return
return None
if (restart_data and
self.should_run_scheduled_action('restart', restart_data['schedule'], self.delete_future_restart)):
try:
ret, message = self.restart(restart_data)
if ret:
logger.info("Scheduled restart successful")
else:
logger.warning("Scheduled restart: {0}".format(message))
ret, message = self.restart(restart_data, run_async=True)
if not ret:
logger.warning("Scheduled restart: %s", message)
return None
return message
finally:
self.delete_future_restart()
@@ -421,12 +420,12 @@ class Ha(object):
if not reason_to_cancel:
return True
else:
logger.info("not proceeding with the restart: {0}".format(reason_to_cancel))
logger.info("not proceeding with the restart: %s", reason_to_cancel)
return False
def schedule(self, action):
def schedule(self, action, immediate=False):
with self._async_executor:
return self._async_executor.schedule(action)
return self._async_executor.schedule(action, immediate)
def schedule_future_restart(self, restart_data):
if isinstance(restart_data, dict):
@@ -444,9 +443,6 @@ class Ha(object):
ret = True
return ret
def immediate_restart_scheduled(self):
return self._async_executor.scheduled_action == 'restart'
def future_restart_scheduled(self):
return self.patroni.scheduled_restart.copy() if (self.patroni.scheduled_restart and
isinstance(self.patroni.scheduled_restart, dict)) else None
@@ -457,7 +453,13 @@ class Ha(object):
def reinitialize_scheduled(self):
return self._async_executor.scheduled_action == 'reinitialize'
def restart(self, restart_data=None):
def schedule_restart(self, immediate=False):
return self.schedule('restart', immediate)
def restart_scheduled(self):
return self._async_executor.scheduled_action == 'restart'
def restart(self, restart_data=None, run_async=False):
""" conditional and unconditional restart """
if (restart_data and isinstance(restart_data, dict) and
not self.restart_matches(restart_data.get('role'),
@@ -466,13 +468,17 @@ class Ha(object):
return (False, "restart conditions are not satisfied")
with self._async_executor:
prev = self._async_executor.schedule('restart', True)
prev = self.schedule_restart(immediate=(not run_async))
if prev is not None:
return (False, prev + ' already in progress')
if self._async_executor.run(self.state_handler.restart):
return (True, 'restarted successfully')
else:
return (False, 'restart failed')
if not run_async:
if self._async_executor.run(self.state_handler.restart):
return (True, 'restarted successfully')
else:
return (False, 'restart failed')
else:
self._async_executor.run_async(self.state_handler.restart)
return (True, "restart initiated")
def reinitialize(self, cluster):
self.state_handler.stop('immediate')
@@ -572,7 +578,9 @@ class Ha(object):
if self.cluster.is_unlocked():
return self.process_unhealthy_cluster()
else:
self.evaluate_scheduled_restart()
msg = self.evaluate_scheduled_restart()
if msg is not None:
return msg
return self.process_healthy_cluster()
finally:
# we might not have a valid PostgreSQL connection here if another thread

View File

@@ -45,7 +45,7 @@ class MockHa(object):
return (True, '')
@staticmethod
def immediate_restart_scheduled():
def restart_scheduled():
return False
@staticmethod
@@ -120,7 +120,7 @@ class TestRestApiHandler(unittest.TestCase):
MockPatroni.dcs.cluster = None
with patch.object(RestApiHandler, 'get_postgresql_status', Mock(return_value={'role': 'master'})):
MockRestApiServer(RestApiHandler, 'GET /master')
with patch.object(MockHa, 'immediate_restart_scheduled', Mock(return_value=True)):
with patch.object(MockHa, 'restart_scheduled', Mock(return_value=True)):
MockRestApiServer(RestApiHandler, 'GET /master')
self.assertIsNotNone(MockRestApiServer(RestApiHandler, 'GET /master'))

View File

@@ -288,7 +288,7 @@ class TestHa(unittest.TestCase):
def test_restart_in_progress(self):
self.ha._async_executor.schedule('restart', True)
self.assertTrue(self.ha.immediate_restart_scheduled())
self.assertTrue(self.ha.restart_scheduled())
self.assertEquals(self.ha.run_cycle(), 'not healthy enough for leader race')
self.ha.cluster = get_cluster_initialized_with_leader()
@@ -416,15 +416,20 @@ class TestHa(unittest.TestCase):
with patch.object(self.ha,
'future_restart_scheduled', Mock(return_value={'postmaster_start_time': '2016-08-30 12:45TZ+1',
'schedule': '2016-08-31 12:45TZ+1'})):
self.ha.evaluate_scheduled_restart()
self.assertIsNone(self.ha.evaluate_scheduled_restart())
with patch.object(self.ha,
'future_restart_scheduled', Mock(return_value={'postmaster_start_time': '2016-08-31 12:45TZ+1',
'schedule': '2016-08-31 12:45TZ+1'})):
with patch.object(self.ha,
'should_run_scheduled_action', Mock(return_value=True)):
self.ha.evaluate_scheduled_restart()
self.assertIsNotNone(self.ha.evaluate_scheduled_restart())
with patch.object(self.ha, 'restart', Mock(return_value=(False, "Test"))):
self.ha.evaluate_scheduled_restart()
self.assertIsNone(self.ha.evaluate_scheduled_restart())
def test_scheduled_restart(self):
self.ha.cluster = get_cluster_initialized_with_leader()
with patch.object(self.ha, "evaluate_scheduled_restart", Mock(return_value="restart scheduled")):
self.assertEquals(self.ha.run_cycle(), "restart scheduled")
def test_restart_matches(self):
self.p._role = 'replica'