mirror of
https://github.com/outbackdingo/patroni.git
synced 2026-01-28 02:20:04 +00:00
If Patroni gets partitioned it starts receiving stale information from DCS. We can't use this information to determine that we have the leader key. Instead, we will record in Ha object the actual state of acquire/update lock and report as a leader only if it was successful. P.S. despite responding with 200 on `GET /master` postgres was still running read-only.
397 lines
16 KiB
Python
397 lines
16 KiB
Python
import datetime
|
|
import json
|
|
import psycopg2
|
|
import unittest
|
|
|
|
from mock import Mock, PropertyMock, patch
|
|
from patroni.api import RestApiHandler, RestApiServer
|
|
from patroni.dcs import ClusterConfig, Member
|
|
from patroni.ha import _MemberStatus
|
|
from patroni.utils import tzutc
|
|
from six import BytesIO as IO
|
|
from six.moves import BaseHTTPServer
|
|
from test_postgresql import psycopg2_connect, MockCursor
|
|
|
|
|
|
future_restart_time = datetime.datetime.now(tzutc) + datetime.timedelta(days=5)
|
|
postmaster_start_time = datetime.datetime.now(tzutc)
|
|
|
|
|
|
class MockPostgresql(object):
|
|
|
|
name = 'test'
|
|
state = 'running'
|
|
role = 'master'
|
|
server_version = '999999'
|
|
sysid = 'dummysysid'
|
|
scope = 'dummy'
|
|
pending_restart = True
|
|
wal_name = 'wal'
|
|
lsn_name = 'lsn'
|
|
|
|
@staticmethod
|
|
def connection():
|
|
return psycopg2_connect()
|
|
|
|
@staticmethod
|
|
def postmaster_start_time():
|
|
return str(postmaster_start_time)
|
|
|
|
@staticmethod
|
|
def replica_cached_timeline(_):
|
|
return 2
|
|
|
|
|
|
class MockWatchdog(object):
|
|
is_healthy = False
|
|
|
|
|
|
class MockHa(object):
|
|
|
|
state_handler = MockPostgresql()
|
|
watchdog = MockWatchdog()
|
|
|
|
@staticmethod
|
|
def is_leader():
|
|
return False
|
|
|
|
@staticmethod
|
|
def reinitialize(_):
|
|
return 'reinitialize'
|
|
|
|
@staticmethod
|
|
def restart():
|
|
return (True, '')
|
|
|
|
@staticmethod
|
|
def restart_scheduled():
|
|
return False
|
|
|
|
@staticmethod
|
|
def delete_future_restart():
|
|
return True
|
|
|
|
@staticmethod
|
|
def fetch_nodes_statuses(members):
|
|
return [_MemberStatus(None, True, None, None, {}, False)]
|
|
|
|
@staticmethod
|
|
def schedule_future_restart(data):
|
|
return True
|
|
|
|
@staticmethod
|
|
def is_lagging(wal):
|
|
return False
|
|
|
|
@staticmethod
|
|
def get_effective_tags():
|
|
return {'nosync': True}
|
|
|
|
@staticmethod
|
|
def wakeup():
|
|
pass
|
|
|
|
@staticmethod
|
|
def is_paused():
|
|
return True
|
|
|
|
|
|
class MockPatroni(object):
|
|
|
|
ha = MockHa()
|
|
config = Mock()
|
|
postgresql = ha.state_handler
|
|
dcs = Mock()
|
|
tags = {}
|
|
version = '0.00'
|
|
noloadbalance = PropertyMock(return_value=False)
|
|
scheduled_restart = {'schedule': future_restart_time,
|
|
'postmaster_start_time': postgresql.postmaster_start_time()}
|
|
|
|
@staticmethod
|
|
def sighup_handler():
|
|
pass
|
|
|
|
|
|
class MockRequest(object):
|
|
|
|
def __init__(self, request):
|
|
self.request = request.encode('utf-8')
|
|
|
|
def makefile(self, *args, **kwargs):
|
|
return IO(self.request)
|
|
|
|
def sendall(self, *args, **kwargs):
|
|
pass
|
|
|
|
|
|
class MockRestApiServer(RestApiServer):
|
|
|
|
def __init__(self, Handler, request, config=None):
|
|
self.socket = 0
|
|
self.serve_forever = Mock()
|
|
BaseHTTPServer.HTTPServer.__init__ = Mock()
|
|
MockRestApiServer._BaseServer__is_shut_down = Mock()
|
|
MockRestApiServer._BaseServer__shutdown_request = True
|
|
config = config or {'listen': '127.0.0.1:8008', 'auth': 'test:test', 'certfile': 'dumb'}
|
|
super(MockRestApiServer, self).__init__(MockPatroni(), config)
|
|
Handler(MockRequest(request), ('0.0.0.0', 8080), self)
|
|
|
|
|
|
@patch('ssl.wrap_socket', Mock(return_value=0))
|
|
class TestRestApiHandler(unittest.TestCase):
|
|
|
|
_authorization = '\nAuthorization: Basic dGVzdDp0ZXN0'
|
|
|
|
def test_do_GET(self):
|
|
MockRestApiServer(RestApiHandler, 'GET /replica')
|
|
with patch.object(RestApiHandler, 'get_postgresql_status', Mock(return_value={})):
|
|
MockRestApiServer(RestApiHandler, 'GET /replica')
|
|
with patch.object(RestApiHandler, 'get_postgresql_status', Mock(return_value={'role': 'master'})):
|
|
MockRestApiServer(RestApiHandler, 'GET /replica')
|
|
MockRestApiServer(RestApiHandler, 'GET /master')
|
|
MockPatroni.dcs.cluster.sync.sync_standby = MockPostgresql.name
|
|
MockPatroni.dcs.cluster.is_synchronous_mode = Mock(return_value=True)
|
|
with patch.object(RestApiHandler, 'get_postgresql_status', Mock(return_value={'role': 'replica'})):
|
|
MockRestApiServer(RestApiHandler, 'GET /synchronous')
|
|
with patch.object(RestApiHandler, 'get_postgresql_status', Mock(return_value={'role': 'replica'})):
|
|
MockRestApiServer(RestApiHandler, 'GET /asynchronous')
|
|
MockPatroni.ha.is_leader = Mock(return_value=True)
|
|
MockRestApiServer(RestApiHandler, 'GET /replica')
|
|
MockPatroni.dcs.cluster = None
|
|
with patch.object(RestApiHandler, 'get_postgresql_status', Mock(return_value={'role': 'master'})):
|
|
MockRestApiServer(RestApiHandler, 'GET /master')
|
|
with patch.object(MockHa, 'restart_scheduled', Mock(return_value=True)):
|
|
MockRestApiServer(RestApiHandler, 'GET /master')
|
|
self.assertIsNotNone(MockRestApiServer(RestApiHandler, 'GET /master'))
|
|
with patch.object(RestApiServer, 'query', Mock(return_value=[('', 1, '', '', '', '', False, '')])):
|
|
self.assertIsNotNone(MockRestApiServer(RestApiHandler, 'GET /patroni'))
|
|
|
|
def test_do_OPTIONS(self):
|
|
self.assertIsNotNone(MockRestApiServer(RestApiHandler, 'OPTIONS / HTTP/1.0'))
|
|
|
|
@patch.object(MockPostgresql, 'state', PropertyMock(return_value='stopped'))
|
|
def test_do_GET_patroni(self):
|
|
self.assertIsNotNone(MockRestApiServer(RestApiHandler, 'GET /patroni'))
|
|
|
|
def test_basicauth(self):
|
|
self.assertIsNotNone(MockRestApiServer(RestApiHandler, 'POST /restart HTTP/1.0'))
|
|
MockRestApiServer(RestApiHandler, 'POST /restart HTTP/1.0\nAuthorization:')
|
|
|
|
@patch.object(MockPatroni, 'dcs')
|
|
def test_do_GET_config(self, mock_dcs):
|
|
mock_dcs.cluster.config.data = {}
|
|
self.assertIsNotNone(MockRestApiServer(RestApiHandler, 'GET /config'))
|
|
mock_dcs.cluster.config = None
|
|
self.assertIsNotNone(MockRestApiServer(RestApiHandler, 'GET /config'))
|
|
|
|
@patch.object(MockPatroni, 'dcs')
|
|
def test_do_PATCH_config(self, mock_dcs):
|
|
config = {'postgresql': {'use_slots': False, 'use_pg_rewind': True, 'parameters': {'wal_level': 'logical'}}}
|
|
mock_dcs.get_cluster.return_value.config = ClusterConfig.from_node(1, json.dumps(config))
|
|
request = 'PATCH /config HTTP/1.0' + self._authorization
|
|
self.assertIsNotNone(MockRestApiServer(RestApiHandler, request))
|
|
request += '\nContent-Length: '
|
|
self.assertIsNotNone(MockRestApiServer(RestApiHandler, request + '34\n\n{"postgresql":{"use_slots":false}}'))
|
|
config['ttl'] = 5
|
|
config['postgresql'].update({'use_slots': {'foo': True}, "parameters": None})
|
|
config = json.dumps(config)
|
|
request += str(len(config)) + '\n\n' + config
|
|
MockRestApiServer(RestApiHandler, request)
|
|
mock_dcs.set_config_value.return_value = False
|
|
MockRestApiServer(RestApiHandler, request)
|
|
|
|
@patch.object(MockPatroni, 'dcs')
|
|
def test_do_PUT_config(self, mock_dcs):
|
|
mock_dcs.get_cluster.return_value.config = ClusterConfig.from_node(1, '{}')
|
|
request = 'PUT /config HTTP/1.0' + self._authorization + '\nContent-Length: '
|
|
self.assertIsNotNone(MockRestApiServer(RestApiHandler, request + '2\n\n{}'))
|
|
config = '{"foo": "bar"}'
|
|
request += str(len(config)) + '\n\n' + config
|
|
MockRestApiServer(RestApiHandler, request)
|
|
mock_dcs.set_config_value.return_value = False
|
|
MockRestApiServer(RestApiHandler, request)
|
|
mock_dcs.get_cluster.return_value.config = ClusterConfig.from_node(1, config)
|
|
MockRestApiServer(RestApiHandler, request)
|
|
|
|
@patch.object(MockPatroni, 'sighup_handler', Mock(side_effect=Exception))
|
|
def test_do_POST_reload(self):
|
|
with patch.object(MockPatroni, 'config') as mock_config:
|
|
mock_config.reload_local_configuration.return_value = False
|
|
MockRestApiServer(RestApiHandler, 'POST /reload HTTP/1.0' + self._authorization)
|
|
self.assertIsNotNone(MockRestApiServer(RestApiHandler, 'POST /reload HTTP/1.0' + self._authorization))
|
|
|
|
@patch.object(MockPatroni, 'dcs')
|
|
def test_do_POST_restart(self, mock_dcs):
|
|
mock_dcs.get_cluster.return_value.is_paused.return_value = False
|
|
request = 'POST /restart HTTP/1.0' + self._authorization
|
|
self.assertIsNotNone(MockRestApiServer(RestApiHandler, request))
|
|
|
|
with patch.object(MockHa, 'restart', Mock(side_effect=Exception)):
|
|
MockRestApiServer(RestApiHandler, request)
|
|
|
|
post = request + '\nContent-Length: '
|
|
|
|
def make_request(request=None, **kwargs):
|
|
request = json.dumps(kwargs) if request is None else request
|
|
return '{0}{1}\n\n{2}'.format(post, len(request), request)
|
|
|
|
# empty request
|
|
request = make_request('')
|
|
MockRestApiServer(RestApiHandler, request)
|
|
# invalid request
|
|
request = make_request('foobar=baz')
|
|
MockRestApiServer(RestApiHandler, request)
|
|
# wrong role
|
|
request = make_request(schedule=future_restart_time.isoformat(), role='unknown', postgres_version='9.5.3')
|
|
MockRestApiServer(RestApiHandler, request)
|
|
# wrong version
|
|
request = make_request(schedule=future_restart_time.isoformat(), role='master', postgres_version='9.5.3.1')
|
|
MockRestApiServer(RestApiHandler, request)
|
|
# unknown filter
|
|
request = make_request(schedule=future_restart_time.isoformat(), batman='lives')
|
|
MockRestApiServer(RestApiHandler, request)
|
|
# incorrect schedule
|
|
request = make_request(schedule='2016-08-42 12:45TZ+1', role='master')
|
|
MockRestApiServer(RestApiHandler, request)
|
|
# everything fine, but the schedule is missing
|
|
request = make_request(role='master', postgres_version='9.5.2')
|
|
MockRestApiServer(RestApiHandler, request)
|
|
for retval in (True, False):
|
|
with patch.object(MockHa, 'schedule_future_restart', Mock(return_value=retval)):
|
|
request = make_request(schedule=future_restart_time.isoformat())
|
|
MockRestApiServer(RestApiHandler, request)
|
|
with patch.object(MockHa, 'restart', Mock(return_value=(retval, "foo"))):
|
|
request = make_request(role='master', postgres_version='9.5.2')
|
|
MockRestApiServer(RestApiHandler, request)
|
|
|
|
mock_dcs.get_cluster.return_value.is_paused.return_value = True
|
|
MockRestApiServer(RestApiHandler, make_request(schedule='2016-08-42 12:45TZ+1', role='master'))
|
|
# Valid timeout
|
|
MockRestApiServer(RestApiHandler, make_request(timeout='60s'))
|
|
# Invalid timeout
|
|
MockRestApiServer(RestApiHandler, make_request(timeout='42towels'))
|
|
|
|
def test_do_DELETE_restart(self):
|
|
for retval in (True, False):
|
|
with patch.object(MockHa, 'delete_future_restart', Mock(return_value=retval)):
|
|
request = 'DELETE /restart HTTP/1.0' + self._authorization
|
|
self.assertIsNotNone(MockRestApiServer(RestApiHandler, request))
|
|
|
|
@patch.object(MockPatroni, 'dcs')
|
|
def test_do_POST_reinitialize(self, mock_dcs):
|
|
cluster = mock_dcs.get_cluster.return_value
|
|
cluster.is_paused.return_value = False
|
|
request = 'POST /reinitialize HTTP/1.0' + self._authorization + '\nContent-Length: 15\n\n{"force": true}'
|
|
MockRestApiServer(RestApiHandler, request)
|
|
with patch.object(MockHa, 'reinitialize', Mock(return_value=None)):
|
|
MockRestApiServer(RestApiHandler, request)
|
|
|
|
@patch('time.sleep', Mock())
|
|
def test_RestApiServer_query(self):
|
|
with patch.object(MockCursor, 'execute', Mock(side_effect=psycopg2.OperationalError)):
|
|
self.assertIsNotNone(MockRestApiServer(RestApiHandler, 'GET /patroni'))
|
|
with patch.object(MockPostgresql, 'connection', Mock(side_effect=psycopg2.OperationalError)):
|
|
self.assertIsNotNone(MockRestApiServer(RestApiHandler, 'GET /patroni'))
|
|
|
|
@patch('time.sleep', Mock())
|
|
@patch.object(MockPatroni, 'dcs')
|
|
def test_do_POST_switchover(self, dcs):
|
|
dcs.loop_wait = 10
|
|
cluster = dcs.get_cluster.return_value
|
|
cluster.is_synchronous_mode.return_value = False
|
|
cluster.is_paused.return_value = False
|
|
|
|
post = 'POST /switchover HTTP/1.0' + self._authorization + '\nContent-Length: '
|
|
|
|
MockRestApiServer(RestApiHandler, post + '7\n\n{"1":2}')
|
|
|
|
request = post + '0\n\n'
|
|
MockRestApiServer(RestApiHandler, request)
|
|
|
|
cluster.leader.name = 'postgresql1'
|
|
MockRestApiServer(RestApiHandler, request)
|
|
|
|
request = post + '25\n\n{"leader": "postgresql1"}'
|
|
|
|
cluster.is_paused.return_value = True
|
|
MockRestApiServer(RestApiHandler, request)
|
|
|
|
cluster.is_paused.return_value = False
|
|
for cluster.is_synchronous_mode.return_value in (True, False):
|
|
MockRestApiServer(RestApiHandler, request)
|
|
|
|
cluster.leader.name = 'postgresql2'
|
|
request = post + '53\n\n{"leader": "postgresql1", "candidate": "postgresql2"}'
|
|
MockRestApiServer(RestApiHandler, request)
|
|
|
|
cluster.leader.name = 'postgresql1'
|
|
for cluster.is_synchronous_mode.return_value in (True, False):
|
|
MockRestApiServer(RestApiHandler, request)
|
|
|
|
cluster.members = [Member(0, 'postgresql0', 30, {'api_url': 'http'}),
|
|
Member(0, 'postgresql2', 30, {'api_url': 'http'})]
|
|
MockRestApiServer(RestApiHandler, request)
|
|
|
|
cluster.failover = None
|
|
MockRestApiServer(RestApiHandler, request)
|
|
|
|
dcs.get_cluster.side_effect = [cluster]
|
|
MockRestApiServer(RestApiHandler, request)
|
|
|
|
cluster2 = cluster.copy()
|
|
cluster2.leader.name = 'postgresql0'
|
|
cluster2.is_unlocked.return_value = False
|
|
dcs.get_cluster.side_effect = [cluster, cluster2]
|
|
MockRestApiServer(RestApiHandler, request)
|
|
|
|
cluster2.leader.name = 'postgresql2'
|
|
dcs.get_cluster.side_effect = [cluster, cluster2]
|
|
MockRestApiServer(RestApiHandler, request)
|
|
|
|
dcs.get_cluster.side_effect = None
|
|
dcs.manual_failover.return_value = False
|
|
MockRestApiServer(RestApiHandler, request)
|
|
dcs.manual_failover.return_value = True
|
|
|
|
with patch.object(MockHa, 'fetch_nodes_statuses', Mock(return_value=[])):
|
|
MockRestApiServer(RestApiHandler, request)
|
|
|
|
# Valid future date
|
|
request = post + '103\n\n{"leader": "postgresql1", "member": "postgresql2",' +\
|
|
' "scheduled_at": "6016-02-15T18:13:30.568224+01:00"}'
|
|
MockRestApiServer(RestApiHandler, request)
|
|
with patch.object(MockPatroni, 'dcs') as d:
|
|
d.manual_failover.return_value = False
|
|
MockRestApiServer(RestApiHandler, request)
|
|
|
|
# Exception: No timezone specified
|
|
request = post + '97\n\n{"leader": "postgresql1", "member": "postgresql2",' +\
|
|
' "scheduled_at": "6016-02-15T18:13:30.568224"}'
|
|
MockRestApiServer(RestApiHandler, request)
|
|
|
|
# Exception: Scheduled in the past
|
|
request = post + '103\n\n{"leader": "postgresql1", "member": "postgresql2", "scheduled_at": "'
|
|
MockRestApiServer(RestApiHandler, request + '1016-02-15T18:13:30.568224+01:00"}')
|
|
|
|
# Invalid date
|
|
self.assertIsNotNone(MockRestApiServer(RestApiHandler, request + '2010-02-29T18:13:30.568224+01:00"}'))
|
|
|
|
@patch.object(MockPatroni, 'dcs', Mock())
|
|
def test_do_POST_failover(self):
|
|
post = 'POST /failover HTTP/1.0' + self._authorization + '\nContent-Length: '
|
|
MockRestApiServer(RestApiHandler, post + '14\n\n{"leader":"1"}')
|
|
MockRestApiServer(RestApiHandler, post + '37\n\n{"candidate":"2","scheduled_at": "1"}')
|
|
|
|
|
|
@patch('ssl.wrap_socket', Mock(return_value=0))
|
|
class TestRestApiServer(unittest.TestCase):
|
|
|
|
def test_reload_config(self):
|
|
bad_config = {'listen': 'foo'}
|
|
self.assertRaises(ValueError, MockRestApiServer, None, '', bad_config)
|
|
srv = MockRestApiServer(lambda a1, a2, a3: None, '')
|
|
self.assertRaises(ValueError, srv.reload_config, bad_config)
|
|
self.assertRaises(ValueError, srv.reload_config, {})
|
|
srv.reload_config({'listen': '127.0.0.2:8008'})
|