mirror of
https://github.com/outbackdingo/patroni.git
synced 2026-01-27 18:20:05 +00:00
- update release notes - bump Patroni version - bump pyright version and fix reported issues - improve compatibility with legacy psycopg2 Co-authored-by: Polina Bungina <bungina@gmail.com>
294 lines
13 KiB
Python
294 lines
13 KiB
Python
import datetime
|
|
import os
|
|
import shutil
|
|
import unittest
|
|
|
|
from mock import Mock, PropertyMock, patch
|
|
|
|
import urllib3
|
|
|
|
import patroni.psycopg as psycopg
|
|
|
|
from patroni.dcs import Leader, Member
|
|
from patroni.postgresql import Postgresql
|
|
from patroni.postgresql.config import ConfigHandler
|
|
from patroni.utils import RetryFailedError, tzutc
|
|
|
|
|
|
class SleepException(Exception):
|
|
pass
|
|
|
|
|
|
mock_available_gucs = PropertyMock(return_value={
|
|
'cluster_name', 'constraint_exclusion', 'force_parallel_mode', 'hot_standby', 'listen_addresses', 'max_connections',
|
|
'max_locks_per_transaction', 'max_prepared_transactions', 'max_replication_slots', 'max_stack_depth',
|
|
'max_wal_senders', 'max_worker_processes', 'port', 'search_path', 'shared_preload_libraries',
|
|
'stats_temp_directory', 'synchronous_standby_names', 'track_commit_timestamp', 'unix_socket_directories',
|
|
'vacuum_cost_delay', 'vacuum_cost_limit', 'wal_keep_size', 'wal_level', 'wal_log_hints', 'zero_damaged_pages',
|
|
'autovacuum', 'wal_segment_size', 'wal_block_size', 'shared_buffers', 'wal_buffers',
|
|
})
|
|
|
|
GET_PG_SETTINGS_RESULT = [
|
|
('wal_segment_size', '2048', '8kB', 'integer', 'internal'),
|
|
('wal_block_size', '8192', None, 'integer', 'internal'),
|
|
('shared_buffers', '16384', '8kB', 'integer', 'postmaster'),
|
|
('wal_buffers', '-1', '8kB', 'integer', 'postmaster'),
|
|
('max_connections', '100', None, 'integer', 'postmaster'),
|
|
('max_prepared_transactions', '200', None, 'integer', 'postmaster'),
|
|
('max_worker_processes', '8', None, 'integer', 'postmaster'),
|
|
('max_locks_per_transaction', '64', None, 'integer', 'postmaster'),
|
|
('max_wal_senders', '5', None, 'integer', 'postmaster'),
|
|
('search_path', 'public', None, 'string', 'user'),
|
|
('port', '5432', None, 'integer', 'postmaster'),
|
|
('listen_addresses', '127.0.0.2, 127.0.0.3', None, 'string', 'postmaster'),
|
|
('autovacuum', 'on', None, 'bool', 'sighup'),
|
|
('unix_socket_directories', '/tmp', None, 'string', 'postmaster'),
|
|
('shared_preload_libraries', 'citus', None, 'string', 'postmaster'),
|
|
('wal_keep_size', '128', 'MB', 'integer', 'sighup'),
|
|
('cluster_name', 'batman', None, 'string', 'postmaster'),
|
|
('vacuum_cost_delay', '200', 'ms', 'real', 'user'),
|
|
('vacuum_cost_limit', '-1', None, 'integer', 'user'),
|
|
('max_stack_depth', '2048', 'kB', 'integer', 'superuser'),
|
|
('constraint_exclusion', '', None, 'enum', 'user'),
|
|
('force_parallel_mode', '1', None, 'enum', 'user'),
|
|
('zero_damaged_pages', 'off', None, 'bool', 'superuser'),
|
|
('stats_temp_directory', '/tmp', None, 'string', 'sighup'),
|
|
('track_commit_timestamp', 'off', None, 'bool', 'postmaster'),
|
|
('wal_log_hints', 'on', None, 'bool', 'postmaster'),
|
|
('hot_standby', 'on', None, 'bool', 'postmaster'),
|
|
('max_replication_slots', '5', None, 'integer', 'postmaster'),
|
|
('wal_level', 'logical', None, 'enum', 'postmaster'),
|
|
]
|
|
|
|
|
|
class MockResponse(object):
|
|
|
|
def __init__(self, status_code=200):
|
|
self.status_code = status_code
|
|
self.headers = {'content-type': 'json'}
|
|
self.content = '{}'
|
|
self.reason = 'Not Found'
|
|
|
|
@property
|
|
def data(self):
|
|
return self.content.encode('utf-8')
|
|
|
|
@property
|
|
def status(self):
|
|
return self.status_code
|
|
|
|
@staticmethod
|
|
def getheader(*args):
|
|
return ''
|
|
|
|
|
|
def requests_get(url, method='GET', endpoint=None, data='', **kwargs):
|
|
members = '[{"id":14855829450254237642,"peerURLs":["http://localhost:2380","http://localhost:7001"],' +\
|
|
'"name":"default","clientURLs":["http://localhost:2379","http://localhost:4001"]}]'
|
|
response = MockResponse()
|
|
if endpoint == 'failsafe':
|
|
response.content = 'Accepted'
|
|
elif url.startswith('http://local'):
|
|
raise urllib3.exceptions.HTTPError()
|
|
elif ':8011/patroni' in url:
|
|
response.content = '{"role": "replica", "wal": {"received_location": 0}, "tags": {}}'
|
|
elif url.endswith('/members'):
|
|
response.content = '[{}]' if url.startswith('http://error') else members
|
|
elif url.startswith('http://exhibitor'):
|
|
response.content = '{"servers":["127.0.0.1","127.0.0.2","127.0.0.3"],"port":2181}'
|
|
elif url.endswith(':8011/reinitialize'):
|
|
if ' false}' in data:
|
|
response.status_code = 503
|
|
response.content = 'restarting after failure already in progress'
|
|
else:
|
|
response.status_code = 404
|
|
return response
|
|
|
|
|
|
class MockPostmaster(object):
|
|
def __init__(self, pid=1):
|
|
self.is_running = Mock(return_value=self)
|
|
self.wait_for_user_backends_to_close = Mock()
|
|
self.signal_stop = Mock(return_value=None)
|
|
self.wait = Mock()
|
|
self.signal_kill = Mock(return_value=False)
|
|
|
|
|
|
class MockCursor(object):
|
|
|
|
def __init__(self, connection):
|
|
self.connection = connection
|
|
self.closed = False
|
|
self.rowcount = 0
|
|
self.results = []
|
|
self.description = [Mock()]
|
|
|
|
def execute(self, sql, *params):
|
|
if isinstance(sql, bytes):
|
|
sql = sql.decode('utf-8')
|
|
if sql.startswith('blabla'):
|
|
raise psycopg.ProgrammingError()
|
|
elif sql == 'CHECKPOINT' or sql.startswith('SELECT pg_catalog.pg_create_'):
|
|
raise psycopg.OperationalError()
|
|
elif sql.startswith('RetryFailedError'):
|
|
raise RetryFailedError('retry')
|
|
elif sql.startswith('SELECT slot_name, catalog_xmin'):
|
|
self.results = [('postgresql0', 100), ('ls', 100)]
|
|
elif sql.startswith('SELECT slot_name, slot_type, datname, plugin, catalog_xmin'):
|
|
self.results = [('ls', 'logical', 'a', 'b', 100, 500, b'123456')]
|
|
elif sql.startswith('SELECT slot_name'):
|
|
self.results = [('blabla', 'physical', 12345),
|
|
('foobar', 'physical', 12345),
|
|
('ls', 'logical', 499, 'b', 'a', 5, 100, 500)]
|
|
elif sql.startswith('WITH slots AS (SELECT slot_name, active'):
|
|
self.results = [(False, True)] if self.rowcount == 1 else []
|
|
elif sql.startswith('SELECT CASE WHEN pg_catalog.pg_is_in_recovery()'):
|
|
self.results = [(1, 2, 1, 0, False, 1, 1, None, None, 'streaming', '',
|
|
[{"slot_name": "ls", "confirmed_flush_lsn": 12345, "restart_lsn": 12344}],
|
|
'on', 'n1', None)]
|
|
elif sql.startswith('SELECT pg_catalog.pg_is_in_recovery()'):
|
|
self.results = [(False, 2)]
|
|
elif sql.startswith('SELECT pg_catalog.pg_postmaster_start_time'):
|
|
self.results = [(datetime.datetime.now(tzutc),)]
|
|
elif sql.startswith('SELECT name, pg_catalog.current_setting(name) FROM pg_catalog.pg_settings'):
|
|
self.results = [('data_directory', 'data'),
|
|
('hba_file', os.path.join('data', 'pg_hba.conf')),
|
|
('ident_file', os.path.join('data', 'pg_ident.conf')),
|
|
('max_connections', 42),
|
|
('max_locks_per_transaction', 73),
|
|
('max_prepared_transactions', 0),
|
|
('max_replication_slots', 21),
|
|
('max_wal_senders', 37),
|
|
('track_commit_timestamp', 'off'),
|
|
('wal_level', 'replica'),
|
|
('listen_addresses', '6.6.6.6'),
|
|
('port', 1984),
|
|
('archive_command', 'my archive command'),
|
|
('cluster_name', 'my_cluster')]
|
|
elif sql.startswith('SELECT name, setting'):
|
|
self.results = GET_PG_SETTINGS_RESULT
|
|
elif sql.startswith('SELECT COUNT(*) FROM pg_catalog.pg_settings'):
|
|
self.results = [(0,)]
|
|
elif sql.startswith('IDENTIFY_SYSTEM'):
|
|
self.results = [('1', 3, '0/402EEC0', '')]
|
|
elif sql.startswith('TIMELINE_HISTORY '):
|
|
self.results = [('', b'x\t0/40159C0\tno recovery target specified\n\n'
|
|
b'1\t0/40159C0\tno recovery target specified\n\n'
|
|
b'2\t0/402DD98\tno recovery target specified\n\n'
|
|
b'3\t0/403DD98\tno recovery target specified\n')]
|
|
elif sql.startswith('SELECT pg_catalog.citus_add_node'):
|
|
self.results = [(2,)]
|
|
elif sql.startswith('SELECT nodeid, groupid'):
|
|
self.results = [(1, 0, 'host1', 5432, 'primary'), (2, 1, 'host2', 5432, 'primary')]
|
|
else:
|
|
self.results = [(None, None, None, None, None, None, None, None, None, None)]
|
|
self.rowcount = len(self.results)
|
|
|
|
def fetchone(self):
|
|
return self.results[0]
|
|
|
|
def fetchall(self):
|
|
return self.results
|
|
|
|
def __iter__(self):
|
|
for i in self.results:
|
|
yield i
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, *args):
|
|
pass
|
|
|
|
|
|
class MockConnectionInfo(object):
|
|
|
|
def parameter_status(self, param_name):
|
|
if param_name == 'is_superuser':
|
|
return 'on'
|
|
return '0'
|
|
|
|
|
|
class MockConnect(object):
|
|
|
|
server_version = 99999
|
|
autocommit = False
|
|
closed = 0
|
|
info = MockConnectionInfo()
|
|
|
|
def cursor(self):
|
|
return MockCursor(self)
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, *args):
|
|
pass
|
|
|
|
@staticmethod
|
|
def close():
|
|
pass
|
|
|
|
|
|
def psycopg_connect(*args, **kwargs):
|
|
return MockConnect()
|
|
|
|
|
|
class PostgresInit(unittest.TestCase):
|
|
_PARAMETERS = {'wal_level': 'hot_standby', 'max_replication_slots': 5, 'f.oo': 'bar',
|
|
'search_path': 'public', 'hot_standby': 'on', 'max_wal_senders': 5,
|
|
'wal_keep_segments': 8, 'wal_log_hints': 'on', 'max_locks_per_transaction': 64,
|
|
'max_worker_processes': 8, 'max_connections': 100, 'max_prepared_transactions': 200,
|
|
'track_commit_timestamp': 'off', 'unix_socket_directories': '/tmp',
|
|
'trigger_file': 'bla', 'stats_temp_directory': '/tmp', 'zero_damaged_pages': 'off',
|
|
'force_parallel_mode': '1', 'constraint_exclusion': '',
|
|
'max_stack_depth': 2048, 'vacuum_cost_limit': -1, 'vacuum_cost_delay': 200}
|
|
|
|
@patch('patroni.psycopg._connect', psycopg_connect)
|
|
@patch('patroni.postgresql.CallbackExecutor', Mock())
|
|
@patch.object(ConfigHandler, 'write_postgresql_conf', Mock())
|
|
@patch.object(ConfigHandler, 'replace_pg_hba', Mock())
|
|
@patch.object(ConfigHandler, 'replace_pg_ident', Mock())
|
|
@patch.object(Postgresql, 'get_postgres_role_from_data_directory', Mock(return_value='primary'))
|
|
def setUp(self):
|
|
data_dir = os.path.join('data', 'test0')
|
|
self.p = Postgresql({'name': 'postgresql0', 'scope': 'batman', 'data_dir': data_dir,
|
|
'config_dir': data_dir, 'retry_timeout': 10,
|
|
'krbsrvname': 'postgres', 'pgpass': os.path.join(data_dir, 'pgpass0'),
|
|
'listen': '127.0.0.2, 127.0.0.3:5432',
|
|
'connect_address': '127.0.0.2:5432', 'proxy_address': '127.0.0.2:5433',
|
|
'authentication': {'superuser': {'username': 'foo', 'password': 'test'},
|
|
'replication': {'username': '', 'password': 'rep-pass'},
|
|
'rewind': {'username': 'rewind', 'password': 'test'}},
|
|
'remove_data_directory_on_rewind_failure': True,
|
|
'use_pg_rewind': True, 'pg_ctl_timeout': 'bla', 'use_unix_socket': True,
|
|
'parameters': self._PARAMETERS,
|
|
'recovery_conf': {'foo': 'bar'},
|
|
'pg_hba': ['host all all 0.0.0.0/0 md5'],
|
|
'pg_ident': ['krb realm postgres'],
|
|
'callbacks': {'on_start': 'true', 'on_stop': 'true', 'on_reload': 'true',
|
|
'on_restart': 'true', 'on_role_change': 'true'},
|
|
'citus': {'group': 0, 'database': 'citus'}})
|
|
|
|
|
|
class BaseTestPostgresql(PostgresInit):
|
|
|
|
@patch('time.sleep', Mock())
|
|
def setUp(self):
|
|
super(BaseTestPostgresql, self).setUp()
|
|
|
|
if not os.path.exists(self.p.data_dir):
|
|
os.makedirs(self.p.data_dir)
|
|
|
|
self.leadermem = Member(0, 'leader', 28, {'xlog_location': 100, 'state': 'running',
|
|
'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5435/postgres'})
|
|
self.leader = Leader(-1, 28, self.leadermem)
|
|
self.other = Member(0, 'test-1', 28, {'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5433/postgres',
|
|
'state': 'running', 'tags': {'replicatefrom': 'leader'}})
|
|
self.me = Member(0, 'test0', 28, {
|
|
'state': 'running', 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5434/postgres'})
|
|
|
|
def tearDown(self):
|
|
if os.path.exists(self.p.data_dir):
|
|
shutil.rmtree(self.p.data_dir)
|