mirror of
https://github.com/outbackdingo/patroni.git
synced 2026-01-27 18:20:05 +00:00
* use `SHOW primary_conninfo` instead of parsing config file on pg12
* strip out standby and recovery parameters from postgresql.auto.conf before starting the postgres 12
Patroni config remains backward compatible.
Despite for example `restore_command` converted to a GUC starting from postgresql 12, in the Patroni configuration you can still keep it in the `postgresql.recovery_conf` section.
If you put it into `postgresql.parameters.restore_command`, that will also work, but it is important not to mix both ways:
```yaml
# is OK
postgresql:
parameters:
restore_command: my_restore_command
archive_cleanup_command: my_archive_cleanup_command
# is OK
postgresql:
recovery_conf:
restore_command: my_restore_command
archive_cleanup_command: my_archive_cleanup_command
# is NOT ok
postgresql:
parameters:
restore_command: my_restore_command
recovery_conf:
archive_cleanup_command: my_archive_cleanup_command
```
217 lines
8.3 KiB
Python
217 lines
8.3 KiB
Python
import datetime
|
|
import json
|
|
import os
|
|
import shutil
|
|
import unittest
|
|
|
|
from mock import Mock, patch
|
|
from tempfile import gettempdir
|
|
|
|
import psycopg2
|
|
import requests
|
|
|
|
from patroni.dcs import Leader, Member
|
|
from patroni.postgresql import Postgresql
|
|
from patroni.postgresql.config import ConfigHandler
|
|
from patroni.utils import RetryFailedError
|
|
|
|
|
|
class SleepException(Exception):
|
|
pass
|
|
|
|
|
|
class MockResponse(object):
|
|
|
|
def __init__(self, status_code=200):
|
|
self.status_code = status_code
|
|
self.content = '{}'
|
|
self.ok = True
|
|
|
|
def json(self):
|
|
return json.loads(self.content)
|
|
|
|
@property
|
|
def data(self):
|
|
return self.content.encode('utf-8')
|
|
|
|
@property
|
|
def text(self):
|
|
return self.content
|
|
|
|
@property
|
|
def status(self):
|
|
return self.status_code
|
|
|
|
@staticmethod
|
|
def getheader(*args):
|
|
return ''
|
|
|
|
|
|
def requests_get(url, **kwargs):
|
|
members = '[{"id":14855829450254237642,"peerURLs":["http://localhost:2380","http://localhost:7001"],' +\
|
|
'"name":"default","clientURLs":["http://localhost:2379","http://localhost:4001"]}]'
|
|
response = MockResponse()
|
|
if url.startswith('http://local'):
|
|
raise requests.exceptions.RequestException()
|
|
elif ':8011/patroni' in url:
|
|
response.content = '{"role": "replica", "xlog": {"received_location": 0}, "tags": {}}'
|
|
elif url.endswith('/members'):
|
|
response.content = '[{}]' if url.startswith('http://error') else members
|
|
elif url.startswith('http://exhibitor'):
|
|
response.content = '{"servers":["127.0.0.1","127.0.0.2","127.0.0.3"],"port":2181}'
|
|
elif url.endswith(':8011/reinitialize'):
|
|
data = kwargs.get('data', '')
|
|
if ' false}' in data:
|
|
response.status_code = 503
|
|
response.ok = False
|
|
response.content = 'restarting after failure already in progress'
|
|
else:
|
|
response.status_code = 404
|
|
response.ok = False
|
|
return response
|
|
|
|
|
|
class MockPostmaster(object):
|
|
def __init__(self, is_running=True, is_single_master=False):
|
|
self.is_running = Mock(return_value=is_running)
|
|
self.is_single_master = Mock(return_value=is_single_master)
|
|
self.wait_for_user_backends_to_close = Mock()
|
|
self.signal_stop = Mock(return_value=None)
|
|
self.wait = Mock()
|
|
|
|
|
|
class MockCursor(object):
|
|
|
|
def __init__(self, connection):
|
|
self.connection = connection
|
|
self.closed = False
|
|
self.rowcount = 0
|
|
self.results = []
|
|
|
|
def execute(self, sql, *params):
|
|
if sql.startswith('blabla'):
|
|
raise psycopg2.ProgrammingError()
|
|
elif sql == 'CHECKPOINT' or sql.startswith('SELECT pg_catalog.pg_create_'):
|
|
raise psycopg2.OperationalError()
|
|
elif sql.startswith('RetryFailedError'):
|
|
raise RetryFailedError('retry')
|
|
elif sql.startswith('SELECT slot_name'):
|
|
self.results = [('blabla', 'physical'), ('foobar', 'physical'), ('ls', 'logical', 'a', 'b')]
|
|
elif sql.startswith('SELECT CASE WHEN pg_catalog.pg_is_in_recovery()'):
|
|
self.results = [(1, 2)]
|
|
elif sql.startswith('SELECT pg_catalog.pg_is_in_recovery()'):
|
|
self.results = [(False, 2)]
|
|
elif sql.startswith('WITH replication_info AS ('):
|
|
replication_info = '[{"application_name":"walreceiver","client_addr":"1.2.3.4",' +\
|
|
'"state":"streaming","sync_state":"async","sync_priority":0}]'
|
|
self.results = [('', 0, '', '', '', '', False, replication_info)]
|
|
elif sql.startswith('SELECT name, setting'):
|
|
self.results = [('wal_segment_size', '2048', '8kB', 'integer', 'internal'),
|
|
('search_path', 'public', None, 'string', 'user'),
|
|
('port', '5433', None, 'integer', 'postmaster'),
|
|
('listen_addresses', '*', None, 'string', 'postmaster'),
|
|
('autovacuum', 'on', None, 'bool', 'sighup'),
|
|
('unix_socket_directories', '/tmp', None, 'string', 'postmaster')]
|
|
elif sql.startswith('IDENTIFY_SYSTEM'):
|
|
self.results = [('1', 2, '0/402EEC0', '')]
|
|
elif sql.startswith('SELECT isdir, modification'):
|
|
self.results = [(False, datetime.datetime.now())]
|
|
elif sql.startswith('SELECT pg_catalog.pg_read_file'):
|
|
self.results = [('1\t0/40159C0\tno recovery target specified\n\n'
|
|
'2\t1/40159C0\tno recovery target specified\n',)]
|
|
elif sql.startswith('TIMELINE_HISTORY '):
|
|
self.results = [('', b'x\t0/40159C0\tno recovery target specified\n\n'
|
|
b'1\t0/40159C0\tno recovery target specified\n\n'
|
|
b'2\t0/402DD98\tno recovery target specified\n\n'
|
|
b'3\t0/403DD98\tno recovery target specified\n')]
|
|
else:
|
|
self.results = [(None, None, None, None, None, None, None, None, None, None)]
|
|
|
|
def fetchone(self):
|
|
return self.results[0]
|
|
|
|
def fetchall(self):
|
|
return self.results
|
|
|
|
def __iter__(self):
|
|
for i in self.results:
|
|
yield i
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, *args):
|
|
pass
|
|
|
|
|
|
class MockConnect(object):
|
|
|
|
server_version = 99999
|
|
autocommit = False
|
|
closed = 0
|
|
|
|
def cursor(self):
|
|
return MockCursor(self)
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, *args):
|
|
pass
|
|
|
|
@staticmethod
|
|
def close():
|
|
pass
|
|
|
|
|
|
def psycopg2_connect(*args, **kwargs):
|
|
return MockConnect()
|
|
|
|
|
|
class PostgresInit(unittest.TestCase):
|
|
_PARAMETERS = {'wal_level': 'hot_standby', 'max_replication_slots': 5, 'f.oo': 'bar',
|
|
'search_path': 'public', 'hot_standby': 'on', 'max_wal_senders': 5,
|
|
'wal_keep_segments': 8, 'wal_log_hints': 'on', 'max_locks_per_transaction': 64,
|
|
'max_worker_processes': 8, 'max_connections': 100, 'max_prepared_transactions': 0,
|
|
'track_commit_timestamp': 'off', 'unix_socket_directories': '/tmp', 'trigger_file': 'bla'}
|
|
|
|
@patch('psycopg2.connect', psycopg2_connect)
|
|
@patch.object(ConfigHandler, 'write_postgresql_conf', Mock())
|
|
@patch.object(ConfigHandler, 'replace_pg_hba', Mock())
|
|
@patch.object(ConfigHandler, 'replace_pg_ident', Mock())
|
|
def setUp(self):
|
|
data_dir = 'data/test0'
|
|
self.p = Postgresql({'name': 'postgresql0', 'scope': 'batman', 'data_dir': data_dir,
|
|
'config_dir': data_dir, 'retry_timeout': 10,
|
|
'krbsrvname': 'postgres', 'pgpass': os.path.join(gettempdir(), 'pgpass0'),
|
|
'listen': '127.0.0.2, 127.0.0.3:5432', 'connect_address': '127.0.0.2:5432',
|
|
'authentication': {'superuser': {'username': 'foo', 'password': 'test'},
|
|
'replication': {'username': '', 'password': 'rep-pass'}},
|
|
'remove_data_directory_on_rewind_failure': True,
|
|
'use_pg_rewind': True, 'pg_ctl_timeout': 'bla',
|
|
'parameters': self._PARAMETERS,
|
|
'recovery_conf': {'foo': 'bar'},
|
|
'pg_hba': ['host all all 0.0.0.0/0 md5'],
|
|
'pg_ident': ['krb realm postgres'],
|
|
'callbacks': {'on_start': 'true', 'on_stop': 'true', 'on_reload': 'true',
|
|
'on_restart': 'true', 'on_role_change': 'true'}})
|
|
|
|
|
|
class BaseTestPostgresql(PostgresInit):
|
|
|
|
def setUp(self):
|
|
super(BaseTestPostgresql, self).setUp()
|
|
|
|
if not os.path.exists(self.p.data_dir):
|
|
os.makedirs(self.p.data_dir)
|
|
|
|
self.leadermem = Member(0, 'leader', 28, {'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5435/postgres'})
|
|
self.leader = Leader(-1, 28, self.leadermem)
|
|
self.other = Member(0, 'test-1', 28, {'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5433/postgres',
|
|
'tags': {'replicatefrom': 'leader'}})
|
|
self.me = Member(0, 'test0', 28, {'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5434/postgres'})
|
|
|
|
def tearDown(self):
|
|
if os.path.exists(self.p.data_dir):
|
|
shutil.rmtree(self.p.data_dir)
|