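# Shared fixtures for patroni's postgresql test suite: canned HTTP responses, a fake
# postmaster process, psycopg2 connection/cursor stand-ins, and base TestCase classes
# that build a Postgresql object against a throw-away data directory.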
import os
import shutil
import unittest

from mock import Mock, patch

import psycopg2
import urllib3

from patroni.dcs import Leader, Member
from patroni.postgresql import Postgresql
from patroni.postgresql.config import ConfigHandler
from patroni.utils import RetryFailedError


class SleepException(Exception):
    pass


class MockResponse(object):

    def __init__(self, status_code=200):
        self.status_code = status_code
        self.content = '{}'

    @property
    def data(self):
        return self.content.encode('utf-8')

    @property
    def status(self):
        return self.status_code

    @staticmethod
    def getheader(*args):
        return ''


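# Stand-in for an HTTP GET helper: returns canned etcd, exhibitor and patroni REST API
# payloads based on the URL, raises urllib3.exceptions.HTTPError for anything starting
# with http://local, answers /reinitialize with 503 when the request body contains
# ' false}', and 404 for any URL it does not recognize.  Which HTTP client gets patched
# with it is left to the individual test module.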
def requests_get(url, **kwargs):
    members = '[{"id":14855829450254237642,"peerURLs":["http://localhost:2380","http://localhost:7001"],' +\
              '"name":"default","clientURLs":["http://localhost:2379","http://localhost:4001"]}]'
    response = MockResponse()
    if url.startswith('http://local'):
        raise urllib3.exceptions.HTTPError()
    elif ':8011/patroni' in url:
        response.content = '{"role": "replica", "xlog": {"received_location": 0}, "tags": {}}'
    elif url.endswith('/members'):
        response.content = '[{}]' if url.startswith('http://error') else members
    elif url.startswith('http://exhibitor'):
        response.content = '{"servers":["127.0.0.1","127.0.0.2","127.0.0.3"],"port":2181}'
    elif url.endswith(':8011/reinitialize'):
        data = kwargs.get('data', '')
        if ' false}' in data:
            response.status_code = 503
            response.content = 'restarting after failure already in progress'
    else:
        response.status_code = 404
    return response


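# Fake postmaster process: every method the tested code calls on a postmaster is
# replaced with a Mock so tests can assert on, or override, those calls.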
class MockPostmaster(object):
    def __init__(self, is_running=True, is_single_master=False):
        self.is_running = Mock(return_value=is_running)
        self.is_single_master = Mock(return_value=is_single_master)
        self.wait_for_user_backends_to_close = Mock()
        self.signal_stop = Mock(return_value=None)
        self.wait = Mock()
        self.signal_kill = Mock(return_value=False)


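# Fake DB-API cursor: execute() matches the beginning of the SQL text and either raises
# (psycopg2 errors for 'blabla', CHECKPOINT and pg_create_*, RetryFailedError on request)
# or loads canned rows that the tested code later reads via fetchone(), fetchall() or
# iteration.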
class MockCursor(object):

    def __init__(self, connection):
        self.connection = connection
        self.closed = False
        self.rowcount = 0
        self.results = []
        self.description = [Mock()]

    def execute(self, sql, *params):
        if sql.startswith('blabla'):
            raise psycopg2.ProgrammingError()
        elif sql == 'CHECKPOINT' or sql.startswith('SELECT pg_catalog.pg_create_'):
            raise psycopg2.OperationalError()
        elif sql.startswith('RetryFailedError'):
            raise RetryFailedError('retry')
        elif sql.startswith('SELECT slot_name'):
            self.results = [('blabla', 'physical'), ('foobar', 'physical'), ('ls', 'logical', 'a', 'b')]
        elif sql.startswith('SELECT CASE WHEN pg_catalog.pg_is_in_recovery()'):
            self.results = [(1, 2, 1, 0, False, 1, 1, None, None)]
        elif sql.startswith('SELECT pg_catalog.pg_is_in_recovery()'):
            self.results = [(False, 2)]
        elif sql.startswith('SELECT pg_catalog.to_char'):
            replication_info = '[{"application_name":"walreceiver","client_addr":"1.2.3.4",' +\
                               '"state":"streaming","sync_state":"async","sync_priority":0}]'
            self.results = [('', 0, '', '', '', '', False, replication_info)]
        elif sql.startswith('SELECT name, setting'):
            self.results = [('wal_segment_size', '2048', '8kB', 'integer', 'internal'),
                            ('wal_block_size', '8192', None, 'integer', 'internal'),
                            ('shared_buffers', '16384', '8kB', 'integer', 'postmaster'),
                            ('wal_buffers', '-1', '8kB', 'integer', 'postmaster'),
                            ('search_path', 'public', None, 'string', 'user'),
                            ('port', '5433', None, 'integer', 'postmaster'),
                            ('listen_addresses', '*', None, 'string', 'postmaster'),
                            ('autovacuum', 'on', None, 'bool', 'sighup'),
                            ('unix_socket_directories', '/tmp', None, 'string', 'postmaster')]
        elif sql.startswith('IDENTIFY_SYSTEM'):
            self.results = [('1', 3, '0/402EEC0', '')]
        elif sql.startswith('TIMELINE_HISTORY '):
            self.results = [('', b'x\t0/40159C0\tno recovery target specified\n\n'
                                 b'1\t0/40159C0\tno recovery target specified\n\n'
                                 b'2\t0/402DD98\tno recovery target specified\n\n'
                                 b'3\t0/403DD98\tno recovery target specified\n')]
        else:
            self.results = [(None, None, None, None, None, None, None, None, None, None)]

    def fetchone(self):
        return self.results[0]

    def fetchall(self):
        return self.results

    def __iter__(self):
        for i in self.results:
            yield i

    def __enter__(self):
        return self

    def __exit__(self, *args):
        pass


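# Minimal psycopg2 connection stand-in: just enough attributes (server_version,
# autocommit, closed) and methods (cursor, context manager, close) for the tested code.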
class MockConnect(object):

    server_version = 99999
    autocommit = False
    closed = 0

    def cursor(self):
        return MockCursor(self)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        pass

    @staticmethod
    def close():
        pass


def psycopg2_connect(*args, **kwargs):
    return MockConnect()


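# Base TestCase that constructs a Postgresql object with a representative configuration;
# psycopg2.connect and the config-file writers are patched out, so no real PostgreSQL
# instance or pre-existing filesystem state is required.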
class PostgresInit(unittest.TestCase):
    _PARAMETERS = {'wal_level': 'hot_standby', 'max_replication_slots': 5, 'f.oo': 'bar',
                   'search_path': 'public', 'hot_standby': 'on', 'max_wal_senders': 5,
                   'wal_keep_segments': 8, 'wal_log_hints': 'on', 'max_locks_per_transaction': 64,
                   'max_worker_processes': 8, 'max_connections': 100, 'max_prepared_transactions': 0,
                   'track_commit_timestamp': 'off', 'unix_socket_directories': '/tmp', 'trigger_file': 'bla',
                   'stats_temp_directory': '/tmp'}

    @patch('psycopg2.connect', psycopg2_connect)
    @patch.object(ConfigHandler, 'write_postgresql_conf', Mock())
    @patch.object(ConfigHandler, 'replace_pg_hba', Mock())
    @patch.object(ConfigHandler, 'replace_pg_ident', Mock())
    @patch.object(Postgresql, 'get_postgres_role_from_data_directory', Mock(return_value='master'))
    def setUp(self):
        data_dir = os.path.join('data', 'test0')
        self.p = Postgresql({'name': 'postgresql0', 'scope': 'batman', 'data_dir': data_dir,
                             'config_dir': data_dir, 'retry_timeout': 10,
                             'krbsrvname': 'postgres', 'pgpass': os.path.join(data_dir, 'pgpass0'),
                             'listen': '127.0.0.2, 127.0.0.3:5432', 'connect_address': '127.0.0.2:5432',
                             'authentication': {'superuser': {'username': 'foo', 'password': 'test'},
                                                'replication': {'username': '', 'password': 'rep-pass'}},
                             'remove_data_directory_on_rewind_failure': True,
                             'use_pg_rewind': True, 'pg_ctl_timeout': 'bla',
                             'parameters': self._PARAMETERS,
                             'recovery_conf': {'foo': 'bar'},
                             'pg_hba': ['host all all 0.0.0.0/0 md5'],
                             'pg_ident': ['krb realm postgres'],
                             'callbacks': {'on_start': 'true', 'on_stop': 'true', 'on_reload': 'true',
                                           'on_restart': 'true', 'on_role_change': 'true'}})


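# Extends PostgresInit with a throw-away data directory and a small set of DCS fixtures
# (leader, self, another member) used by tests that exercise replication and failover logic.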
class BaseTestPostgresql(PostgresInit):

    def setUp(self):
        super(BaseTestPostgresql, self).setUp()

        if not os.path.exists(self.p.data_dir):
            os.makedirs(self.p.data_dir)

        self.leadermem = Member(0, 'leader', 28, {'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5435/postgres'})
        self.leader = Leader(-1, 28, self.leadermem)
        self.other = Member(0, 'test-1', 28, {'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5433/postgres',
                                              'tags': {'replicatefrom': 'leader'}})
        self.me = Member(0, 'test0', 28, {'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5434/postgres'})

    def tearDown(self):
        if os.path.exists(self.p.data_dir):
            shutil.rmtree(self.p.data_dir)
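
# A minimal usage sketch (hypothetical test, not part of this module; self.p is the
# Postgresql object created in PostgresInit.setUp):
#
#     class TestExample(BaseTestPostgresql):
#
#         @patch('psycopg2.connect', psycopg2_connect)
#         def test_postgresql_object_is_created(self):
#             self.assertIsNotNone(self.p)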